Source code for ecs_composex.efs.efs_ecs

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex.efs.efs_stack import Efs

from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import GetAtt, Ref
from troposphere.ecs import AuthorizationConfig, EFSVolumeConfiguration, Volume
from troposphere.efs import AccessPoint, CreationInfo, PosixUser, RootDirectory
from troposphere.iam import PolicyType

from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_parameters, add_resource
from ecs_composex.ecs.ecs_params import TASK_T
from ecs_composex.efs.efs_params import FS_ARN, FS_ID, FS_MNT_PT_SG_ID, FS_PORT
from ecs_composex.rds_resources_settings import (
    add_security_group_ingress,
    handle_new_tcp_resource,
)


[docs]def get_volumes(task_definition): """ Function to fetch the volumes from the task definition :param troposphere.ecs.TaskDefinition task_definition: :return: the volumes list of the task definition or new ones :rtype: list """ volumes = ( getattr(task_definition, "Volumes") if (hasattr(task_definition, "Volumes")) else [] ) if not volumes: setattr(task_definition, "Volumes", volumes) return volumes
[docs]def get_service_mount_points(service): """ Function to get the MountPoints from the container definition of a service :param ecs_composex.common.compose_services.ComposeService service: :return: list of mount points or new list :rtype: list """ mount_points = [] if not hasattr(service.container_definition, "MountPoints"): setattr(service.container_definition, "MountPoints", mount_points) else: mount_points = getattr(service.container_definition, "MountPoints") return mount_points
[docs]def add_task_iam_access_to_access_point(family, access_points, efs): """ Function to add IAM Permissions to mount to EFS via AccessPoint for ECS Task :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param list<troposphere.efs.AccessPoint> access_points: :param ecs_composex.efs.efs_stack.Efs efs: """ policy = PolicyType( f"{family.logical_name}IamAccess", PolicyName=f"IamAccessToEfs{efs.logical_name}", PolicyDocument={ "Version": "2012-10-17", "Statement": [ { "Sid": f"{family.logical_name}EfsAccess", "Effect": "Allow", "Action": [ "elasticfilesystem:ClientMount", "elasticfilesystem:ClientWrite", "elasticfilesystem:ClientRootAccess", ], "Resource": [ Ref(efs.attributes_outputs[FS_ARN]["ImportParameter"]) ], "Condition": { "StringEquals": { "elasticfilesystem:AccessPointArn": [ GetAtt(access_point, "Arn") for access_point in access_points ] } }, } ], }, Roles=[family.iam_manager.task_role.name], ) add_resource(family.template, policy)
[docs]def add_efs_definition_to_target_family(new_efs, target): add_parameters( target[0].template, [new_efs.attributes_outputs[FS_ARN]["ImportParameter"]], ) target[0].stack.Parameters.update( { new_efs.attributes_outputs[FS_ARN][ "ImportParameter" ].title: new_efs.attributes_outputs[FS_ARN]["ImportValue"] } )
[docs]def override_service_volume(new_efs, fs_id, target, access_points, volumes): """ Function to override service volume if a specific definition was set for it """ for service in target[2]: if not service.user: continue mount_points = get_service_mount_points(service) for count, mount_pt in enumerate(mount_points): if mount_pt.SourceVolume == new_efs.volume.volume_name: container_volume_name = ( f"{new_efs.volume.volume_name}{service.logical_name}" ) setattr( mount_pt, "SourceVolume", container_volume_name, ) sub_service_specific_access_point = AccessPoint( f"{new_efs.logical_name}{service.logical_name}ServiceEfsAccessPoint", FileSystemId=Ref(fs_id), PosixUser=PosixUser( Uid=service.user, Gid=service.group if service.group else service.user, ), RootDirectory=RootDirectory( CreationInfo=CreationInfo( OwnerUid=service.user, OwnerGid=service.group if service.group else service.user, Permissions=set_else_none( "RootDirectoryCreateMode", new_efs.parameters, "0775" ), ), Path=mount_pt.ContainerPath, ), ) add_resource(target[0].template, sub_service_specific_access_point) access_points.append(sub_service_specific_access_point) volumes.append( Volume( EFSVolumeConfiguration=EFSVolumeConfiguration( FilesystemId=Ref(fs_id), AuthorizationConfig=AuthorizationConfig( AccessPointId=Ref(sub_service_specific_access_point), IAM="ENABLED", ), ), Name=container_volume_name, ) )
[docs]def set_user_to_access_points(efs, fs_id, access_points, service): """ Function to set the PosixUser to a specific access point for a specific given service """ group_id = service.group if service.group else service.user mount_points = get_service_mount_points(service) for mount_pt in mount_points: if mount_pt.SourceVolume == efs.volume.volume_name: for access_point in access_points: setattr( access_point, "PosixUser", PosixUser(Uid=service.user, Gid=group_id), ) setattr( access_point, "RootDirectory", RootDirectory( CreationInfo=CreationInfo( OwnerUid=service.user, OwnerGid=group_id, Permissions=set_else_none( "RootDirectoryCreateMode", efs.parameters, "0775" ), ), Path=mount_pt.ContainerPath, ), ),
[docs]def override_efs_settings(new_efs, target, fs_id_parameter, access_points, volumes): """ Function to determine if access points should be set on a per service of the task definition and update the volumes and mount points accordingly :param ecs_composex.efs.efs_stack.Efs new_efs: :param tuple target: :param ecs_composex.common.cfn_params.Parameter fs_id_parameter: :param list access_points: :param list volumes: :return: """ if [service.user for service in target[2]] and len( [service.user for service in target[2]] ) > 1: override_service_volume( new_efs, fs_id_parameter, target, access_points, volumes ) elif [service.user for service in target[2]] and len( [service.user for service in target[2]] ) == 1: for service in target[2]: if service.user: set_user_to_access_points( new_efs, fs_id_parameter, access_points, service, )
[docs]def looked_up_efs_family_hook( efs: Efs, family: ComposeFamily, settings: ComposeXSettings ) -> None: sg_id = efs.add_attribute_to_another_stack(family.stack, FS_MNT_PT_SG_ID, settings) add_parameters(family.template, [sg_id["ImportParameter"]]) add_security_group_ingress( family.stack, efs.logical_name, Ref(sg_id["ImportParameter"]), 2049 ) family.stack.Parameters.update( {sg_id["ImportParameter"].title: sg_id["ImportValue"]} )
[docs]def expand_family_with_efs_volumes( efs_root_stack_title: str, efs: Efs, settings: ComposeXSettings ): """ Function to add the EFS Volume definition to the task definition for the service to use. """ fs_id_parameter = efs.attributes_outputs[FS_ID]["ImportParameter"] fs_id_getatt = efs.attributes_outputs[FS_ID]["ImportValue"] for target in efs.families_targets: family: ComposeFamily = target[0] if family.service_compute.launch_type == "EXTERNAL": LOG.warning( f"x-efs - {family.name} - When using EXTERNAL Launch Type, networking settings cannot be set." ) continue if efs.lookup: looked_up_efs_family_hook(efs, family, settings) access_points = [] family.stack.Parameters.update({fs_id_parameter.title: fs_id_getatt}) add_parameters(family.template, [fs_id_parameter]) task_definition = family.template.resources[TASK_T] efs_config_kwargs = {"FilesystemId": Ref(fs_id_parameter)} access_point_title: str = ( f"{efs.logical_name}{family.logical_name}EfsAccessPoint" ) efs_access_point = None if ( efs.parameters and keyisset("EnforceIamAuth", efs.parameters) or [service.user for service in target[2]] ): add_efs_definition_to_target_family(efs, target) efs_access_point = add_resource( family.template, AccessPoint( access_point_title, FileSystemId=Ref(fs_id_parameter), ), ) if not efs_access_point and access_point_title in family.template.resources: efs_access_point = family.template.resources[access_point_title] if efs_access_point not in access_points: access_points.append(efs_access_point) if efs_access_point: efs_config_kwargs.update( { "AuthorizationConfig": AuthorizationConfig( AccessPointId=Ref(efs_access_point), IAM="ENABLED" ), "TransitEncryption": "ENABLED", } ) efs_volume_definition = Volume( EFSVolumeConfiguration=EFSVolumeConfiguration(**efs_config_kwargs), Name=efs.volume.volume_name, ) volumes = get_volumes(task_definition) volumes.append(efs_volume_definition) override_efs_settings(efs, target, fs_id_parameter, access_points, volumes) add_task_iam_access_to_access_point(family, access_points, efs)
[docs]def efs_to_ecs(resources, services_stack, res_root_stack, settings): """Function to associate back the EFS FS to services.""" for resource_name, resource in resources.items(): LOG.info(f"{resource.module.res_key}.{resource_name} - Linking to services") if not resource.mappings and resource.cfn_resource: handle_new_tcp_resource( resource, port_parameter=FS_PORT, sg_parameter=FS_MNT_PT_SG_ID, settings=settings, ) expand_family_with_efs_volumes(res_root_stack.title, resource, settings) else: expand_family_with_efs_volumes(None, resource, settings)