Source code for ecs_composex.docdb.docdb_ecs

#  -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille <>

"""
Module to link DocDB clusters to ECS Services.
"""

from compose_x_common.compose_x_common import keyisset

from ecs_composex.common import LOG
from ecs_composex.docdb.docdb_params import (
from ecs_composex.rds.rds_ecs import (
from ecs_composex.tcp_resources_settings import handle_new_tcp_resource

def create_docdb_cluster_config_mapping(resource, db_config):
    """
    Build the mapping entry describing an existing DocDB cluster.

    :param resource: Object exposing ``logical_name``; that name is used both as the
        top-level mapping key and as the key holding the cluster identifier.
    :param dict db_config: Cluster description. Must contain ``VpcSecurityGroups``
        (items with ``VpcSecurityGroupId`` and ``Status``), ``Port`` and
        ``DBClusterIdentifier``. May contain ``DB_SECRET_T``.
    :return: ``{logical_name: {...cluster settings...}}``
    :rtype: dict
    """
    name = resource.logical_name

    # Only security groups reported as "active" are retained.
    active_sg_ids = []
    for sec_group in db_config["VpcSecurityGroups"]:
        if sec_group["Status"] == "active":
            active_sg_ids.append(sec_group["VpcSecurityGroupId"])

    cluster_settings = {
        "VpcSecurityGroupIds": active_sg_ids,
        "Port": db_config["Port"],
        name: db_config["DBClusterIdentifier"],
    }

    # The secret is optional: it is only propagated when set to a truthy value.
    if keyisset(DB_SECRET_T, db_config):
        cluster_settings[DB_SECRET_T] = db_config[DB_SECRET_T]

    return {name: cluster_settings}
def create_lookup_mappings(mappings, lookup_dbs, settings):
    """
    Function to create the DocumentDB mappings to add to services templates.

    For every database to look up, the lookup definition is validated, the
    existing cluster is resolved, and the resulting configuration is merged into
    ``mappings`` (updated in place). The raw configuration returned by the lookup
    is also stored on the database object as ``db.mappings``.

    A database whose lookup yields no configuration is logged and skipped; the
    remaining databases are still processed.

    :param dict mappings: Mappings to update in place with one entry per resolved database.
    :param list lookup_dbs: Databases to resolve; each must expose ``name``,
        ``logical_name`` and ``lookup``.
    :param ecs_composex.common.settings.ComposeXSettings settings: The settings for ComposeX Execution
    """
    for db in lookup_dbs:
        # NOTE(review): validate_rds_lookup is assumed to raise on an invalid
        # lookup definition; confirm against ecs_composex.rds.rds_ecs.
        validate_rds_lookup(db.name, db.lookup)
        db_config = lookup_rds_resource(db.lookup, settings.session)
        if not db_config:
            LOG.warning(
                f"No RDS DB Configuration could be defined from provided lookup. Skipping {db.name}"
            )
            # Skip only this database, as the message states, instead of
            # abandoning every remaining lookup.
            continue
        config = create_docdb_cluster_config_mapping(db, db_config)
        mappings.update(config)
        db.mappings = db_config
def docdb_to_ecs(resources, services_stack, res_root_stack, settings):
    """
    Entrypoint function to map new and lookup resources to ECS Services.

    Resources without a ``lookup`` definition are treated as new and wired through
    ``handle_new_tcp_resource``. Resources with a ``lookup`` definition are imported
    with ``import_dbs``, but only when their logical name has a truthy entry in
    ``settings.mappings[RES_KEY]``.

    :param dict resources: Resources indexed by name (iterated by key, then indexed).
    :param ecs_composex.common.stacks.ComposeXStack services_stack: Not used by this function.
    :param ecs_composex.common.stacks.ComposeXStack res_root_stack: Root stack of the new resources.
    :param ecs_composex.common.settings.ComposeXSettings settings: The settings for ComposeX Execution
    """
    # Split the resources into the ones to create and the ones to look up.
    created, looked_up = [], []
    for res_name in resources:
        resource = resources[res_name]
        if resource.lookup:
            looked_up.append(resource)
        else:
            created.append(resource)

    for resource in created:
        handle_new_tcp_resource(
            resource,
            res_root_stack,
            port_parameter=DOCDB_PORT,
            sg_parameter=DOCDB_SG,
            secret_parameter=DOCDB_SECRET,
        )

    for resource in looked_up:
        # Read inside the loop so the mappings are only accessed when there is
        # at least one lookup resource.
        docdb_mappings = settings.mappings[RES_KEY]
        if not keyisset(resource.logical_name, docdb_mappings):
            continue
        import_dbs(resource, docdb_mappings, mapping_name=MAPPINGS_KEY)