Source code for ecs_composex.common.services_helpers

#  -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille <john@compose-x.io>

from compose_x_common.compose_x_common import keyisset, keypresent
from troposphere import FindInMap, GetAtt, ImportValue, Ref, Sub
from troposphere.ecs import ContainerDefinition, Environment

from ecs_composex.common import LOG
from ecs_composex.ecs.ecs_params import LOG_GROUP_RETENTION


[docs]def import_secrets(template, service, container, settings): """ Function to import secrets from composex mapping to AWS Secrets in Secrets Manager :param troposphere.Template template: :param troposhere.ecs.ContainerDefinition container: :param ecs_composex.common.settings.ComposeXSettings settings: :return: """ if not service.secrets: return if not keyisset("secrets", settings.compose_content): return else: settings_secrets = settings.compose_content["secrets"] for secret in service.secrets: if ( isinstance(secret, str) and secret in settings_secrets and keyisset("ComposeSecret", settings_secrets[secret]) ): settings_secrets[secret]["ComposeSecret"].assign_to_task_definition( template, container ) elif isinstance(secret, dict) and keyisset("source", secret): secret_name = secret["source"] if keyisset("ComposeSecret", settings_secrets[secret_name]): settings_secrets[secret_name][ "ComposeSecret" ].assign_to_task_definition(template, container)
[docs]def define_string_interpolation(var_value): """ Function to determine whether an env variable string should use Sub. :param str var_value: The env var string as defined in compose file :return: String as is or Sub for interpolation :rtype: str """ if var_value.find(r"${AWS::") >= 0: LOG.debug(var_value) return Sub(var_value) return var_value
[docs]def import_env_variables(environment): """ Function to import Docker compose env variables into ECS Env Variables :param environment: Environment variables as defined on the ecs_service definition :type environment: dict :return: list of Environment :rtype: list<troposphere.ecs.Environment> """ env_vars = [] for key in environment: if not isinstance(environment[key], str): env_vars.append(Environment(Name=key, Value=str(environment[key]))) else: env_vars.append( Environment( Name=key, Value=define_string_interpolation(environment[key]), ) ) return env_vars
[docs]def extend_container_secrets(container, secret): """ Function to add secrets to a Container definition :param container: container definition :type container: troposphere.ecs.ContainerDefinition :param secret: secret to add :type secret: troposphere.ecs.Secret """ if hasattr(container, "Secrets"): secrets = getattr(container, "Secrets") if secrets: uniq = [secret.Name for secret in secrets] if secret.Name not in uniq: secrets.append(secret) else: setattr(container, "Secrets", [secret]) else: setattr(container, "Secrets", [secret])
[docs]def extend_container_envvars(container, env_vars): ignored_containers = ["xray-daemon", "envoy", "cw_agent"] if ( isinstance(container, ContainerDefinition) and not isinstance(container.Name, (Ref, Sub, GetAtt, ImportValue, FindInMap)) and container.Name in ignored_containers ): LOG.debug(f"Ignoring AWS Container {container.Name}") return environment = ( getattr(container, "Environment") if hasattr(container, "Environment") and not isinstance(getattr(container, "Environment"), Ref) else [] ) if environment: existing = [var.Name for var in environment] for var in env_vars: if var.Name not in existing: LOG.debug(f"Adding {var.Name} to {existing}") environment.append(var) else: setattr(container, "Environment", env_vars) LOG.debug(f"{container.Name}, {[env.Name for env in environment]}")
[docs]def define_ingress_mappings(service_ports): """ Function to create a mapping of sources for a common target """ ingress_mappings = {} for port in service_ports: if not keyisset("target", port): raise KeyError("The ports must always at least define the target.") if not keyisset("published", port): port["published"] = port["target"] if not port["target"] in ingress_mappings.keys(): ingress_mappings[port["target"]] = [port["published"]] elif ( port["target"] in ingress_mappings.keys() and not port["published"] in ingress_mappings[port["target"]] ): ingress_mappings[port["target"]].append(port["published"]) return ingress_mappings
[docs]def validate_healthcheck(healthcheck, valid_keys, required_keys): """ Healthcheck definition validation :param dict healthcheck: :param list valid_keys: :param list required_keys: """ for key in healthcheck.keys(): if key not in valid_keys: raise AttributeError(f"Key {key} is not valid. Expected", valid_keys) if not all(required_keys) not in healthcheck.keys(): raise AttributeError( f"Expected at least {required_keys}. Got", healthcheck.keys() )
[docs]def set_else_none(key, props, alt_value=None, eval_bool=False): """ Function to serialize if not keyisset () set other value :param str key: :param dict props: :param alt_value: :param bool eval_bool: Allows to gets booleans properties :return: """ if not eval_bool: return alt_value if not keyisset(key, props) else props[key] elif eval_bool: return alt_value if not keypresent(key, props) else props[key]
[docs]def get_closest_valid_log_retention_period(set_expiry): return min( LOG_GROUP_RETENTION.AllowedValues, key=lambda x: abs(x - max([set_expiry])), )
[docs]def set_logging_expiry(service): """ Method to reset the logging retention period to the closest valid value. :param ecs_composex.common.compose_services.ComposeService service: :return: """ closest_valid = LOG_GROUP_RETENTION.Default if service.x_logging and keyisset("RetentionInDays", service.x_logging): set_expiry = int(service.x_logging["RetentionInDays"]) if set_expiry not in LOG_GROUP_RETENTION.AllowedValues: closest_valid = get_closest_valid_log_retention_period(set_expiry) service.x_logging.update({"RetentionInDays": closest_valid}) return closest_valid