# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module to define the ComposeX Resources into a simple object to make it easier to navigate through.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.ecs.ecs_family import ComposeFamily
import json
import re
from copy import deepcopy
import jsonschema
from compose_x_common.aws import get_account_id
from compose_x_common.compose_x_common import (
attributes_to_mapping,
keyisset,
set_else_none,
)
from troposphere import AWSObject, Export, FindInMap, GetAtt, Join, Output, Ref, Sub
from troposphere.ecs import Environment
from ecs_composex.common import NONALPHANUM, get_nested_property
from ecs_composex.common.aws import (
define_lookup_role_from_info,
find_aws_resource_arn_from_tags_api,
)
from ecs_composex.common.cfn_conditions import define_stack_name
from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.ecs_composex import CFN_EXPORT_DELIMITER as DELIM
from ecs_composex.common.ecs_composex import TAGS_SEPARATOR, X_KEY
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import (
add_parameters,
add_update_mapping,
add_update_parameter_recursively,
)
from ecs_composex.mods_manager import XResourceModule
from ecs_composex.resource_settings import get_parameter_settings
ENV_VAR_NAME = re.compile(r"([^a-zA-Z0-9_]+)")
[docs]class XResource:
"""
Class to represent each defined resource in the template
:cvar dict policies_scaffolds: IAM policies template to use to generate IAM policies for the given resource
:ivar str name: The name of the resource as defined in compose file
:ivar dict definition: The definition of the resource as defined in compose file
:ivar str logical_name: Name of the resource to use in CFN template as for export/import
:ivar bool requires_vpc: Whether or not the resource requires a VPC to function (i.e. RDS)
"""
def __init__(
self,
name: str,
definition: dict,
module: XResourceModule,
settings: ComposeXSettings,
):
"""
:param str name: Name of the resource in the template
:param dict definition: The definition of the resource as-is
:param ecs_composex.common.settings.ComposeXSettings settings:
"""
if not isinstance(module, XResourceModule):
raise TypeError(
name, "module must be", XResourceModule, "Got", module, type(module)
)
self.module = module
self.validate_schema(name, definition, module.mod_key)
self.name = name
self.requires_vpc = False
self.arn = None
self.iam_manager = None
self.cloud_control_attributes_mapping = {}
self.native_attributes_mapping = {}
self.definition = deepcopy(definition)
self.env_names = []
self.env_vars = []
self.validators = []
self.logical_name = NONALPHANUM.sub("", self.name)
self.settings = set_else_none("Settings", definition, alt_value={})
self._parameters = {}
self.lookup = set_else_none("Lookup", definition, alt_value={})
if self.lookup:
self.lookup_session = define_lookup_role_from_info(
self.lookup, settings.session
)
self.properties = {}
else:
self.lookup_session = settings.session
self.properties = set_else_none("Properties", definition)
self.support_defaults: bool = False
self.scaling = set_else_none("Scaling", self.definition)
self.scaling_target = None
self.cfn_resource = None
self.output_properties = {}
self.outputs = []
self.attributes_outputs = {}
self.is_nested = False
self.stack = None
self.ref_parameter = None
self.lookup_properties = {}
self.cloud_control_properties: dict = {}
self.mappings = {}
self.default_tags = {
f"compose-x{TAGS_SEPARATOR}module": self.module.mod_key,
f"compose-x{TAGS_SEPARATOR}resource_name": self.name,
f"compose-x{TAGS_SEPARATOR}logical_name": self.logical_name,
}
self.cloudmap_settings = set_else_none("x-cloudmap", self.settings, {})
self.default_cloudmap_settings = {}
self.cloudmap_dns_supported = False
self.policies_scaffolds = module.iam_policies
self.resource_policy = None
def __repr__(self):
return self.logical_name
@property
def parameters(self) -> dict:
return set_else_none("MacroParameters", self.definition, alt_value={})
@property
def uses_default(self) -> bool:
return not any([self.lookup, self.parameters, self.properties])
@property
def env_var_prefix(self) -> str:
return ENV_VAR_NAME.sub("", self.name.replace("-", "_").upper())
@property
def compose_x_arn(self) -> str:
return f"{self.module.res_key}::{self.name}"
@property
def property_to_parameter_mapping(self):
mapping = {}
if not self.attributes_outputs:
return mapping
for parameter in self.output_properties:
if parameter.return_value:
mapping[parameter.return_value] = parameter
else:
mapping[parameter.title] = parameter
return mapping
@property
def mod_res_key(self) -> str:
return self.module.res_key
@property
def mod_mapping_key(self) -> str:
return self.module.mapping_key
[docs] def validate(self, settings, root_stack=None, *args, **kwargs) -> None:
"""
Function to implement self-validation for the resource and the execution settings.
:param settings:
:param root_stack:
:param args:
:param kwargs:
"""
[docs] def validate_schema(
self, name, definition, module_name, module_schema: str = None
) -> None:
"""
JSON Validation of the resources module validation
"""
from jsonschema import Draft7Validator
from ecs_composex.specs import REGISTRY as COMPOSE_REGISTRY
if not self.module.json_schema and not module_schema:
return
_eval = Draft7Validator(self.module.json_schema, registry=COMPOSE_REGISTRY)
try:
_eval.validate(definition)
except jsonschema.exceptions.ValidationError:
LOG.error(f"{module_name}.{name} - Definition is not conform to schema.")
raise
[docs] def cloud_control_attributes_mapping_lookup(
self, resource_type, resource_id, **kwargs
) -> tuple[dict, dict]:
"""
Method to map the resource properties to the CCAPI description
:return:
"""
client = self.lookup_session.client("cloudcontrol")
try:
props_r = client.get_resource(
TypeName=resource_type, Identifier=resource_id, **kwargs
)
properties = json.loads(props_r["ResourceDescription"]["Properties"])
props = attributes_to_mapping(
properties, self.cloud_control_attributes_mapping
)
return props, properties
except client.exceptions.UnsupportedActionException:
LOG.warning("Resource not yet supported by Cloud Control API")
return {}, {}
[docs] def native_attributes_mapping_lookup(self, account_id, resource_id, function):
properties = function(self, account_id, resource_id)
if self.native_attributes_mapping:
conform_mapping = attributes_to_mapping(
properties, self.native_attributes_mapping
)
return conform_mapping
return properties
[docs] def init_outputs(self):
"""
Placeholder method
"""
self.output_properties = {}
[docs] def lookup_resource(
self,
arn_re,
native_lookup_function,
cfn_resource_type,
tagging_api_id,
subattribute_key=None,
use_arn_for_id: bool = False,
):
"""
Method to self-identify properties. It will try to use AWS Cloud Control API if possible, otherwise fallback
to using boto3 descriptions functions to create a mapping of the attributes.
"""
self.init_outputs()
lookup_attributes = self.lookup
if subattribute_key is not None:
lookup_attributes = self.lookup[subattribute_key]
if keyisset("Arn", lookup_attributes):
LOG.info(f"{self.module.res_key}.{self.name} - Lookup via ARN")
LOG.debug(
f"{self.module.res_key}.{self.name} - ARN is {lookup_attributes['Arn']}"
)
arn_parts = arn_re.match(lookup_attributes["Arn"])
if not arn_parts:
raise KeyError(
f"{self.module.res_key}.{self.name} - ARN {lookup_attributes['Arn']} is not valid. Must match",
arn_re.pattern,
)
self.arn = lookup_attributes["Arn"]
resource_id = arn_parts.group("id")
account_id = arn_parts.group("accountid")
elif keyisset("Tags", lookup_attributes):
LOG.info(f"{self.module.res_key}.{self.name} - Lookup via Tags")
LOG.debug(
f"{self.module.res_key}.{self.name} - Lookup tags -> {lookup_attributes}"
)
self.arn = find_aws_resource_arn_from_tags_api(
lookup_attributes, self.lookup_session, tagging_api_id
)
arn_parts = arn_re.match(self.arn)
resource_id = arn_parts.group("id")
account_id = arn_parts.group("accountid")
else:
raise KeyError(
f"{self.module.res_key}.{self.name} - You must specify Arn or Tags to identify existing resource"
)
if not self.arn:
raise LookupError(
f"{self.module.res_key}.{self.name} - Failed to find the AWS Resource with given tags"
)
props = {}
_account_id = get_account_id(self.lookup_session)
LOG.debug("arn: %s - account_id: %s", self.arn, _account_id)
if _account_id == account_id and self.cloud_control_attributes_mapping:
(
props,
self.cloud_control_properties,
) = self.cloud_control_attributes_mapping_lookup(
cfn_resource_type, self.arn if use_arn_for_id else resource_id
)
if not props:
props = self.native_attributes_mapping_lookup(
account_id,
self.arn if use_arn_for_id else resource_id,
native_lookup_function,
)
self.lookup_properties = props
self.generate_cfn_mappings_from_lookup_properties()
self.generate_outputs()
[docs] def generate_cfn_mappings_from_lookup_properties(self):
"""
Sets the .mappings attribute based on the lookup_attributes for CFN purposes
"""
for parameter, value in self.lookup_properties.items():
if not isinstance(parameter, Parameter):
raise TypeError(
f"{self.module.res_key}.{self.name} - lookup attribute {parameter} is",
parameter,
type(parameter),
"Expected",
Parameter,
)
if parameter.return_value:
if parameter.return_value not in self.mappings:
self.mappings[NONALPHANUM.sub("", parameter.return_value)] = value
else:
self.mappings[
parameter.title + NONALPHANUM.sub("", parameter.return_value)
] = value
else:
self.mappings[parameter.title] = value
[docs] def get_resource_attribute_value(
self, parameter, family: ComposeFamily
) -> tuple | None:
"""Finds the value"""
if isinstance(parameter, str):
try:
attr_parameter = self.property_to_parameter_mapping[parameter]
except KeyError:
LOG.error(
f"{self.module.res_key}.{self.name} - No return value {parameter} available."
)
return
elif isinstance(parameter, Parameter):
attr_parameter = parameter
else:
raise TypeError(
"parameter is", type(parameter), "must be one of", [str, Parameter]
)
params_to_add = []
attr_id = self.attributes_outputs[attr_parameter]
if self.cfn_resource:
value = Ref(attr_id["ImportParameter"])
params_to_add.append(attr_parameter)
elif self.lookup_properties:
value = attr_id["ImportValue"]
else:
raise LookupError("Unable to find attribute of resource")
if params_to_add:
params_values = {}
settings = [get_parameter_settings(self, param) for param in params_to_add]
resource_params_to_add = []
for setting in settings:
resource_params_to_add.append(setting[1])
params_values[setting[0]] = setting[2]
add_parameters(family.template, resource_params_to_add)
family.stack.Parameters.update(params_values)
return value, params_to_add
[docs] def set_update_container_env_var(
self, family: ComposeFamily, parameter: str | Parameter, env_var_name: str
) -> list:
"""
Function that will set or update the value of a given env var from Return value of a resource.
If the resource is new, adding the parameter to the top stack
If the resource is lookup, point to the mapping.
"""
env_vars: list[Environment] = []
try:
value, params_to_add = self.get_resource_attribute_value(parameter, family)
env_vars.append(Environment(Name=env_var_name, Value=value))
return env_vars
except Exception as error:
print(error)
print("Failed to define env vars")
return []
[docs] def generate_resource_service_env_vars(
self, target: tuple, target_definition: dict
) -> list:
"""
Generates env vars based on ReturnValues set for a give service. When the resource is new, adds the
parameter to the services stack appropriately.
"""
res_return_names = {}
if hasattr(self, "add_extra_outputs"):
self.add_extra_outputs()
for prop_param in self.attributes_outputs:
if prop_param.return_value:
res_return_names[prop_param.return_value] = prop_param
else:
res_return_names[prop_param.title] = prop_param
env_vars = []
params_to_add = []
if self.ref_parameter and self.ref_parameter.title in target_definition.keys():
LOG.debug(
f"{self.module.res_key}.{self.module.res_key} - Ref parameter {self.ref_parameter.title} override."
)
elif (
self.ref_parameter
and self.ref_parameter.title not in target_definition.keys()
):
env_var_name = ENV_VAR_NAME.sub("", self.name.replace("-", "_").upper())
target_definition[self.ref_parameter.title] = env_var_name
LOG.info(
f"{self.module.res_key}.{self.name} - Auto-added {env_var_name} for Ref value"
)
else:
LOG.warning(
f"{self.module.res_key}.{self.name} - Ref parameter not defined on {self.__class__}"
)
for prop_name, env_var_name in target_definition.items():
if prop_name in res_return_names:
if self.cfn_resource:
env_vars.append(
Environment(
Name=env_var_name,
Value=Ref(
self.attributes_outputs[res_return_names[prop_name]][
"ImportParameter"
]
),
)
)
params_to_add.append(res_return_names[prop_name])
elif self.lookup_properties:
env_vars.append(
Environment(
Name=env_var_name,
Value=self.attributes_outputs[res_return_names[prop_name]][
"ImportValue"
],
)
)
if params_to_add:
params_values = {}
settings = [get_parameter_settings(self, param) for param in params_to_add]
resource_params_to_add = []
for setting in settings:
resource_params_to_add.append(setting[1])
params_values[setting[0]] = setting[2]
add_parameters(target[0].template, resource_params_to_add)
target[0].stack.Parameters.update(params_values)
return env_vars
[docs] def generate_ref_env_var(self, target) -> list:
"""
Method to define all the env var of a resource based on its own defined output attributes
"""
if not self.ref_parameter:
LOG.error(
f"{self.module.res_key}.{self.name}. Default ref_parameter not set. Skipping env_vars"
)
return []
env_var_name = ENV_VAR_NAME.sub("", self.name.upper().replace("-", "_"))
if self.cfn_resource and self.attributes_outputs and self.ref_parameter:
ref_env_var = Environment(
Name=env_var_name,
Value=Ref(
self.attributes_outputs[self.ref_parameter]["ImportParameter"]
),
)
ref_param_settings = get_parameter_settings(self, self.ref_parameter)
add_parameters(target[0].template, [ref_param_settings[1]])
target[0].stack.Parameters.update(
{ref_param_settings[0]: ref_param_settings[2]}
)
elif self.lookup_properties and self.ref_parameter:
ref_env_var = Environment(
Name=env_var_name,
Value=self.attributes_outputs[self.ref_parameter]["ImportValue"],
)
else:
raise ValueError(
f"{self.module.res_key}.{self.name} - Unable to set the default env var"
)
return [ref_env_var]
[docs] def set_attributes_from_mapping(self, attribute_parameter):
"""
Method to define the attribute outputs for lookup resources, which use FindInMap or Ref
:param attribute_parameter: The parameter mapped to the resource attribute
:type attribute_parameter: ecs_composex.common.cfn_params.Parameter
:return: The FindInMap setting for mapped resource
"""
if attribute_parameter.return_value:
long_name = attribute_parameter.title + NONALPHANUM.sub(
"", attribute_parameter.return_value
)
else:
long_name = attribute_parameter.title
if self.mappings and long_name in self.mappings:
return FindInMap(self.module.mapping_key, self.logical_name, long_name)
elif attribute_parameter.return_value:
return FindInMap(
self.module.mapping_key,
self.logical_name,
NONALPHANUM.sub("", attribute_parameter.return_value),
)
else:
return FindInMap(
self.module.mapping_key, self.logical_name, attribute_parameter.title
)
[docs] def define_export_name(self, output_definition, attribute_parameter):
"""
Method to define the export name for the resource
:return:
"""
if len(output_definition) == 5 and output_definition[4]:
LOG.debug(f"Adding portback output for {self.name}")
export = Export(
Sub(
f"${{STACK_NAME}}{DELIM}{self.name}{DELIM}{output_definition[4]}",
STACK_NAME=define_stack_name(),
)
)
else:
export = Export(
Sub(
f"${{STACK_NAME}}{DELIM}{self.logical_name}{DELIM}{attribute_parameter.title}",
STACK_NAME=define_stack_name(),
),
)
return export
[docs] def set_new_resource_outputs(self, output_definition, attribute_parameter):
"""
Method to define the outputs for the resource when new
"""
if output_definition[2] is Ref and issubclass(
type(output_definition[1]), AWSObject
):
value = Ref(output_definition[1])
elif output_definition[2] is GetAtt:
value = GetAtt(output_definition[1], output_definition[3])
elif output_definition[2] is Sub and len(output_definition) == 4:
value = Sub(output_definition[3])
elif output_definition[2] is Sub and len(output_definition) == 5:
value = Sub(output_definition[3], output_definition[4])
elif output_definition[2] is Join:
if not isinstance(output_definition[3], list):
raise ValueError(
"For Join, the parameter must be",
list,
"Got",
type(output_definition[3]),
)
value = Join(*output_definition[3])
elif (
isinstance(output_definition[2], (str, int))
and output_definition[3] is False
):
value = (
output_definition[2]
if isinstance(output_definition[2], str)
else str(output_definition[2])
)
else:
raise TypeError(
output_definition,
f"3rd argument for {output_definition[0]} must be one of",
(Ref, GetAtt, Sub, Join),
"Got",
output_definition[2],
)
export = self.define_export_name(output_definition, attribute_parameter)
return value, export
[docs] def add_new_output_attribute(
self,
attribute_id: Parameter,
attribute_config: tuple,
generate_outputs: bool = True,
):
"""
Adds a new output to attributes and re-generates all outputs
"""
if not self.output_properties:
self.output_properties = {attribute_id: attribute_config}
else:
self.output_properties.update({attribute_id: attribute_config})
if generate_outputs:
self.generate_outputs()
[docs] def generate_outputs(self):
"""
Method to create the outputs for XResources
"""
LOG.debug("%s.%s - Generating outputs", self.module.res_key, self.name)
if self.lookup_properties:
for attribute_parameter, value in self.lookup_properties.items():
output_name = f"{self.logical_name}{attribute_parameter.title}"
self.attributes_outputs[attribute_parameter] = {
"Name": output_name,
"ImportValue": self.set_attributes_from_mapping(
attribute_parameter
),
"ImportParameter": Parameter(
output_name,
group_label=(
attribute_parameter.group_label
if attribute_parameter.group_label
else self.module.mod_key
),
return_value=attribute_parameter.return_value,
Type=attribute_parameter.Type,
),
}
elif self.output_properties and not self.lookup_properties:
for (
attribute_parameter,
output_definition,
) in self.output_properties.items():
output_name = NONALPHANUM.sub("", output_definition[0])
settings = self.set_new_resource_outputs(
output_definition, attribute_parameter
)
value = settings[0]
export = settings[1]
self.attributes_outputs[attribute_parameter] = {
"Name": output_name,
"Output": Output(output_name, Value=value, Export=export),
"ImportParameter": Parameter(
output_name,
group_label=(
attribute_parameter.group_label
if attribute_parameter.group_label
else self.module.mod_key
),
return_value=attribute_parameter.return_value,
Type=attribute_parameter.Type,
),
"ImportValue": GetAtt(
(
self.stack.get_top_root_stack()
if self.stack
else self.module.mapping_key
),
f"Outputs.{output_name}",
),
"Original": attribute_parameter,
}
for attr in self.attributes_outputs.values():
if keyisset("Output", attr):
self.outputs.append(attr["Output"])
[docs] def add_parameter_to_family_stack(
self, family, settings: ComposeXSettings, parameter: Union[str, Parameter]
) -> dict:
if self.stack == family.stack:
LOG.warning(
"Cannot add parameter to resource",
f"{self.name}",
"because it is in the same stack as family",
"{family.name}",
)
return self
if (
isinstance(parameter, str)
and parameter in self.property_to_parameter_mapping.keys()
):
the_parameter = self.property_to_parameter_mapping[parameter]
elif (
isinstance(parameter, Parameter)
and parameter in self.property_to_parameter_mapping.values()
):
the_parameter = parameter
else:
raise TypeError(
"parameter must be one of", str, Parameter, "got", type(parameter)
)
if self.mappings and self.lookup:
add_update_mapping(
family.template,
self.module.mapping_key,
settings.mappings[self.module.mapping_key],
)
return self.attributes_outputs[the_parameter]
param_id = self.attributes_outputs[the_parameter]
add_parameters(family.template, [param_id["ImportParameter"]])
family.stack.Parameters.update(
{param_id["ImportParameter"].title: param_id["ImportValue"]}
)
return param_id
[docs] def add_attribute_to_another_stack(
self, ext_stack, attribute: Parameter, settings: ComposeXSettings
):
attr_id = self.attributes_outputs[attribute]
if self.mappings and self.lookup:
add_update_mapping(
ext_stack.stack_template, self.module.mapping_key, self.module.mappings
)
elif self.cfn_resource:
add_update_parameter_recursively(ext_stack, settings, attr_id)
else:
raise AttributeError(
self.module.res_key, self.name, "No lookup nor mappings ??"
)
return attr_id
[docs] def post_processing(self, settings: ComposeXSettings):
if not self.cfn_resource or not hasattr(self, "post_processing_properties"):
LOG.debug("Not a new cluster or no post_processing_properties. Skipping")
return
LOG.info(f"Post processing {self.module.res_key}.{self.name}")
for _property in getattr(self, "post_processing_properties", []):
properties_to_update: list[tuple] = get_nested_property(
self.cfn_resource, _property
)
for _prop_to_update in properties_to_update:
aws_property_object, property_name, value = _prop_to_update
value = validate_input_value(aws_property_object, property_name, value)
if not value:
continue
resource, parameter = settings.get_resource_attribute(value)
if not resource or not parameter:
LOG.error(
"%s.%s - Failed to find resource/attribute for %s with value %s",
self.module.res_key,
self.name,
resource,
value,
)
continue
res_param_id = resource.add_attribute_to_another_stack(
self.stack, parameter, settings
)
if res_param_id is resource:
res_propery_value = Ref(resource.cfn_resource)
elif res_param_id is not resource and resource.cfn_resource:
res_propery_value = Ref(res_param_id["ImportParameter"])
else:
res_propery_value = res_param_id["ImportValue"]
setattr(aws_property_object, property_name, res_propery_value)
LOG.info(
"%s.%s - Successfully mapped %s to %s.%s",
self.module.res_key,
self.name,
_property,
resource.module.res_key,
resource.name,
)