Source code for ecs_composex.compose.compose_secrets.ecs_family_helpers

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from troposphere.ecs import RepositoryCredentials
from troposphere.iam import PolicyType

import ecs_composex.common.troposphere_tools


[docs]def identify_repo_credentials_secret(settings, task, secret_name): """ Function to identify the secret_arn :param settings: :param ComposeFamily task: :param secret_name: :return: """ for secret in settings.secrets: if secret.name == secret_name: secret_arn = secret.arn if secret_name not in [s.name for s in settings.secrets]: raise KeyError( f"secret {secret_name} was not found in the defined secrets", [s.name for s in settings.secrets], ) if ( secret.kms_key_arn and task.template and "RepositoryCredsKmsKeyAccess" not in task.template.resources ): task.template.add_resource( PolicyType( "RepositoryCredsKmsKeyAccess", PolicyName="RepositoryCredsKmsKeyAccess", PolicyDocument={ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["kms:Decrypt"], "Resource": [secret.kms_key_arn], } ], }, Roles=[task.exec_role.name], ) ) return secret_arn return None
[docs]def set_repository_credentials(family, settings): """ Method to go over each service and identify which ones have credentials to pull the Docker image from a private repository :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param ecs_composex.common.settings.ComposeXSettings settings: :return: """ for service in family.services: if not service.x_repo_credentials: continue if service.x_repo_credentials.startswith("arn:aws"): secret_arn = service.x_repo_credentials elif service.x_repo_credentials.startswith("secrets::"): secret_name = service.x_repo_credentials.split("::")[-1] secret_arn = identify_repo_credentials_secret(settings, family, secret_name) else: raise ValueError( "The secret for private repository must be either an ARN or the name of a secret defined in secrets" ) setattr( service.container_definition, "RepositoryCredentials", RepositoryCredentials(CredentialsParameter=secret_arn), ) policy = PolicyType( "AccessToRepoCredentialsSecret", PolicyName="AccessToRepoCredentialsSecret", PolicyDocument={ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Sid": "AccessToRepoCredentialsSecret", "Resource": [secret_arn], } ], }, Roles=[family.iam_manager.exec_role.name], ) if family.template and policy.title not in family.template.resources: family.template.add_resource(policy)