# -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille <john@compose-x.io>
import re
from botocore.exceptions import ClientError
from compose_x_common.aws import get_assume_role_session
from compose_x_common.aws.ecs import (
CLUSTER_NAME_FROM_ARN,
describe_all_ecs_clusters_from_ccapi,
list_all_ecs_clusters,
)
from compose_x_common.compose_x_common import keyisset
from troposphere import (
AWS_ACCOUNT_ID,
AWS_NO_VALUE,
AWS_PARTITION,
AWS_REGION,
AWS_STACK_NAME,
AWS_URL_SUFFIX,
FindInMap,
GetAtt,
Ref,
Sub,
)
from troposphere.ecs import (
CapacityProviderStrategyItem,
Cluster,
ClusterConfiguration,
ExecuteCommandConfiguration,
ExecuteCommandLogConfiguration,
)
from troposphere.logs import LogGroup
from ecs_composex.common import LOG
from ecs_composex.common.services_helpers import get_closest_valid_log_retention_period
from ecs_composex.ecs import metadata
from ecs_composex.ecs.ecs_params import CLUSTER_NAME, CLUSTER_T
from ecs_composex.kms.kms_stack import KmsKey
from ecs_composex.resources_import import import_record_properties
from ecs_composex.s3.s3_stack import Bucket
from ecs_composex.s3.s3_template import evaluate_parameters, generate_bucket
# Top-level compose key holding the cluster definition.
RES_KEY = "x-cluster"
# Names of the two Fargate capacity providers referenced throughout this module.
FARGATE_PROVIDER = "FARGATE"
FARGATE_SPOT_PROVIDER = "FARGATE_SPOT"
# Capacity providers attached to the cluster created when no x-cluster is defined.
DEFAULT_PROVIDERS = [FARGATE_PROVIDER, FARGATE_SPOT_PROVIDER]
# Default capacity provider strategy of that cluster: FARGATE_SPOT is given
# Base=1 and Weight=2, FARGATE is given Weight=1.
DEFAULT_STRATEGY = [
CapacityProviderStrategyItem(
Weight=2, Base=1, CapacityProvider=FARGATE_SPOT_PROVIDER
),
CapacityProviderStrategyItem(Weight=1, CapacityProvider=FARGATE_PROVIDER),
]
def get_kms_key_config(cluster_name, allow_kms_reuse=False):
    """
    Placeholder: currently performs no work and always returns None.

    :param cluster_name: Unused.
    :param bool allow_kms_reuse: Unused.
    :return: None
    """
    return None
class EcsCluster(object):
    """
    Class to make it easier to manipulate the ECS Cluster to use and its various properties

    The cluster is either created with default settings (no definition),
    referenced by name (``Use``), discovered through the AWS API (``Lookup``)
    or created from user supplied ``Properties`` / ``MacroParameters``.
    """

    mappings_key = "ecs"
    res_key = "x-cluster"

    def __init__(self, root_stack, definition=None, **kwargs):
        """
        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param dict definition: The x-cluster definition. When None, a default
            cluster is immediately added to the root stack template.
        """
        self.cfn_resource = None
        self.mappings = {}
        self.log_group = None
        self.log_bucket = None
        self.log_prefix = None
        self.log_key = None
        self.stack = None
        self.template = None
        self.platform_override = None
        self.capacity_providers = []
        self.default_strategy_providers = []
        self.cluster_identifier = Ref(AWS_STACK_NAME)
        # Always initialise the definition-derived attributes so that
        # set_from_definition() is a safe no-op on a default cluster.
        self.definition = definition
        self.use = {}
        self.lookup = {}
        self.properties = {}
        self.parameters = {}
        if definition is None:
            self.set_default_cluster_config(root_stack)
            return
        for attribute, section in (
            ("use", "Use"),
            ("lookup", "Lookup"),
            ("properties", "Properties"),
            ("parameters", "MacroParameters"),
        ):
            if keyisset(section, definition):
                setattr(self, attribute, definition[section])

    def set_from_definition(self, root_stack, session, settings):
        """
        Configure the cluster from its definition: Use takes precedence over
        Lookup, which takes precedence over Properties.

        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param boto3.session.Session session: Session used for Lookup API calls.
        :param ecs_composex.common.settings.ComposeXSettings settings:
        """
        if self.definition and self.use:
            self.mappings = {CLUSTER_NAME.title: {"Name": self.use}}
            root_stack.stack_template.add_mapping(self.mappings_key, self.mappings)
            self.cluster_identifier = FindInMap(
                self.mappings_key, CLUSTER_NAME.title, "Name"
            )
        elif self.lookup:
            self.lookup_cluster(session)
            root_stack.stack_template.add_mapping(self.mappings_key, self.mappings)
        elif self.properties:
            self.define_cluster(root_stack, settings)

    def set_default_cluster_config(self, root_stack):
        """
        Adds the default ECS Cluster and its ECS Execute log group to the root
        stack template, and points the cluster identifier to the new cluster.

        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        """
        # NOTE(review): this log group name has no leading "/" whereas
        # set_log_group() uses "/ecs/execute-logs/...". Left untouched because
        # renaming would replace the log group of already deployed stacks.
        self.log_group = LogGroup(
            "EcsExecLogGroup",
            LogGroupName=Sub(f"ecs/execute-logs/${{{AWS_STACK_NAME}}}"),
            RetentionInDays=120,
        )
        self.cfn_resource = Cluster(
            CLUSTER_T,
            ClusterName=Ref(AWS_STACK_NAME),
            CapacityProviders=DEFAULT_PROVIDERS,
            DefaultCapacityProviderStrategy=DEFAULT_STRATEGY,
            Configuration=ClusterConfiguration(
                ExecuteCommandConfiguration=ExecuteCommandConfiguration(
                    Logging="DEFAULT",
                    LogConfiguration=ExecuteCommandLogConfiguration(
                        CloudWatchLogGroupName=Ref(self.log_group),
                    ),
                )
            ),
            Metadata=metadata,
        )
        root_stack.stack_template.add_resource(self.log_group)
        root_stack.stack_template.add_resource(self.cfn_resource)
        # Copy so that later mutation of the instance list cannot alter the
        # module-level defaults shared by every cluster.
        self.capacity_providers = list(DEFAULT_PROVIDERS)
        self.cluster_identifier = Ref(self.cfn_resource)

    def import_log_config(self, exec_config):
        """
        Sets the properties for bucket and cw log group to use for ECS Execute

        :param dict exec_config: The ExecuteCommandConfiguration of the cluster.
        :return:
        """
        if not keyisset("LogConfiguration", exec_config):
            return
        log_config = exec_config["LogConfiguration"]
        # Tolerate being called before the cluster mapping entry was created.
        cluster_mapping = self.mappings.setdefault(CLUSTER_NAME.title, {})
        if keyisset("CloudWatchLogGroupName", log_config):
            cluster_mapping["CloudWatchLogGroupName"] = log_config[
                "CloudWatchLogGroupName"
            ]
            self.log_group = FindInMap(
                self.mappings_key,
                CLUSTER_NAME.title,
                "CloudWatchLogGroupName",
            )
        if keyisset("S3BucketName", log_config):
            cluster_mapping["S3BucketName"] = log_config["S3BucketName"]
            self.log_bucket = FindInMap(
                self.mappings_key, CLUSTER_NAME.title, "S3BucketName"
            )

    def set_cluster_mappings(self, cluster_api_def):
        """
        From the API info on the cluster, evaluate whether config is needed to enable
        ECS Execution

        :param dict cluster_api_def:
        """
        if not keyisset("Configuration", cluster_api_def):
            return
        config = cluster_api_def["Configuration"]
        if not keyisset("ExecuteCommandConfiguration", config):
            return
        exec_config = config["ExecuteCommandConfiguration"]
        if keyisset("KmsKeyId", exec_config):
            self.mappings.setdefault(CLUSTER_NAME.title, {})["KmsKeyId"] = exec_config[
                "KmsKeyId"
            ]
            self.log_key = FindInMap(self.mappings_key, CLUSTER_NAME.title, "KmsKeyId")
        self.import_log_config(exec_config)

    def lookup_cluster(self, session):
        """
        Define the ECS Cluster properties and definitions from ECS API.
        Populates the mappings, capacity providers, platform override and
        cluster identifier of this object; nothing is returned.

        :param boto3.session.Session session: Boto3 session to make API calls.
        :raises TypeError: When Lookup is neither a string nor a dict.
        :raises KeyError: When Lookup is a dict without ClusterName.
        :raises LookupError: When the cluster cannot be found.
        :raises botocore.exceptions.ClientError: On API failure (logged first).
        """
        if not isinstance(self.lookup, (str, dict)):
            raise TypeError(
                "The value for Lookup must be", str, dict, "Got", type(self.lookup)
            )
        ecs_session = session
        if isinstance(self.lookup, dict):
            if "ClusterName" not in self.lookup:
                raise KeyError(
                    f"{self.res_key}.Lookup must define ClusterName when set as a mapping"
                )
            if keyisset("RoleArn", self.lookup):
                ecs_session = get_assume_role_session(
                    session,
                    self.lookup["RoleArn"],
                    session_name="EcsClusterLookup@ComposeX",
                )
            cluster_name = self.lookup["ClusterName"]
        else:
            cluster_name = self.lookup
        try:
            clusters = list_all_ecs_clusters(session=ecs_session)
            clusters_config = describe_all_ecs_clusters_from_ccapi(
                clusters, return_as_map=True, use_cluster_name=True, session=ecs_session
            )
        except ClientError as error:
            LOG.error(error)
            raise
        if cluster_name not in clusters_config:
            # Names are only needed for the error message; an ARN that does not
            # match the expected format is reported as-is instead of crashing.
            cluster_names = []
            for cluster_arn in clusters:
                arn_match = CLUSTER_NAME_FROM_ARN.match(cluster_arn)
                cluster_names.append(
                    arn_match.group("name") if arn_match else cluster_arn
                )
            raise LookupError(
                f"Failed to find {cluster_name}. Available clusters are",
                cluster_names,
            )
        the_cluster = clusters_config[cluster_name]
        LOG.info(
            f"Found ECS Cluster {cluster_name}. Setting {CLUSTER_NAME.title} accordingly."
        )
        self.mappings = {CLUSTER_NAME.title: {"Name": the_cluster["ClusterName"]}}
        self.set_cluster_mappings(the_cluster)
        self.capacity_providers = evaluate_capacity_providers(the_cluster)
        if self.capacity_providers:
            self.default_strategy_providers = get_default_capacity_strategy(
                the_cluster
            )
        # Evaluated even without capacity providers so that the EC2 override
        # announced by the warnings is actually applied.
        self.platform_override = evaluate_fargate_is_set(
            self.capacity_providers, the_cluster
        )
        self.cluster_identifier = FindInMap(
            self.mappings_key, CLUSTER_NAME.title, "Name"
        )

    def _kms_key_policy_statements(self, cluster_name):
        """
        Builds the KMS key policy statements for the ECS Execute encryption key.

        :param str cluster_name:
        :return: The list of policy statements
        :rtype: list[dict]
        """
        action = [
            "kms:Encrypt*",
            "kms:Decrypt*",
            "kms:ReEncrypt*",
            "kms:GenerateDataKey*",
            "kms:Describe*",
        ]
        statement = [
            {
                "Sid": "Allow direct access to key metadata to the account",
                "Effect": "Allow",
                "Principal": {
                    "AWS": Sub(
                        f"arn:${{{AWS_PARTITION}}}:iam::${{{AWS_ACCOUNT_ID}}}:root"
                    )
                },
                "Action": ["kms:*"],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {"kms:CallerAccount": Ref(AWS_ACCOUNT_ID)}
                },
            },
            {
                "Sid": "Allows SSM to use the KMS key to encrypt/decrypt messages",
                "Effect": "Allow",
                "Principal": {"Service": Sub(f"ssm.${{{AWS_URL_SUFFIX}}}")},
                "Action": action,
                "Resource": "*",
            },
        ]
        allow_reuse = keyisset("AllowKmsKeyReuse", self.parameters)
        if keyisset("CreateExecLoggingLogGroup", self.parameters) or allow_reuse:
            # With key reuse, any log group of the account/region may use the
            # key; otherwise only the cluster's ECS Execute log group can.
            log_groups_arn = (
                f"arn:${{{AWS_PARTITION}}}:logs:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:"
            )
            if allow_reuse:
                log_groups_arn += "log-group:*"
            else:
                log_groups_arn += f"log-group:/ecs/execute-logs/{cluster_name}"
            statement.append(
                {
                    "Sid": "Allow aws logs to encrypt decrypt messages",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": Sub(f"logs.${{{AWS_REGION}}}.${{{AWS_URL_SUFFIX}}}")
                    },
                    "Action": action,
                    "Resource": "*",
                    "Condition": {
                        "ArnLike": {
                            "kms:EncryptionContext:aws:logs:arn": Sub(log_groups_arn)
                        }
                    },
                }
            )
        return statement

    def set_kms_key(
        self, cluster_name, root_stack, settings, log_settings, log_configuration
    ):
        """
        Defines the KMS Key created to encrypt ECS Execute commands

        :param str cluster_name:
        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param ecs_composex.common.settings.ComposeXSettings settings:
        :param dict log_settings: Updated in place with KmsKeyId.
        :param dict log_configuration: Updated in place to enable CloudWatch encryption.
        """
        key_config = {
            "Properties": {
                # Property name as defined by AWS::KMS::Key.
                "EnableKeyRotation": True,
                "Enabled": True,
                "Description": Sub(
                    f"ECS Cluster {cluster_name} execute logging encryption key"
                ),
                "KeyPolicy": {
                    "Version": "2012-10-17",
                    "Id": "ecscluster-logging",
                    "Statement": self._kms_key_policy_statements(cluster_name),
                },
            },
            "Settings": {"Alias": Sub(f"alias/ecs/execute-logs/{cluster_name}")},
        }
        self.log_key = KmsKey(
            "ECSClusterLoggingKmsKey", key_config, "cluster", settings
        )
        self.log_key.stack = root_stack
        self.log_key.define_kms_key()
        root_stack.stack_template.add_resource(self.log_key.cfn_resource)
        log_settings["KmsKeyId"] = GetAtt(self.log_key.cfn_resource, "Arn")
        log_configuration["CloudWatchEncryptionEnabled"] = True

    def set_log_group(self, cluster_name, root_stack, log_configuration):
        """
        Defines the CloudWatch log group used to log ECS Execute commands. It
        is encrypted with the cluster KMS key when that key was created.

        :param str cluster_name:
        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param dict log_configuration: Updated in place with the log group settings.
        """
        encrypted = isinstance(self.log_key, KmsKey)
        if keyisset("LogGroupRetentionInDays", self.parameters):
            retention = get_closest_valid_log_retention_period(
                self.parameters["LogGroupRetentionInDays"]
            )
        else:
            retention = 120
        self.log_group = LogGroup(
            "EcsExecLogGroup",
            LogGroupName=Sub(f"/ecs/execute-logs/{cluster_name}"),
            RetentionInDays=retention,
            KmsKeyId=GetAtt(self.log_key.cfn_resource, "Arn")
            if encrypted
            else Ref(AWS_NO_VALUE),
            DependsOn=[self.log_key.cfn_resource.title] if encrypted else [],
        )
        root_stack.stack_template.add_resource(self.log_group)
        log_configuration["CloudWatchLogGroupName"] = Ref(self.log_group)
        if encrypted:
            log_configuration["CloudWatchEncryptionEnabled"] = True

    def set_log_bucket(self, cluster_name, root_stack, settings, log_configuration):
        """
        Defines the S3 bucket and settings to log ECS Execution commands

        :param str cluster_name:
        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param ecs_composex.common.settings.ComposeXSettings settings:
        :param dict log_configuration: Updated in place with the bucket settings.
        :return:
        """
        bucket_config = {
            "Properties": {
                "AccessControl": "BucketOwnerFullControl",
                "PublicAccessBlockConfiguration": {
                    "BlockPublicAcls": True,
                    "BlockPublicPolicy": True,
                    "IgnorePublicAcls": True,
                    "RestrictPublicBuckets": True,
                },
            },
            "MacroParameters": {
                "ExpandRegionToBucket": True,
                "ExpandAccountIdToBucket": True,
                "BucketPolicy": {
                    "PredefinedBucketPolicies": ["enforceSecureConnection"]
                },
            },
        }
        if isinstance(self.log_key, KmsKey):
            # Encrypt the bucket with the key created for ECS Execute logging.
            bucket_config["Properties"]["BucketEncryption"] = {
                "ServerSideEncryptionConfiguration": [
                    {
                        "BucketKeyEnabled": True,
                        "ServerSideEncryptionByDefault": {
                            "SSEAlgorithm": "aws:kms",
                            "KMSMasterKeyID": GetAtt(self.log_key.cfn_resource, "Arn"),
                        },
                    }
                ]
            }
        self.log_bucket = Bucket(
            "ECSClusterLoggingBucket",
            bucket_config,
            "cluster",
            settings,
        )
        self.log_bucket.stack = root_stack
        generate_bucket(self.log_bucket)
        evaluate_parameters(self.log_bucket, root_stack.stack_template)
        root_stack.stack_template.add_resource(self.log_bucket.cfn_resource)
        log_configuration["S3BucketName"] = Ref(self.log_bucket.cfn_resource)
        log_configuration["S3KeyPrefix"] = Sub(f"ecs/execute-logs/{cluster_name}/")
        log_configuration["S3EncryptionEnabled"] = True

    def update_props_from_parameters(self, root_stack, settings):
        """
        Adapt cluster config to settings: creates the KMS key, log group and
        bucket requested via MacroParameters and wires them into the cluster
        ExecuteCommandConfiguration.

        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param ecs_composex.common.settings.ComposeXSettings settings:
        """
        # An intrinsic function cannot be embedded in the resource names, so
        # the stack name placeholder is used in its place.
        if isinstance(self.cfn_resource.ClusterName, (Ref, Sub, FindInMap)):
            cluster_name = f"${{{AWS_STACK_NAME}}}"
        else:
            cluster_name = self.cfn_resource.ClusterName
        self.log_group = Ref(AWS_NO_VALUE)
        self.log_bucket = Ref(AWS_NO_VALUE)
        log_settings = {}
        log_configuration = {}
        # The key must be created first: log group and bucket encrypt with it.
        if keyisset("CreateExecLoggingKmsKey", self.parameters):
            self.set_kms_key(
                cluster_name, root_stack, settings, log_settings, log_configuration
            )
        if keyisset("CreateExecLoggingLogGroup", self.parameters):
            self.set_log_group(cluster_name, root_stack, log_configuration)
        if keyisset("CreateExecLoggingBucket", self.parameters):
            self.set_log_bucket(cluster_name, root_stack, settings, log_configuration)
        log_settings["LogConfiguration"] = ExecuteCommandLogConfiguration(
            **log_configuration
        )
        log_settings["Logging"] = "OVERRIDE"
        configuration = ClusterConfiguration(
            ExecuteCommandConfiguration=ExecuteCommandConfiguration(**log_settings)
        )
        if hasattr(self.cfn_resource, "Configuration"):
            # The user supplied Configuration wins; make it visible that the
            # generated settings are not applied instead of dropping them silently.
            LOG.warning(
                f"{self.res_key} - Properties already define Configuration. "
                "ECS Execute logging settings generated from MacroParameters "
                "are not applied to the cluster."
            )
        else:
            setattr(self.cfn_resource, "Configuration", configuration)

    def define_cluster(self, root_stack, settings):
        """
        Function to create the cluster from provided properties.

        :param ecs_composex.common.stacks.ComposeXStack root_stack:
        :param ecs_composex.common.settings.ComposeXSettings settings:
        :raises KeyError: When DefaultCapacityProviderStrategy is set without
            CapacityProviders.
        """
        props = import_record_properties(self.properties, Cluster)
        props["Metadata"] = metadata
        if not keyisset("ClusterName", props):
            props["ClusterName"] = Ref(AWS_STACK_NAME)
        if keyisset("DefaultCapacityProviderStrategy", props) and not keyisset(
            "CapacityProviders", props
        ):
            raise KeyError(
                "When specifying DefaultCapacityProviderStrategy"
                " you must specify CapacityProviders"
            )
        self.cfn_resource = Cluster(CLUSTER_T, **props)
        root_stack.stack_template.add_resource(self.cfn_resource)
        if self.parameters:
            self.update_props_from_parameters(root_stack, settings)
        self.cluster_identifier = Ref(self.cfn_resource)
def evaluate_fargate_is_set(providers, cluster_def):
    """
    Evaluate if FARGATE or FARGATE_SPOT is defined in the cluster

    :param list[str] providers: Capacity providers set on the cluster.
    :param dict cluster_def: Cluster description; ClusterName is used for logging.
    :return: "EC2" when neither FARGATE nor FARGATE_SPOT is available, else None
    :rtype: str or None
    """
    if FARGATE_PROVIDER in providers or FARGATE_SPOT_PROVIDER in providers:
        return None
    LOG.warning(
        f"{cluster_def['ClusterName']} - "
        f"No {FARGATE_PROVIDER} nor {FARGATE_SPOT_PROVIDER} listed in Capacity Providers. "
        "Overriding to EC2 Launch Type"
    )
    return "EC2"
def evaluate_capacity_providers(cluster_def):
    """
    When using Looked'Up cluster, if there is no Fargate Capacity Provider, defined on cluster,
    rollback to EC2 mode.

    :param dict cluster_def: Cluster description; ClusterName is used for logging.
    :return: List of capacity providers set on the ECS Cluster (empty when none).
    :rtype: list
    """
    providers = []
    if keyisset("CapacityProviders", cluster_def):
        providers = cluster_def["CapacityProviders"]
    if not providers:
        LOG.warning(
            f"{cluster_def['ClusterName']} - No capacityProvider defined. "
            "Fallback to ECS Default. Overriding to EC2"
        )
    return providers
def get_default_capacity_strategy(cluster_def):
    """
    List the capacity provider names of the cluster default strategy.

    :param dict cluster_def: Cluster description.
    :return: The CapacityProvider of each DefaultCapacityProviderStrategy
        item, or an empty list when no default strategy is set.
    :rtype: list
    """
    if not keyisset("DefaultCapacityProviderStrategy", cluster_def):
        return []
    return [
        strategy_item["CapacityProvider"]
        for strategy_item in cluster_def["DefaultCapacityProviderStrategy"]
    ]
def import_from_x_aws_cluster(compose_content):
    """
    Translate x-aws-cluster, when set, into an x-cluster ``Use`` definition.
    The value is either a cluster name or a cluster ARN, in which case the
    ARN prefix is stripped to keep only the name. Any existing x-cluster
    definition is replaced.

    :param dict compose_content: The compose content, updated in place.
    :return:
    """
    x_aws_key = "x-aws-cluster"
    if not keyisset(x_aws_key, compose_content):
        return
    cluster_name = compose_content[x_aws_key]
    if compose_content[x_aws_key].startswith("arn:aws"):
        # Drop everything up to and including "cluster/" from the ARN.
        cluster_name = re.sub(
            r"(arn:aws(?:-[a-z]+)?:ecs:[\S]+:[0-9]{12}:cluster/)",
            "",
            compose_content[x_aws_key],
        )
    compose_content[RES_KEY] = {"Use": cluster_name}
def add_ecs_cluster(root_stack, settings):
    """
    Function to create the ECS Cluster and store it as ``settings.ecs_cluster``.
    x-aws-cluster, when set, overrides x-cluster. Without any cluster
    definition, a new default cluster is created.

    :param ecs_composex.common.stacks.ComposeXStack root_stack:
    :param ecs_composex.common.settings.ComposeXSettings settings:
    :raises LookupError: When x-cluster is set but is not a mapping.
    """
    if keyisset("x-aws-cluster", settings.compose_content):
        import_from_x_aws_cluster(settings.compose_content)
        LOG.info("x-aws-cluster was set. Overriding any defined x-cluster settings")
    if not keyisset(EcsCluster.res_key, settings.compose_content):
        LOG.info("No cluster information provided. Creating a new one")
        cluster = EcsCluster(root_stack)
    else:
        if not isinstance(settings.compose_content[RES_KEY], dict):
            raise LookupError("Unable to determine what to do for x-cluster")
        cluster = EcsCluster(root_stack, settings.compose_content[EcsCluster.res_key])
        cluster.set_from_definition(root_stack, settings.session, settings)
    settings.ecs_cluster = cluster