Source code for ecs_composex.secrets.compose_secrets

#  -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille <john@compose-x.io>

"""
Represent a service from the docker-compose services
"""

import re
from copy import deepcopy

from compose_x_common.compose_x_common import keyisset
from troposphere import AWS_ACCOUNT_ID, AWS_PARTITION, AWS_REGION, FindInMap, Sub
from troposphere.ecs import Secret as EcsSecret

from ecs_composex.common import LOG, NONALPHANUM
from ecs_composex.ecs.ecs_params import EXEC_ROLE_T, TASK_ROLE_T
from ecs_composex.kms.kms_params import KMS_KEY_ARN_RE
from ecs_composex.secrets.secrets_aws import lookup_secret_config
from ecs_composex.secrets.secrets_params import RES_KEY, XRES_KEY


[docs]def get_name_from_arn(secret_arn): secret_re = re.compile( r"(?:^arn:aws(?:-[a-z]+)?:secretsmanager:[\w-]+:[0-9]{12}:secret:)([\S]+)(?:-[A-Za-z0-9]{1,6})$" ) if not secret_re.match(secret_arn): raise ValueError( "The secret ARN is invalid", secret_arn, "No name cound be found from it via", r"(?:^arn:aws(?:-[a-z]+)?:secretsmanager:[\w-]+:[0-9]{12}:secret:)([\S]+)(?:-[A-Za-z0-9]{1,6})$", ) return secret_re.match(secret_arn).groups()[0]
[docs]def match_secrets_services_config(service, s_secret, secrets): """ Function to match the services and secrets :param service: :param s_secret: :param secrets: :return: """ if isinstance(s_secret, str): secret_name = s_secret elif isinstance(s_secret, dict) and keyisset("source", s_secret): secret_name = s_secret["source"] else: raise LookupError("Could not identify the secret source", s_secret) for gl_secret in secrets: if gl_secret.name == secret_name: LOG.info(f"Matched secret {gl_secret.name} with {service.name}") service.secrets.append(gl_secret) gl_secret.services.append(service)
[docs]def to_java_properties(name): """ Replaces `.` with `_` and set all cases to upper :param str name: :return: transformed test :rtype: str """ return name.upper().replace(".", "_")
[docs]def to_title(name): """ Function to title the name :param str name: :return: """ return name.title()
[docs]def to_capitalize(name): """ Function to capitalize/upper all letters and leave the rest empty :param name: :return: """ return name.upper()
[docs]def define_env_var_name(secret_key): """ Function to determine what the VarName key for secret will be :param dict secret_key: Key definition as defined in compose file :return: VarName value :rtype: str """ transforms = [ ("java_properties", to_java_properties), ("title", to_title), ("capitalize", to_capitalize), ] if keyisset("VarName", secret_key): return secret_key["VarName"] elif keyisset("Transform", secret_key) and secret_key["Transform"] in [ t[0] for t in transforms ]: for trans in transforms: if trans[0] == secret_key["Transform"] and trans[1]: return trans[1](secret_key["SecretKey"]) else: return secret_key["SecretKey"]
[docs]class ComposeSecret(object): """ Class to represent a Compose secret. """ x_key = XRES_KEY main_key = "secrets" map_kms_name = "KmsKeyId" map_arn_name = "Arn" map_name_name = "Name" json_keys_key = "JsonKeys" links_key = "LinksTo" map_name = RES_KEY allowed_keys = ["Name", "Lookup"] valid_keys = [json_keys_key, links_key] + allowed_keys def __init__(self, name, definition, settings): """ Method to init Secret for ECS ComposeX :param str name: :param dict definition: :param ecs_composex.common.settings.ComposeXSettings settings: """ self.services = [] if not any(key in definition[self.x_key].keys() for key in self.allowed_keys): raise KeyError( "You must define at least one of", self.allowed_keys, "Got", definition.keys(), ) elif not all(key in self.valid_keys for key in definition[self.x_key].keys()): raise KeyError( "Only valid keys are", self.valid_keys, "Got", definition[self.x_key].keys(), ) self.name = name self.logical_name = NONALPHANUM.sub("", self.name) self.definition = deepcopy(definition) self.links = [EXEC_ROLE_T] self.arn = None self.iam_arn = None self.aws_name = None self.kms_key = None self.kms_key_arn = None self.ecs_secret = [] self.mapping = {} if not keyisset("Lookup", self.definition[self.x_key]): self.define_names_from_import() else: self.define_names_from_lookup(settings.session) self.define_links() self.validate_links() if self.mapping: settings.secrets_mappings.update({self.logical_name: self.mapping}) self.add_json_keys()
[docs] def add_json_keys(self): """ Method to add secrets definitions based on JSON secret keys """ if not keyisset(self.json_keys_key, self.definition[self.x_key]): return required_keys = ["SecretKey"] allowed_keys = ["SecretKey", "VarName", "Transform"] unfiltered_secrets = self.definition[self.x_key][self.json_keys_key] LOG.debug(f"UN-FILTERED SECRETS {unfiltered_secrets}") filtered_secrets = [ dict(y) for y in set(tuple(x.items()) for x in unfiltered_secrets) ] LOG.debug(f"FILTERED SECRETS {filtered_secrets}") for secret_key in filtered_secrets: if not all(key in allowed_keys for key in secret_key): raise KeyError( "For Secrets JSON Key support, you must specify", required_keys, "Got", secret_key.keys(), ) json_key = secret_key["SecretKey"] secret_name = define_env_var_name(secret_key) if isinstance(self.arn, str): self.ecs_secret.append( EcsSecret(Name=secret_name, ValueFrom=f"{self.arn}:{json_key}::") ) elif isinstance(self.arn, Sub): self.ecs_secret.append( EcsSecret( Name=secret_name, ValueFrom=Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" f"secret:${{SecretName}}:{json_key}::", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name, ), ), ) ) elif isinstance(self.arn, FindInMap): self.ecs_secret.append( EcsSecret( Name=secret_name, ValueFrom=Sub( f"${{SecretArn}}:{json_key}::", SecretArn=FindInMap( self.map_name, self.logical_name, self.map_arn_name, ), ), ) )
[docs] def define_names_from_import(self): """ Method to define the names from docker-compose file content """ if not keyisset(self.map_name_name, self.definition[self.x_key]): raise KeyError( f"Missing {self.map_name_name} when doing non-lookup import for {self.name}" ) name_input = self.definition[self.x_key][self.map_name_name] if name_input.startswith("arn:"): self.aws_name = get_name_from_arn( self.definition[self.x_key][self.map_name_name] ) self.mapping = { self.map_arn_name: name_input, self.map_name_name: self.aws_name, } else: self.aws_name = name_input self.mapping = {self.map_name_name: self.aws_name} self.arn = Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" "secret:${SecretName}", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name ), ) self.iam_arn = Sub( f"arn:${{{AWS_PARTITION}}}:secretsmanager:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" "secret:${SecretName}*", SecretName=FindInMap( self.map_name, self.logical_name, self.map_name_name ), ) self.ecs_secret = [EcsSecret(Name=self.name, ValueFrom=self.arn)] if keyisset(self.map_kms_name, self.definition): if not self.definition[self.map_kms_name].startswith( "arn:" ) or not KMS_KEY_ARN_RE.match(self.definition[self.map_kms_name]): LOG.error( f"When specifying {self.map_kms_name} you must specify the full VALID ARN" ) else: self.mapping[self.map_kms_name] = self.definition[self.map_kms_name] self.kms_key_arn = FindInMap( self.map_name, self.logical_name, self.map_kms_name )
[docs] def define_names_from_lookup(self, session): """ Method to Lookup the secret based on its tags. :return: """ lookup_info = self.definition[self.x_key]["Lookup"] if keyisset("Name", self.definition[self.x_key]): lookup_info["Name"] = self.definition[self.x_key]["Name"] secret_config = lookup_secret_config(self.logical_name, lookup_info, session) self.aws_name = get_name_from_arn(secret_config[self.logical_name]) self.arn = secret_config[self.logical_name] self.iam_arn = secret_config[self.logical_name] if keyisset("KmsKeyId", secret_config) and not secret_config[ "KmsKeyId" ].startswith("alias"): self.kms_key = secret_config["KmsKeyId"] elif keyisset("KmsKeyId", secret_config) and secret_config[ "KmsKeyId" ].startswith("alias"): LOG.warning("The KMS Key retrieved is a KMS Key Alias, not importing.") self.mapping = { self.map_arn_name: secret_config[self.logical_name], self.map_name_name: secret_config[self.map_name_name], } if self.kms_key: self.mapping[self.map_kms_name] = self.kms_key self.kms_key_arn = FindInMap( self.map_name, self.logical_name, self.map_kms_name ) self.arn = FindInMap(self.map_name, self.logical_name, self.map_arn_name) self.ecs_secret = [EcsSecret(Name=self.name, ValueFrom=self.arn)]