# Source code for ecs_composex.compose.compose_services

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module to import the services defined in compose files and import / transform the settings into
Compose-X usable properties
"""

from __future__ import annotations

import re
import shlex
from copy import deepcopy
from os import path
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex.common.settings import ComposeXSettings

from compose_x_common.compose_x_common import keyisset, keypresent, set_else_none
from troposphere import AWSHelperFn, If, NoValue, Ref, Sub
from troposphere.ecs import (
    ContainerDefinition,
    Environment,
    HealthCheck,
    KernelCapabilities,
    LinuxParameters,
    PortMapping,
    SystemControl,
    Tmpfs,
    Ulimit,
)

from ecs_composex.common import NONALPHANUM
from ecs_composex.common.cfn_params import ROOT_STACK_NAME, Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.compose.compose_secrets.services_helpers import map_secrets
from ecs_composex.compose.compose_services.docker_tools import (
    import_time_values_to_seconds,
    set_compute_resources,
    set_memory_to_mb,
)
from ecs_composex.compose.compose_services.helpers import (
    define_ingress_mappings,
    import_env_variables,
    sub_generate,
    validate_healthcheck,
)
from ecs_composex.compose.compose_volumes.services_helpers import map_volumes
from ecs_composex.ecs.ecs_conditions import (
    IPC_FROM_HOST_CON_T,
    USE_FARGATE_CON_T,
    USE_WINDOWS_OS_T,
)

from ...common.troposphere_tools import add_parameters
from .helpers import extend_container_envvars
from .service_image import ServiceImage


[docs]class ComposeService: """ Class to represent a docker-compose singleton service :ivar str container_name: name of the container to use in definitions :ivar ecs_composex.compose.compose_services.service_logging.ServiceLogging logging: :ivar ServiceImage image: """ main_key = "services" ecs_plugin_aws_keys = [ ("x-aws-role", dict), ("x-aws-policies", list), ("x-aws-autoscaling", dict), ("x-aws-pull_credentials", str), ("x-aws-logs_retention", int), ("x-aws-min_percent", int), ("x-aws-max_percent", int), ] def __init__( self, name, definition, volumes=None, secrets=None, image_param: Parameter = None, ): for setting in self.ecs_plugin_aws_keys: if keyisset(setting[0], definition) and not isinstance( definition[setting[0]], setting[1] ): raise TypeError( setting[0], "is of type", type(definition[setting][0]), "Expected", setting[1], ) self._definition = deepcopy(definition) self.name = name self.container_definition = None self.x_scaling = set_else_none("x-scaling", self.definition, None, False) self.x_network = set_else_none("x-network", self.definition, None, False) self.x_cloudmap = set_else_none("x-cloudmap", self.x_network, None, False) self.x_ecs_connect = set_else_none("x-ecs_connect", self.x_network, None) self.x_ecs = set_else_none("x-ecs", self.definition, {}) self.ecr_config = set_else_none("x-ecr", self.definition, None) self.x_ecr = set_else_none("x-ecr", self.definition, {}) self.eip_auto_assign = set_else_none("AssignPublicIp", self.x_network, False) self.x_ray = set_else_none( "x-xray", self.definition, set_else_none("x-ray", self.definition, False, True), True, ) self.x_repo_credentials = None self.ipc = set_else_none("ipc", self.definition) self.import_x_aws_settings() self.volumes = [] self.logging = None self.secrets = [] self.tmpfses = [] self.user = None self.group = None self.user_group = None self.code_profiler = None self.environment = set_else_none("environment", self.definition, None, False) self.cfn_environment = ( 
import_env_variables(self.environment) if self.environment else NoValue ) self.depends_on: dict = set_else_none("depends_on", self.definition, {}, False) if isinstance(self.depends_on, list): services_names = [_s_name for _s_name in self.depends_on] self.depends_on: dict = {} for service_name in services_names: self.depends_on[service_name] = {"condition": "service_started"} self.docker_labels: dict = {} self.import_docker_labels(self._definition) if not keyisset("image", self.definition): raise KeyError("You must specify the image to use for", self.name) if not image_param: self.image = ServiceImage(self) else: self.image = ServiceImage(self, image_param) self._mem_alloc = None self._mem_resa = None self._cpu_amount = None self.__family = None self.is_aws_sidecar = False self.deploy = set_else_none("deploy", self.definition, None) self.deploy_labels = set_else_none("labels", self.deploy, alt_value={}) if self.deploy: set_compute_resources(self, self.deploy) self.ports = set_else_none("ports", self.definition, []) self.expose_ports = set_else_none("expose", self.definition, []) self.ingress_mappings = define_ingress_mappings(self.ports) self.set_user_group() map_volumes(self, volumes) map_secrets(self, secrets) self.set_container_definition() self.links = set_else_none("links", definition) self.family_links: list = [] self.x_environment: dict = set_else_none("x-environment", definition, {}) def __repr__(self): return self.name @property def definition(self): return self._definition @property def container_name(self) -> str: return set_else_none("container_name", self._definition, self.name) @property def compose_x_arn(self) -> str: if self.family: return f"{self.family.name}::{self.name}" else: return self.name @property def family(self) -> ComposeFamily: return self.__family if self.__family else None @family.setter def family(self, family: ComposeFamily): self.__family = family if family.template: add_parameters(family.template, [self.image.image_param]) if ( 
family.stack and isinstance(self.image, ServiceImage) and isinstance(self.image.image, str) ): family.stack.Parameters.update( {self.image.image_param.title: self.image.image_uri} ) @property def ecs_user(self) -> Union[str, AWSHelperFn]: __user = set_else_none("user", self.definition, alt_value=None) if not __user: return NoValue return str(__user) @property def deploy_labels(self): return set_else_none("labels", self.deploy, alt_value={}) @deploy_labels.setter def deploy_labels(self, value: dict): if not self.deploy: self.deploy: dict = {"labels": value} if keypresent("labels", self.deploy) and not keyisset("labels", self.deploy): self.deploy["labels"]: dict = value elif keyisset("labels", self.deploy): self.deploy.update(value) @property def networks(self): _networks = set_else_none("networks", self.definition, alt_value={}) if isinstance(_networks, list): new_definition = {} for name in _networks: new_definition[name] = {} return new_definition elif isinstance(_networks, dict): return _networks else: raise TypeError( self.name, "networks is of type", type(_networks), "must be one of", (dict, list), ) @networks.setter def networks(self, value): if not isinstance(value, (list, dict)) or not issubclass( type(value), AWSHelperFn ): raise TypeError( self.name, "networks is of type", type(value), "must be one of", (dict, list), "or", AWSHelperFn, ) if isinstance(value, list): new_definition = {} for name in value: new_definition[name] = {} value = new_definition self.definition["networks"]: dict = value @property def logical_name(self) -> str: return NONALPHANUM.sub("", self.name) @property def resources(self): return set_else_none("resources", self.deploy, alt_value={}) @property def cpu_amount(self) -> Union[int, Ref]: if not self._cpu_amount or self.container_start_condition in [ "SUCCESS", "COMPLETE", ]: return NoValue alloc = "limits" resa = "reservations" resource = "cpus" _set_limit = float( set_else_none( resource, set_else_none(alloc, self.resources, 
alt_value={}), alt_value=0, ) ) _set_resa = float( set_else_none( resource, set_else_none(resa, self.resources, alt_value={}), alt_value=0, ) ) to_set = float(max([_set_limit, _set_resa])) if to_set: return int(float(to_set * 1024)) return NoValue @cpu_amount.setter def cpu_amount(self, value: Union[int, AWSHelperFn, None]): self._cpu_amount = value @property def memory_limit(self): if self.container_start_condition in [ "SUCCESS", "COMPLETE", ]: return NoValue resource = "memory" str_value = set_else_none( resource, set_else_none("limits", self.resources, alt_value=None) ) if not str_value: return NoValue return set_memory_to_mb(str_value) @property def memory_reservations(self): if self.container_start_condition in [ "SUCCESS", "COMPLETE", ]: return NoValue resource = "memory" str_value = set_else_none( resource, set_else_none("reservations", self.resources, alt_value=None) ) if not str_value: return NoValue return set_memory_to_mb(str_value) @property def command(self): _command = set_else_none("command", self.definition, alt_value=NoValue) if isinstance(_command, str): return shlex.split(_command) else: return _command @command.setter def command(self, new_command): self.definition.update({"command": new_command}) if self.container_definition: setattr(self.container_definition, "Command", new_command) @property def entrypoint(self): _entrypoint = set_else_none("entrypoint", self.definition, alt_value=NoValue) if isinstance(_entrypoint, str): return shlex.split(_entrypoint) else: return _entrypoint @entrypoint.setter def entrypoint(self, new_entrypoint): if isinstance(new_entrypoint, str): new_entrypoint = shlex.split(new_entrypoint) self.definition.update({"entrypoint": new_entrypoint}) if self.container_definition: setattr(self.container_definition, "EntryPoint", new_entrypoint) @property def runtime_architecture(self): return set_else_none("CpuArchitecture", self.x_ecs, None) @property def runtime_os_family(self): return set_else_none("OperatingSystemFamily", 
self.x_ecs, None) @property def capacity_provider_strategy(self): return set_else_none("CapacityProviderStrategy", self.x_ecs, None) @property def replicas(self): return int(set_else_none("replicas", self.deploy, alt_value=1, eval_bool=True)) @replicas.setter def replicas(self, value: int): if not isinstance(value, int): raise ValueError(self.name, "replicas must be an integer") if self.deploy: self.deploy["replicas"] = value else: self.deploy = {"replicas": value} @property def working_dir(self): return set_else_none("working_dir", self.definition, alt_value=NoValue) @property def is_essential(self) -> bool: """ In order of absolutes * If only 1 container in service, it must be essential * If user defined value (bool) and start condition is not SUCCESS or COMPLETE, then user defined * If not user defined value (None) and start condition is SUCCESS or COMPLETE, then it cannot be essential, as it is expected to shutdown """ _tmp = True if self.family and len(self.family.services) == 1: _tmp = True elif self.user_define_essential and not ( self.container_start_condition == "SUCCESS" or self.container_definition == "COMPLETE" ): _tmp = self.user_define_essential elif ( self.container_start_condition == "SUCCESS" or self.container_definition == "COMPLETE" ): _tmp = False self.is_essential = _tmp if self.container_definition and hasattr( self.container_definition, "Essential" ): return self.container_definition.Essential else: return _tmp @is_essential.setter def is_essential(self, value: bool): if not isinstance(value, bool): raise TypeError( self.name, "is_essential must be one of", (bool, Ref), "got", type(value), ) if self.container_definition: setattr(self.container_definition, "Essential", value) @property def user_define_essential(self) -> Union[None, bool]: """ Allows user to override whether a container is essential or not. 
By default, in absence of the label, service is considered essential as it might be the only one in the family """ essential_key = "ecs.essential" _defined_essential = set_else_none( essential_key, self.deploy_labels, alt_value=None ) if _defined_essential is None: return None positive_values = [True, "yes", "True"] negative_values = [False, "no", "False"] if ( _defined_essential not in positive_values or _defined_essential not in negative_values ): raise ValueError( "The values allowed for", essential_key, "are", positive_values, negative_values, "Got", _defined_essential, ) if _defined_essential in negative_values: return False return True @property def container_start_condition(self) -> str: if ( isinstance(self.ecs_healthcheck, HealthCheck) and self.ecs_healthcheck != NoValue ): return "HEALTHY" depends_key = "ecs.depends.condition" return set_else_none( depends_key, self.deploy_labels, alt_value="START", ) @container_start_condition.setter def container_start_condition(self, value): depends_key = "ecs.depends.condition" valid_conditions = ["START", "COMPLETE", "SUCCESS", "HEALTHY"] if value not in valid_conditions: raise ValueError( self.name, depends_key, "is set to ", value, "must be one of", valid_conditions, ) if self.deploy: if keyisset("labels", self.deploy): self.deploy["labels"][depends_key] = value else: self.deploy["labels"]: dict = {depends_key: value} else: self.deploy: dict = {"labels": {depends_key: value}} @property def ephemeral_storage(self) -> int: storage_key = "ecs.ephemeral.storage" storage_value = set_else_none(storage_key, self.deploy_labels, 0) if not storage_value: return 0 if isinstance(storage_value, (int, float)): ephemeral_storage = int(storage_value) elif isinstance(storage_value, str): ephemeral_storage = int(set_memory_to_mb(storage_value) / 1024) else: raise TypeError( f"The value for {storage_key} is of type", type(storage_value), "Expected one of", [int, float, str], ) if ephemeral_storage <= 21: LOG.debug( f"{self.name} - 
{storage_key} - defined value is <= 21. Leaving to default" ) return 0 elif ephemeral_storage > 200: LOG.warning( f"{self.name} - {storage_key} set to maximum 200 ({ephemeral_storage} > 200)" ) return 200 else: LOG.debug(f"{self.name} - {storage_key} set to {ephemeral_storage}") return int(ephemeral_storage) @property def launch_type(self) -> Union[str, None]: compute_key = "ecs.compute.platform" return set_else_none( compute_key, set_else_none("labels", self.deploy, alt_value={}), alt_value=None, ) @launch_type.setter def launch_type(self, value: str): compute_key = "ecs.compute.platform" valid = ["EC2", "FARGATE", "EXTERNAL"] if value not in valid: raise ValueError( self.name, compute_key, value, "is invalid. Must be one of", valid ) if self.deploy: if keyisset("labels", self.deploy): self.deploy["labels"].update({compute_key: value}) else: self.deploy["labels"] = {compute_key: value} else: self.deploy = {"labels": {compute_key: value}} @property def healthcheck(self): return set_else_none("healthcheck", self.definition, alt_value={}) @property def ecs_healthcheck(self) -> Union[HealthCheck, AWSHelperFn]: """ If HealthCheck already set ContainerDefinition and value is "None" but service.healtheck defined, define HealthCheck() from service.healthcheck. 
Elif already defined and not "None", return current value """ __current = None if self.container_definition and hasattr( self.container_definition, "HealthCheck" ): __current = getattr(self.container_definition, "HealthCheck") if ( (__current is None or __current == NoValue) and self.healthcheck and not self.container_definition ): if keyisset("disable", self.healthcheck): return NoValue valid_keys = ["test", "interval", "timeout", "retries", "start_period"] attr_mappings = { "test": ("Command", None), "interval": ("Interval", import_time_values_to_seconds), "timeout": ("Timeout", import_time_values_to_seconds), "retries": ("Retries", None), "start_period": ("StartPeriod", import_time_values_to_seconds), } required_keys = ["test"] validate_healthcheck(self.healthcheck, valid_keys, required_keys) params = {} for key, value in self.healthcheck.items(): _mapping = attr_mappings[key] ecs_key = _mapping[0] if _mapping[1] and callable(_mapping[1]): params[ecs_key] = _mapping[1](value) else: params[ecs_key] = value if isinstance(params["Command"], str): params["Command"] = ["CMD-SHELL", params["Command"]] return HealthCheck(**params) elif isinstance(__current, HealthCheck) or issubclass( type(__current), AWSHelperFn ): return __current return NoValue @property def family_hostname(self) -> str: hostname = "ecs.task.family.hostname" return set_else_none( hostname, set_else_none("labels", self.deploy, alt_value={}), alt_value=None ) @property def update_config(self): _config = set_else_none("update_config", self.deploy, alt_value={}) if not isinstance(_config, dict): raise TypeError( "The deploy.update_config must be a dict/map. 
Got", _config, type(_config), ) return _config @property def families(self): ecs_task_family = "ecs.task.family" __families = set_else_none(ecs_task_family, self.deploy_labels) if __families is None: return [self.name] if not isinstance(__families, str): raise TypeError( ecs_task_family, "must be", str, "got", __families, type(__families) ) return __families.split(r",") @property def tmpfs(self): """ Method to define the tmpfs settings """ tmpfs_key = "tmpfs" tmpfses = set_else_none(tmpfs_key, self.definition, alt_value=[]) if not tmpfses: return NoValue if isinstance(self.definition[tmpfs_key], str): self.tmpfses.append( {"ContainerPath": self.definition[tmpfs_key], "Size": 0} ) elif isinstance(self.definition[tmpfs_key], list): for container_path in self.definition[tmpfs_key]: self.tmpfses.append({"ContainerPath": container_path, "Size": 0}) rendered_fs = [Tmpfs(**args) for args in self.tmpfses] return If(USE_FARGATE_CON_T, NoValue, rendered_fs) @property def sysctls(self): """ Method to define the SystemControls """ sysctls_key = "sysctls" __sysctls = set_else_none(sysctls_key, self.definition, alt_value={}) if not __sysctls: return NoValue def_dict = {} if isinstance(__sysctls, list): for prop in __sysctls: splits = prop.split(r"=") if not splits or len(splits) != 2: raise ValueError(f"Property define {prop} is not valid.") def_dict[splits[0]] = splits[1] elif isinstance(__sysctls, dict): def_dict = __sysctls controls = [] for name, value in def_dict.items(): controls.append(SystemControl(Namespace=name, Value=str(value))) return If( IPC_FROM_HOST_CON_T, NoValue, If(USE_FARGATE_CON_T, NoValue, controls) ) @property def shm_size(self): """ Method to import and determine SHM SIZE """ __shm_size = set_else_none("shm_size", self.definition) if not __shm_size: return NoValue if not isinstance(__shm_size, (int, str, float)): raise TypeError(self.name) memory_value = set_memory_to_mb(__shm_size) return If(USE_FARGATE_CON_T, NoValue, memory_value) @property def 
kernel_properties(self) -> KernelCapabilities: from .kernel_options_helpers import define_kernel_options return define_kernel_options(self) @property def ulimits(self) -> Union[list, Ref]: """ Set the ulimits """ _ulimits = set_else_none("ulimits", self.definition, alt_value={}) if not _ulimits: return NoValue rendered_limits = [] fargate_supported = ["nofile"] allowed = [ "core", "cpu", "data", "fsize", "locks", "memlock", "msgqueue", "nice", "nofile", "nproc", "rss", "rtprio", "rttime", "sigpending", "stack", ] for limit_name, limit_value in _ulimits.items(): if limit_name not in allowed: raise KeyError( f"{self.name} - ulimit property {limit_name} is not supported by ECS. Valid ones are", allowed, ) elif isinstance(limit_value, (str, int)): ulimit = Ulimit( SoftLimit=int(limit_value), HardLimit=int(limit_value), Name=limit_name, ) elif isinstance(limit_value, dict): if keyisset("soft", limit_value) and keyisset("hard", limit_value): ulimit = Ulimit( SoftLimit=int(limit_value["soft"]), HardLimit=int(limit_value["hard"]), Name=limit_name, ) else: raise KeyError( f"Missing hard or soft properties for ulimit {limit_name}" ) else: raise TypeError(f"{self.name} - ulimit is not of the proper definition") if limit_name not in fargate_supported: rendered_limits.append(If(USE_FARGATE_CON_T, NoValue, ulimit)) else: rendered_limits.append(ulimit) return rendered_limits if rendered_limits else NoValue @property def stop_grace_period(self): """ Method to import and determine StopTimeout """ __stop_grace_period = set_else_none("stop_grace_period", self.definition) if not __stop_grace_period: return NoValue if not isinstance(__stop_grace_period, (str)): raise TypeError(self.name) stop_timeout = import_time_values_to_seconds(__stop_grace_period) if stop_timeout < 2: LOG.warning( f"services.{self.name} - stop_grace_period/StopTimeout {stop_timeout} < 2 - setting to 2" ) stop_timeout = 2 elif stop_timeout > 120: LOG.warning( f"services.{self.name} - stop_grace_period/StopTimeout 
{stop_timeout} > 120 - setting to 120" ) stop_timeout = 120 return stop_timeout @property def x_iam(self) -> dict: __iam = set_else_none( "x-iam", self.definition, alt_value={ "ManagedPolicyArns": [], "Policies": [], "PermissionsBoundary": None, }, ) if keyisset("x-aws-policies", self.definition): __iam["ManagedPolicyArns"] += self.definition["x-aws-policies"] if keyisset("x-aws-role", self.definition): __iam["Policies"].append( { "PolicyName": "ImportedFromXAWSRole", "PolicyDocument": self.definition["x-aws-role"], } ) return __iam @property def env_files(self) -> list: """ Method to list all the env files and check the files are found and available. """ env_file_key = "env_file" _env_files = set_else_none(env_file_key, self.definition) if not _env_files: return [] if not isinstance(_env_files, (str, list)): raise TypeError( self.name, env_file_key, "must be one of", (str, list), "Got", _env_files, type(_env_files), ) env_files = [] if isinstance(self.definition[env_file_key], str): env_files = [_env_files] for file_path in _env_files: if not isinstance(file_path, str): raise TypeError( "Files in the env_file is supposed to be a list of paths to files (str). Got", type(file_path), ) if not path.exists(path.abspath(file_path)): raise FileNotFoundError("No file found at", path.abspath(file_path)) env_files.append(path.abspath(file_path)) return env_files
[docs] def handle_expose_ports(self, aws_vpc_mappings): """ Import the expose ports to AWS VPC Mappings :param list[troposphere.ecs.PortMapping] aws_vpc_mappings: List of ECS Port Mappings defined from ports[] """ expose_port_re = re.compile(r"^(?P<target>\d{1,5})(?=/(?P<protocol>udp|tcp))") for expose_port in self.expose_ports: if isinstance(expose_port, str): parts = expose_port_re.match(expose_port) if not parts: raise ValueError( "Expose port value is invalid. Must match", expose_port_re.pattern, ) port = int(parts.group("target")) protocol = parts.group("protocol") or "tcp" elif isinstance(expose_port, int): port = expose_port protocol = "tcp" else: raise TypeError( expose_port, "is", type(expose_port), "expected one of", (str, int) ) if port not in [p.ContainerPort for p in aws_vpc_mappings]: aws_vpc_mappings.append( PortMapping( HostPort=NoValue, ContainerPort=port, Protocol=protocol.lower(), ) ) else: LOG.debug( f"{self.name} - Port {port} was already defined as ``ports``." " In awsvpc mode the Container Ports must be unique." f" Skipping {self.name}.expose.{expose_port}" )
[docs] def define_port_mappings(self) -> list: """ Define the list of port mappings to use for either AWS VPC deployments or else (bridge etc). Not in use atm as AWS VPC is made mandatory """ if self.container_definition: service_port_mappings = getattr(self.container_definition, "PortMappings") else: return [] for protocol, mappings in self.ingress_mappings.items(): for target_port, published_ports in mappings.items(): if published_ports: for port in published_ports: published_port, service_port = port service_port_mappings.append( PortMapping( ContainerPort=target_port, HostPort=If(USE_FARGATE_CON_T, NoValue, published_port), Protocol=protocol.lower(), Name=set_else_none( "name", service_port, f"{protocol.lower()}_{target_port}", ), AppProtocol=set_else_none( "app_protocol", target_port, NoValue ), ) ) else: service_port_mappings.append( PortMapping( ContainerPort=target_port, HostPort=NoValue, Protocol=protocol.lower(), Name=f"{protocol.lower()}_{target_port}", ) ) self.handle_expose_ports(service_port_mappings) return service_port_mappings
[docs] def import_docker_labels(self, definition: dict): """ Import the Docker labels if defined """ labels = {} if not keyisset("labels", definition): return labels else: if isinstance(definition["labels"], dict): self.docker_labels.update(definition["labels"]) elif isinstance(definition["labels"], list): for label in definition["labels"]: splits = label.split("=") self.docker_labels.update( {splits[0]: splits[1] if len(splits) == 2 else ""} )
[docs] def set_container_definition(self): """ Function to define the container definition matching the service definition """ secrets = [secret for secrets in self.secrets for secret in secrets.ecs_secret] self.container_definition = ContainerDefinition( Image=Ref(self.image.image_param), Name=self.name, Cpu=self.cpu_amount, Memory=self.memory_limit, MemoryReservation=self.memory_reservations, PortMappings=[], Environment=self.cfn_environment, LogConfiguration=NoValue, Command=self.command, EntryPoint=self.entrypoint, HealthCheck=self.ecs_healthcheck, DependsOn=NoValue, Essential=self.is_essential, Secrets=secrets, Ulimits=self.ulimits, LinuxParameters=If( USE_WINDOWS_OS_T, NoValue, LinuxParameters( Capabilities=self.kernel_properties, SharedMemorySize=self.shm_size, Tmpfs=self.tmpfs, ), ), Privileged=If( USE_FARGATE_CON_T, NoValue, If(USE_WINDOWS_OS_T, NoValue, keyisset("Privileged", self.definition)), ), WorkingDirectory=self.working_dir, DockerLabels=self.docker_labels, ReadonlyRootFilesystem=If( USE_WINDOWS_OS_T, NoValue, keyisset("read_only", self.definition) ), StopTimeout=self.stop_grace_period, SystemControls=self.sysctls, User=( If(USE_WINDOWS_OS_T, NoValue, self.ecs_user) if self.ecs_user != NoValue else self.ecs_user ), ) _to_add = [secret.env_var for secret in self.secrets] extend_container_envvars(self.container_definition, _to_add)
[docs] def set_user_group(self): """ Method to assign the user / group IDs for the container """ user_value = set_else_none("user", self.definition, alt_value=None) if isinstance(user_value, int): self.user = str(user_value) self.group = self.user elif isinstance(user_value, str): valid_pattern = re.compile( r"(^\d{1,5}$|(?P<user>^\d{1,5}):(?P<group>\d{1,5})$)" ) groups = valid_pattern.match(user_value) if not groups: raise ValueError("when using user:group, use the UID instead of name") if groups.group("user") and groups.group("group"): self.user = str(groups.group("user")) self.group = str(groups.group("group")) else: self.user = groups.groups()[0] self.group = self.user if self.user and self.group: self.user_group = f"{self.user}:{self.group}"
[docs] def set_x_credentials_secret(self, key): """ Method that will set the secret associated to the service to retrieve the docker image if defined through x-aws-pull_credentials """ if not keyisset(key, self.definition): return self.x_repo_credentials = self.definition[key]
[docs] def import_x_aws_settings(self): aws_keys = [ ("x-aws-autoscaling", dict, None), ("x-aws-pull_credentials", str, self.set_x_credentials_secret), ] for setting in aws_keys: if keyisset(setting[0], self.definition) and not isinstance( self.definition[setting[0]], setting[1] ): raise TypeError( f"{setting[0]} is of type", type(self.definition[setting[0]]), "Expected", setting[1], ) elif keyisset(setting[0], self.definition) and callable(setting[2]): setting[2](setting[0])
[docs] def composed_env_processing(self, settings: ComposeXSettings): env_vars: list[Environment] = getattr( self.container_definition, "Environment", [] ) for env_var in env_vars: if not isinstance(env_var, Environment) or not isinstance( env_var.Value, str ): continue sub_name, sub_params = sub_generate( env_var.Value, {}, settings, self.family ) env_var.Value = Sub(sub_name, **sub_params)