# Source code for ecs_composex.compose.compose_volumes.ecs_family_helpers

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex.compose.compose_services import ComposeService

from compose_x_common.compose_x_common import keyisset, keypresent, set_else_none
from troposphere import If, NoValue
from troposphere.ecs import DockerVolumeConfiguration, Host, MountPoint, Volume

from ecs_composex.common import NONALPHANUM
from ecs_composex.ecs.ecs_conditions import USE_FARGATE_CON_T


def _unwrap_mount_point(candidate):
    """
    Return the MountPoint carried by ``candidate``.

    ``set_services_mount_points`` registers host mounts as
    ``If(USE_FARGATE_CON_T, NoValue, MountPoint(...))``. When ``candidate`` is such an
    ``If``, the MountPoint found among its branches is returned. Any other value is
    returned unchanged so the caller can validate it.
    """
    if isinstance(candidate, If):
        # NOTE(review): relies on troposphere keeping the If arguments in ``.data`` as
        # {"Fn::If": [condition, value_if_true, value_if_false]} - confirm on upgrade.
        data = getattr(candidate, "data", None)
        branches = data.get("Fn::If", []) if isinstance(data, dict) else []
        for branch in branches[1:]:
            if isinstance(branch, MountPoint):
                return branch
    return candidate


def mount_point_exists(
    mount_points: list[MountPoint | If],
    new_mount: MountPoint | If,
    family: ComposeFamily,
    service: ComposeService,
) -> bool:
    """
    Tell whether ``new_mount`` duplicates a mount point already present in ``mount_points``.

    Two mount points are duplicates when both their ContainerPath and SourceVolume match.
    Mount points wrapped in a troposphere ``If`` (as produced by
    ``set_services_mount_points`` for host mounts) are compared through the MountPoint
    they wrap, instead of being rejected.

    :param list mount_points: Mount points already registered (MountPoint or If-wrapped MountPoint)
    :param new_mount: The mount point about to be added (MountPoint or If-wrapped MountPoint)
    :param ComposeFamily family: Family the mount point is for, only used in the error raised
    :param ComposeService service: Service the mount point is added to, only used in the error raised
    :return: True when an equivalent mount point is already registered, False otherwise
    :rtype: bool
    :raises TypeError: when an existing entry is neither a MountPoint nor an If wrapping one
    """
    if not mount_points:
        # Nothing registered yet, so nothing can collide.
        return False
    candidate = _unwrap_mount_point(new_mount)
    for existing_mnt in mount_points:
        existing = _unwrap_mount_point(existing_mnt)
        if not isinstance(existing, MountPoint):
            raise TypeError(
                family.name,
                service.name,
                "mount point is not",
                MountPoint,
                "Got",
                existing_mnt,
                type(existing_mnt),
            )
        if (
            existing.ContainerPath == candidate.ContainerPath
            and existing.SourceVolume == candidate.SourceVolume
        ):
            if existing.ReadOnly and not candidate.ReadOnly:
                # The first definition wins: the new read-write mount is dropped
                # by the caller, so tell the user about it.
                print(
                    "Duplicate mounts but existing one is ReadOnly, not the new one. Continue",
                    existing_mnt.to_dict(),
                )
            return True
    return False
def set_services_mount_points(family):
    """
    Attach the mount points of every service of the family to its container definition.

    The ``MountPoints`` list of the container definition is created when missing and
    reused otherwise. For each volume of the service:

    * when ``volume["volume"]`` is set, a plain MountPoint sourced from the volume's
      ``volume_name`` is built;
    * otherwise the MountPoint uses the target path stripped through ``NONALPHANUM`` as
      source volume, and is wrapped in ``If(USE_FARGATE_CON_T, NoValue, ...)`` so it
      resolves to NoValue when the USE_FARGATE_CON_T condition holds.

    A mount point is appended only when ``mount_point_exists`` reports no equivalent one.

    :param family: The family whose services get their mount points set
    """
    for service in family.services:
        container = service.container_definition
        if hasattr(container, "MountPoints"):
            mount_points = getattr(container, "MountPoints")
        else:
            mount_points = []
            setattr(container, "MountPoints", mount_points)

        for volume in service.volumes:
            if keyisset("volume", volume):
                candidate = MountPoint(
                    ContainerPath=volume["target"],
                    ReadOnly=volume["read_only"],
                    SourceVolume=volume["volume"].volume_name,
                )
            else:
                host_mount = MountPoint(
                    ContainerPath=volume["target"],
                    ReadOnly=volume["read_only"],
                    SourceVolume=NONALPHANUM.sub("", volume["target"]),
                )
                candidate = If(USE_FARGATE_CON_T, NoValue, host_mount)

            if mount_point_exists(mount_points, candidate, family, service):
                continue
            mount_points.append(candidate)
def define_shared_volumes(family):
    """
    Collect the distinct volume objects used by the services of the family.

    Mounts without a ``volume`` set are ignored. The first time a volume object is
    met it is added to the result; every later occurrence marks that volume with
    ``is_shared = True``.

    :param family: The family whose services volumes are inspected
    :return: list of the distinct volumes within the task definition
    :rtype: list
    """
    seen_volumes = []
    for service in family.services:
        for mount in service.volumes:
            if not keyisset("volume", mount):
                continue
            declared = mount["volume"]
            if declared and declared not in seen_volumes:
                seen_volumes.append(declared)
                continue
            # Already collected from another mount: flag it as shared.
            declared.is_shared = True
    return seen_volumes
def define_host_volumes(family):
    """
    Go over all volumes of all services and return the ones that are host volumes.

    A service volume definition is a host volume when either:

    * it has a volume object whose ``type`` is ``"bind"``, or
    * it has no volume object and its ``source`` starts with ``/``.

    Definitions with neither a volume object nor an absolute source path are skipped.

    :param family: The family whose services volumes are inspected
    :return: list of volumes
    :rtype: list[dict]
    """
    host_volumes = []
    for service in family.services:
        for volume in service.volumes:
            v_volume = set_else_none("volume", volume)
            v_source = set_else_none("source", volume)
            if v_volume:
                is_host_volume = v_volume.type == "bind"
            else:
                # Without a volume object there is no ``type`` to read: the previous
                # ``A and B and C or v_volume.type == "bind"`` form dereferenced None here.
                is_host_volume = bool(v_source) and v_source.startswith("/")
            if is_host_volume:
                host_volumes.append(volume)
    return host_volumes
def set_volumes(family):
    """
    Build the ``Volumes`` of the family task definition, then set the services mount points.

    Steps:

    1. Collect the shared volumes (``define_shared_volumes``) and the host volumes
       (``define_host_volumes``).
    2. Create the ``Volumes`` list of the task definition when missing, reuse it otherwise.
    3. For every collected volume of type ``volume`` with the ``local`` driver, create its
       ``cfn_volume``; any volume holding a ``cfn_volume`` is appended to the task volumes.
    4. For every host volume, append an ``If(USE_FARGATE_CON_T, NoValue, Volume(...))``
       unless an equal entry is already in the list.
    5. Call ``set_services_mount_points`` for the family.

    :param family: The family to set the volumes for
    :return: None
    """
    shared_volumes = define_shared_volumes(family)
    host_volume_configs = define_host_volumes(family)

    task_definition = family.task_definition
    if hasattr(task_definition, "Volumes"):
        task_volumes = getattr(task_definition, "Volumes")
    else:
        task_volumes = []
        setattr(task_definition, "Volumes", task_volumes)

    for shared in shared_volumes:
        if shared.type == "volume" and shared.driver == "local":
            # Scope is fixed to "task" and Autoprovision is left unset,
            # regardless of the value of ``is_shared``.
            docker_config = If(
                USE_FARGATE_CON_T,
                NoValue,
                DockerVolumeConfiguration(Scope="task", Autoprovision=NoValue),
            )
            shared.cfn_volume = Volume(
                Host=NoValue,
                Name=shared.volume_name,
                DockerVolumeConfiguration=docker_config,
            )
        if shared.cfn_volume:
            task_volumes.append(shared.cfn_volume)

    for volume_config in host_volume_configs:
        declared = set_else_none("volume", volume_config)
        if declared and declared.driver == "local" and declared.type == "bind":
            # Bind volume declared with the local driver: the host path is its "device" option.
            # NOTE(review): named after ``declared.name`` whereas mount points reference
            # ``volume_name`` - confirm both always match.
            source_path = declared.driver_opts["device"]
            volume_name = declared.name
        else:
            source_path = volume_config["source"]
            volume_name = NONALPHANUM.sub("", volume_config["target"])

        cfn_volume = If(
            USE_FARGATE_CON_T,
            NoValue,
            Volume(
                Host=Host(SourcePath=source_path),
                DockerVolumeConfiguration=NoValue,
                Name=volume_name,
            ),
        )
        if cfn_volume not in task_volumes:
            task_volumes.append(cfn_volume)

    set_services_mount_points(family)