Source code for ecs_composex.compose.compose_services.helpers

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

import re
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common import ComposeXSettings

if TYPE_CHECKING:
    from troposphere import Template
    from troposphere.ecs import ContainerDefinition, Secret
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.compose.compose_services import ComposeService

from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import FindInMap, GetAtt, ImportValue, NoValue, Ref, Sub
from troposphere.ecs import ContainerDefinition, Environment

from ecs_composex.common import NONALPHANUM
from ecs_composex.common.logging import LOG


[docs]def import_secrets( template: Template, service: ComposeService, container: ContainerDefinition, settings: ComposeXSettings, ): """Function to import secrets from compose-x and map those to AWS Secrets in Secrets Manager""" if not service.secrets: return if not keyisset("secrets", settings.compose_content): return else: settings_secrets = settings.compose_content["secrets"] for secret in service.secrets: if ( isinstance(secret, str) and secret in settings_secrets and keyisset("ComposeSecret", settings_secrets[secret]) ): settings_secrets[secret]["ComposeSecret"].assign_to_task_definition( template, container ) elif isinstance(secret, dict) and keyisset("source", secret): secret_name = secret["source"] if keyisset("ComposeSecret", settings_secrets[secret_name]): settings_secrets[secret_name][ "ComposeSecret" ].assign_to_task_definition(template, container)
[docs]def define_string_interpolation(var_value: str) -> str: """ Function to determine whether an env variable string should use Sub. :param str var_value: The env var string as defined in compose file :return: String as is or Sub for interpolation :rtype: str """ if var_value.find(r"${AWS::") >= 0: LOG.debug(var_value) return Sub(var_value) return var_value
[docs]def set_environment_dict_from_list(environment: list) -> dict[str, str]: """Transforms a list of string with a ``key=value`` into a dict of key/value""" env_vars_to_map = {} for key in environment: if not isinstance(key, str) or key.find(r"=") < 0: raise TypeError( f"Environment variable {key} must be a string in the Key=Value format" ) splits = key.split(r"=") if splits[0] not in env_vars_to_map: env_vars_to_map[splits[0]] = splits[1] else: LOG.warning(f"{splits[0]} was already defined. Overriding to newer value") env_vars_to_map[splits[0]] = splits[1] return env_vars_to_map
[docs]def import_env_variables(environment) -> list: """ Function to import Docker compose env variables into ECS Env Variables :param environment: Environment variables as defined on the ecs_service definition :type environment: dict :return: list of Environment :rtype: list<troposphere.ecs.Environment> """ env_vars = [] if isinstance(environment, list): env_vars_to_map = set_environment_dict_from_list(environment) elif isinstance(environment, dict): env_vars_to_map = environment else: raise TypeError( "Enviroment must be a list of string or a dict of key/value where value is a string" ) for key, value in env_vars_to_map.items(): if not isinstance(value, str): env_vars.append(Environment(Name=key, Value=str(environment[key]))) else: env_vars.append( Environment( Name=key, Value=define_string_interpolation(value), ) ) return env_vars
[docs]def extend_container_secrets(container: ContainerDefinition, secret: Secret): """Add secrets to a Container definition""" if hasattr(container, "Secrets"): secrets = getattr(container, "Secrets") if secrets: uniq = [secret.Name for secret in secrets] if secret.Name not in uniq: secrets.append(secret) else: setattr(container, "Secrets", [secret]) else: setattr(container, "Secrets", [secret])
[docs]def set_validate_environment(container: ContainerDefinition) -> None: """ Validates that the environment property of the container definition is valid. If is NoValue """ _environment = getattr(container, "Environment") if isinstance(_environment, Ref) and _environment == NoValue: setattr(container, "Environment", []) elif not isinstance(_environment, list): raise TypeError( f"container def Environment {_environment} is not list or Ref(AWS_NO_VALUE).", _environment, )
[docs]def extend_container_envvars( container: ContainerDefinition, env_vars: list, replace: bool = False ) -> None: """Extends the container environment variables with new ones to add. If not already set, defines.""" ignored_containers = ["xray-daemon", "envoy", "cw_agent"] if ( isinstance(container, ContainerDefinition) and not isinstance(container.Name, (Ref, Sub, GetAtt, ImportValue, FindInMap)) and container.Name in ignored_containers ): LOG.debug(f"Ignoring AWS Container {container.Name}") return if not hasattr(container, "Environment"): setattr(container, "Environment", []) set_validate_environment(container) environment = getattr(container, "Environment") existing_names = [ var.Name for var in environment if isinstance(var, Environment) and isinstance(var.Name, str) ] for var in env_vars: if not isinstance(var, Environment): if var not in environment: LOG.debug(f"var already exists {var}") else: environment.append(var) continue if var.Name not in existing_names: LOG.debug(f"Adding {var.Name} to {existing_names}") environment.append(var) elif var.Name in existing_names and replace: for defined_env_var in environment: if defined_env_var.Name == var.Name: setattr(defined_env_var, "Value", var.Value) break LOG.debug( f"{container.Name}, {[env.Name for env in environment if isinstance(env, Environment)]}" )
[docs]def define_ingress_mappings(service_ports: list) -> dict: """ Function to create a mapping of sources for a common target """ udp_mappings = {} tcp_mappings = {} ports_mappings = {"tcp": tcp_mappings, "udp": udp_mappings} for port in service_ports: if not keyisset("target", port): raise KeyError("The ports must always at least define the target.") _port_protocol = set_else_none("protocol", port, "tcp") _port_target = port["target"] _port_published = set_else_none("published", port, None) if _port_protocol == "udp": mappings = udp_mappings else: mappings = tcp_mappings if _port_target not in mappings: if _port_published: mappings[_port_target] = [(_port_published, service_ports)] else: mappings[_port_target] = [] elif _port_target in mappings and _port_published not in mappings[_port_target]: mappings[_port_target].append((_port_published, service_ports)) return ports_mappings
[docs]def validate_healthcheck(healthcheck, valid_keys, required_keys): """ Healthcheck definition validation :param dict healthcheck: :param list valid_keys: :param list required_keys: """ for key in healthcheck.keys(): if key not in valid_keys: raise AttributeError(f"Key {key} is not valid. Expected", valid_keys) if not all(required_keys) not in healthcheck.keys(): raise AttributeError( f"Expected at least {required_keys}. Got", healthcheck.keys() )
[docs]def sub_generate( input_s: str, sub_args: dict, settings: ComposeXSettings, service_family ) -> tuple: eval_r = re.compile( r"(?P<intrinsic>x-aws::(?P<pseudo>AWS::[a-zA-Z0-9]+))|" r"(?P<res_key>x-[a-zA-Z0-9-_]+)::(?P<res_name>[a-zA-Z0-9-_]+)::(?P<return_value>[a-zA-Z0-9-_]+)" ) for _inter in eval_r.findall(input_s): first_match = eval_r.search(input_s) if not first_match: break aws, resource = first_match.group("pseudo"), first_match.group("res_key") if resource: res_key: str = first_match.group("res_key") res_name: str = first_match.group("res_name") return_value: str = first_match.group("return_value") resource, parameter = settings.get_resource_attribute( f"{res_key}::{res_name}::{return_value}" ) value, params_to_add = resource.get_resource_attribute_value( parameter, service_family ) sub_arg_name_r: str = ( NONALPHANUM.sub("", res_key) + NONALPHANUM.sub("", res_name) + NONALPHANUM.sub("", return_value) ) sub_arg_name: str = f"${{{sub_arg_name_r}}}" sub_args[sub_arg_name_r] = value else: sub_arg_name: str = f"${{{first_match.group('pseudo')}}}" input_s = eval_r.sub(sub_arg_name, input_s, count=1) return input_s, sub_args