# Source code for ecs_composex.s3.s3_ecs_cluster

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Functions to allow the S3 Bucket to interpolate x-s3 values for ECS Cluster
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.s3.s3_bucket import Bucket
    from ecs_composex.ecs_cluster import EcsCluster
    from troposphere.ecs import ExecuteCommandLogConfiguration
    from ecs_composex.common.settings import ComposeXSettings

from troposphere import GetAtt

from ecs_composex.s3.s3_params import S3_BUCKET_NAME


def interpolate_s3_bucket_name(
    log_config: ExecuteCommandLogConfiguration, bucket: Bucket
) -> None:
    """
    Set ``S3BucketName`` on the log configuration from the bucket's outputs.

    The bucket name entry is read from ``bucket.attributes_outputs`` using the
    ``S3_BUCKET_NAME`` key. Which value is written depends on the bucket:

    * ``bucket.cfn_resource`` is truthy: a ``GetAtt`` on the bucket stack title,
      targeting ``Outputs.<title of the entry's ImportParameter>``.
    * otherwise, ``bucket.mappings`` is truthy: the entry's ``ImportValue``.
    * neither: ``log_config`` is left untouched.

    :param log_config: Log configuration object whose ``S3BucketName`` is replaced.
    :param bucket: Bucket providing the outputs, stack and mappings used above.
    """
    # Looked up before any branching, exactly like the lookup order callers rely on.
    name_output = bucket.attributes_outputs[S3_BUCKET_NAME]

    if bucket.cfn_resource:
        stack_title = bucket.stack.title
        output_path = f"Outputs.{name_output['ImportParameter'].title}"
        resolved_name = GetAtt(stack_title, output_path)
    elif bucket.mappings:
        resolved_name = name_output["ImportValue"]
    else:
        # Nothing to interpolate with: keep the existing value as-is.
        return

    log_config.S3BucketName = resolved_name
def update_cluster_s3_property(ecs_cluster: EcsCluster, bucket: Bucket) -> None:
    """
    Interpolate the cluster's ExecuteCommand log ``S3BucketName`` if it targets ``bucket``.

    The function walks ``Configuration`` -> ``ExecuteCommandConfiguration`` ->
    ``LogConfiguration`` on ``ecs_cluster.cfn_resource`` and returns silently as soon
    as one level is missing. The ``S3BucketName`` found there is only replaced when it
    is a plain string of the form ``<bucket.module.res_key>::<bucket.name>``.

    A ``S3BucketName`` that is not a ``str`` (for instance a value that was already
    replaced by an intrinsic function) is left untouched instead of raising
    ``AttributeError`` on ``.startswith``.

    :param ecs_cluster: Cluster whose ``cfn_resource`` is inspected and possibly updated.
    :param bucket: Bucket candidate; matched on ``module.res_key`` prefix and ``name``.
    """
    cluster_resource = ecs_cluster.cfn_resource
    if not cluster_resource:
        return

    # Descend one nested property at a time; a missing level means there is
    # no S3 logging configured and therefore nothing to interpolate.
    log_configuration = cluster_resource
    for property_name in (
        "Configuration",
        "ExecuteCommandConfiguration",
        "LogConfiguration",
    ):
        if not hasattr(log_configuration, property_name):
            return
        log_configuration = getattr(log_configuration, property_name)

    if not hasattr(log_configuration, "S3BucketName"):
        return
    bucket_name = getattr(log_configuration, "S3BucketName")

    # The type check must come before any str method is used on the value.
    if not isinstance(bucket_name, str):
        return
    if not bucket_name.startswith(f"{bucket.module.res_key}::"):
        return

    # Second "::"-separated field is the bucket name declared in the compose file.
    if bucket_name.split("::")[1] != bucket.name:
        return
    interpolate_s3_bucket_name(log_configuration, bucket)
def handle_ecs_cluster(settings: ComposeXSettings, bucket: Bucket) -> None:
    """
    Entry point for updating the ECS Cluster properties with the bucket.

    Does nothing unless ``settings.ecs_cluster`` is set and has a truthy
    ``cfn_resource``; in that case the work is delegated to
    ``update_cluster_s3_property``.

    :param settings: Execution settings holding the ``ecs_cluster`` to update.
    :param bucket: Bucket passed through to ``update_cluster_s3_property``.
    """
    cluster = settings.ecs_cluster
    if not cluster or not cluster.cfn_resource:
        return
    update_cluster_s3_property(cluster, bucket)