Source code for ecs_composex.compose.compose_secrets

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x

"""
Package to manage docker-compose secrets
"""

import re
from copy import deepcopy

from compose_x_common.aws.kms import KMS_KEY_ARN_RE
from compose_x_common.aws.secrets_manager import get_secret_name_from_arn
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import AWS_ACCOUNT_ID, AWS_PARTITION, AWS_REGION, FindInMap, Sub
from troposphere.ecs import Environment as EcsEnvVar
from troposphere.ecs import Secret as EcsSecret

from ecs_composex.common import NONALPHANUM
from ecs_composex.common.logging import LOG
from ecs_composex.ecs.ecs_params import EXEC_ROLE_T, TASK_ROLE_T
from ecs_composex.secrets.secrets_aws import lookup_secret_config
from ecs_composex.secrets.secrets_params import RES_KEY, XRES_KEY

from .helpers import define_env_var_name


[docs]class ComposeSecret: """ Class to represent a Compose secret. """ x_key = XRES_KEY main_key = "secrets" map_kms_name = "KmsKeyId" map_arn_name = "Arn" map_name_name = "Name" json_keys_key = "JsonKeys" links_key = "LinksTo" map_name = RES_KEY def __init__(self, name, definition, settings): """ Method to init Secret for ECS ComposeX :param str name: :param dict definition: :param ecs_composex.common.settings.ComposeXSettings settings: """ self.services = [] self.name = name self.logical_name = NONALPHANUM.sub("", self.name) self.definition = deepcopy(definition) self.links = [EXEC_ROLE_T, TASK_ROLE_T] self.arn = None self.iam_arn = None self.aws_name = None self.kms_key = None self.kms_key_arn = None self.ecs_secret = [] self.mapping = {} if not keyisset("Lookup", self.definition[self.x_key]): self.define_names_from_import() else: self.define_names_from_lookup(settings.session) self.define_links() if self.mapping: settings.secrets_mappings.update({self.logical_name: self.mapping}) self.add_json_keys() @property def env_var(self) -> EcsEnvVar: env_var_name = set_else_none( "VarName", set_else_none("x-secrets", self.definition, alt_value={}), alt_value=re.sub(r"\W+", "", self.name.replace("-", "_").upper()), ) return EcsEnvVar(Name=env_var_name, Value=self.arn)
[docs] def define_secret(self, secret_name, json_key) -> EcsSecret: if isinstance(self.arn, str): secret = EcsSecret(Name=secret_name, ValueFrom=f"{self.arn}:{json_key}::") elif isinstance(self.arn, Sub): secret = EcsSecret( Name=secret_name, ValueFrom=Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" f"secret:${{SecretName}}:{json_key}::", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name, ), ), ) elif isinstance(self.arn, FindInMap): secret = EcsSecret( Name=secret_name, ValueFrom=Sub( f"${{SecretArn}}:{json_key}::", SecretArn=FindInMap( self.map_name, self.logical_name, self.map_arn_name, ), ), ) else: raise TypeError( f"secrets.{self.name} - ARN is", type(self.arn), "must be one of", str, Sub, FindInMap, ) return secret
[docs] def add_json_keys(self): """ Add secrets definitions based on JSON secret keys """ if not keyisset(self.json_keys_key, self.definition[self.x_key]): return unfiltered_secrets = self.definition[self.x_key][self.json_keys_key] filtered_secrets = [ dict(y) for y in {tuple(x.items()) for x in unfiltered_secrets} ] old_secrets = deepcopy(self.ecs_secret) secrets_to_map = {} self.ecs_secret = [] for secret_json_key in filtered_secrets: secret_key = secret_json_key["SecretKey"] secret_name = define_env_var_name(secret_json_key) if secret_name not in secrets_to_map: secrets_to_map[secret_name] = self.define_secret( secret_name, secret_key ) else: LOG.warning( f"secrets.{self.name} - Secret VarName {secret_name} already defined. Overriding value" ) secrets_to_map[secret_name] = self.define_secret( secret_name, secret_key ) self.ecs_secret = [ _defined_secret for _defined_secret in secrets_to_map.values() ] if not self.ecs_secret: self.ecs_secret = old_secrets
[docs] def define_names_from_import(self): """ Define the names from docker-compose file content """ if not keyisset(self.map_name_name, self.definition[self.x_key]): raise KeyError( f"Missing {self.map_name_name} when doing non-lookup import for {self.name}" ) name_input = self.definition[self.x_key][self.map_name_name] if name_input.startswith("arn:"): self.aws_name = get_secret_name_from_arn( self.definition[self.x_key][self.map_name_name] ) self.mapping = { self.map_arn_name: name_input, self.map_name_name: self.aws_name, } else: self.aws_name = name_input self.mapping = {self.map_name_name: self.aws_name} self.arn = Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" "secret:${SecretName}", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name ), ) self.iam_arn = Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" "secret:${SecretName}*", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name ), ) self.ecs_secret = [EcsSecret(Name=self.name, ValueFrom=self.arn)] if keyisset(self.map_kms_name, self.definition): if not self.definition[self.map_kms_name].startswith( "arn:" ) or not KMS_KEY_ARN_RE.match(self.definition[self.map_kms_name]): LOG.error( f"secrets.{self.name} - When specifying {self.map_kms_name} you must specify the full ARN" ) else: self.mapping[self.map_kms_name] = self.definition[self.map_kms_name] self.kms_key_arn = FindInMap( self.map_name, self.logical_name, self.map_kms_name )
[docs] def define_names_from_lookup(self, session): """ Method to Lookup the secret based on its tags. """ lookup_info = self.definition[self.x_key]["Lookup"] if keyisset("Name", self.definition[self.x_key]): lookup_info["Name"] = self.definition[self.x_key]["Name"] secret_config = lookup_secret_config(self.logical_name, lookup_info, session) self.aws_name = get_secret_name_from_arn(secret_config[self.logical_name]) self.arn = secret_config[self.logical_name] self.iam_arn = secret_config[self.logical_name] if keyisset("KmsKeyId", secret_config) and not secret_config[ "KmsKeyId" ].startswith("alias"): self.kms_key = secret_config["KmsKeyId"] elif keyisset("KmsKeyId", secret_config) and secret_config[ "KmsKeyId" ].startswith("alias"): LOG.warning( f"secrets.{self.name} - The KMS Key retrieved is a KMS Key Alias, not importing." ) self.mapping = { self.map_arn_name: secret_config[self.logical_name], self.map_name_name: secret_config[self.map_name_name], } if self.kms_key: self.mapping[self.map_kms_name] = self.kms_key self.kms_key_arn = FindInMap( self.map_name, self.logical_name, self.map_kms_name ) self.arn = FindInMap(self.map_name, self.logical_name, self.map_arn_name) self.ecs_secret = [EcsSecret(Name=self.name, ValueFrom=self.arn)]