Source code for ecs_composex.rds.rds_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module to handle AWS RDS CFN Templates creation
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule
    from ecs_composex.common.cfn_params import Parameter

from compose_x_common.aws.rds import RDS_DB_CLUSTER_ARN_RE, RDS_DB_INSTANCE_ARN_RE
from compose_x_common.compose_x_common import attributes_to_mapping, keyisset
from troposphere import AWS_ACCOUNT_ID, AWS_PARTITION, AWS_REGION, GetAtt, Ref, Sub
from troposphere.rds import DBCluster as CfnDBCluster
from troposphere.rds import DBInstance as CfnDBInstance

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import build_template
from ecs_composex.compose.x_resources.network_x_resources import DatabaseXResource
from ecs_composex.rds.rds_features import apply_extra_parameters
from ecs_composex.rds.rds_params import (
    DB_CLUSTER_ARN,
    DB_CLUSTER_NAME,
    DB_ENDPOINT_ADDRESS,
    DB_ENDPOINT_PORT,
    DB_INSTANCE_ARN,
    DB_NAME,
    DB_RO_ENDPOINT_ADDRESS,
    DB_SECRET_ARN,
    DB_SG,
)
from ecs_composex.rds.rds_template import generate_rds_templates
from ecs_composex.rds_resources_settings import lookup_rds_resource, lookup_rds_secret
from ecs_composex.vpc.vpc_params import (
    APP_SUBNETS,
    PUBLIC_SUBNETS,
    STORAGE_SUBNETS,
    VPC_ID,
)


[docs]def get_db_instance_config(db, account_id, resource_id): """ :param Rds db: :param account_id: :param resource_id: :return: """ client = db.lookup_session.client("rds") try: db_config_r = client.describe_db_instances(DBInstanceIdentifier=db.arn)[ "DBInstances" ][0] except (client.exceptions.DBInstanceNotFoundFault,) as error: LOG.error(f"{db.module.res_key}.{db.name} - Failed to retrieve configuration") LOG.error(error) raise attributes_mappings = { DB_NAME: "DBName", db.port_param: "Endpoint::Port", db.security_group_param: "VpcSecurityGroups::0::VpcSecurityGroupId", db.db_cluster_arn_parameter: "DBInstanceArn", db.db_cluster_endpoint_param: "Endpoint::Address", } if keyisset("VpcSecurityGroups", db_config_r): db_config_r["VpcSecurityGroups"] = [ sg for sg in db_config_r["VpcSecurityGroups"] if keyisset("Status", sg) and sg["Status"] == "active" ] config = attributes_to_mapping(db_config_r, attributes_mappings) return config
[docs]def get_db_cluster_config(db, account_id, resource_id): """ Creates the DB configuration to use then in Mappings :param Rds db: :param str account_id: :param str resource_id: :return: The config :rtype: dict """ client = db.lookup_session.client("rds") try: db_config_r = client.describe_db_clusters(DBClusterIdentifier=db.arn)[ "DBClusters" ][0] except (client.exceptions.DBClusterNotFoundFault,) as error: LOG.error(f"{db.module.res_key}.{db.name} - Failed to retrieve configuration") LOG.error(error) raise if keyisset("VpcSecurityGroups", db_config_r): db_config_r["VpcSecurityGroups"] = [ sg for sg in db_config_r["VpcSecurityGroups"] if keyisset("Status", sg) and sg["Status"] == "active" ] attributes_mappings = { DB_NAME: "DatabaseName", db.port_param: "Port", db.security_group_param: "VpcSecurityGroups::0::VpcSecurityGroupId", db.db_cluster_arn_parameter: "DBClusterArn", db.db_cluster_endpoint_param: "Endpoint", db.db_cluster_ro_endpoint_param: "ReaderEndpoint", } config = attributes_to_mapping(db_config_r, attributes_mappings) return config
[docs]class Rds(DatabaseXResource): """ Class to represent RDS DB """ subnets_param = STORAGE_SUBNETS def __init__( self, name, definition, module: XResourceModule, settings: ComposeXSettings ): self.db_secret = None self.db_sg = None self.db_subnet_group = None super().__init__(name, definition, module, settings) self.set_override_subnets() self.port_param = DB_ENDPOINT_PORT self.db_secret_arn_parameter = DB_SECRET_ARN self.security_group_param = DB_SG self.db_cluster_arn_parameter = DB_CLUSTER_ARN self.ref_parameter = DB_CLUSTER_NAME self.db_cluster_endpoint_param = DB_ENDPOINT_ADDRESS self.db_cluster_ro_endpoint_param = DB_RO_ENDPOINT_ADDRESS self.support_defaults = True @property def arn_parameter(self) -> Parameter: if isinstance(self.cfn_resource, CfnDBCluster): self.db_cluster_arn_parameter = DB_CLUSTER_ARN else: self.db_cluster_arn_parameter = DB_INSTANCE_ARN return self.db_cluster_arn_parameter @arn_parameter.setter def arn_parameter(self, value): if value is None: pass self.db_cluster_arn_parameter = value
[docs] def init_outputs(self): """ Method to init the RDS Output attributes """ self.output_properties = { DB_CLUSTER_NAME: ( self.logical_name, self.cfn_resource, Ref, None, "DbName", ), self.arn_parameter: ( f"{self.logical_name}{self.arn_parameter.title}", self.cfn_resource, GetAtt, self.arn_parameter.return_value, ), self.port_param: ( f"{self.logical_name}{self.port_param.return_value}", self.cfn_resource, GetAtt, self.port_param.return_value, self.port_param.return_value.replace(r".", ""), ), self.db_secret_arn_parameter: ( self.db_secret.title, self.db_secret, Ref, None, "SecretArn", ), self.security_group_param: ( self.db_sg.title, self.db_sg, GetAtt, self.security_group_param.return_value, "VpcSecurityGroupId", ), self.db_cluster_endpoint_param: ( f"{self.logical_name}{self.db_cluster_endpoint_param.title}", self.cfn_resource, GetAtt, self.db_cluster_endpoint_param.return_value, self.db_cluster_endpoint_param.return_value.replace(".", ""), ), } if isinstance(self.cfn_resource, CfnDBCluster): self.output_properties.update( { self.db_cluster_ro_endpoint_param: ( f"{self.logical_name}{self.db_cluster_ro_endpoint_param.title}", self.cfn_resource, GetAtt, self.db_cluster_ro_endpoint_param.return_value, self.db_cluster_ro_endpoint_param.return_value.replace(".", ""), ) } )
[docs] def lookup_resource( self, arn_re, native_lookup_function, cfn_resource_type, tagging_api_id, subattribute_key=None, use_arn_for_id: bool = False, ): """ Method to self-identify properties :return: """ lookup_rds_resource( self, arn_re, native_lookup_function, cfn_resource_type, tagging_api_id, subattribute_key, )
[docs] def handle_x_dependencies(self, settings, root_stack=None): """ Handles x-rds to other x-resource dependencies and features :param settings: :param root_stack: :return: """ if self.parameters: apply_extra_parameters(settings, self, self.stack)
[docs]class XStack(ComposeXStack): """ Class to handle ECS root stack specific settings """ def __init__( self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs ): if module.new_resources: stack_template = build_template( "Root stack for RDS DBs", [VPC_ID, STORAGE_SUBNETS, APP_SUBNETS, PUBLIC_SUBNETS], ) super().__init__(title, stack_template, **kwargs) generate_rds_templates(self, stack_template, module.new_resources, settings) self.parent_stack = settings.root_stack if not hasattr(self, "DeletionPolicy"): setattr(self, "DeletionPolicy", module.module_deletion_policy) else: self.is_void = True if module.lookup_resources and module.mapping_key not in settings.mappings: settings.mappings[module.mapping_key] = {} for resource in module.lookup_resources: resource.stack = self if keyisset("cluster", resource.lookup): resource.lookup_resource( RDS_DB_CLUSTER_ARN_RE, get_db_cluster_config, CfnDBCluster.resource_type, "rds:cluster", "cluster", ) elif keyisset("db", resource.lookup): resource.lookup_resource( RDS_DB_INSTANCE_ARN_RE, get_db_instance_config, CfnDBInstance.resource_type, "rds:db", "db", ) else: raise KeyError( f"{resource.module.res_key}.{resource.name} - " "You must specify the cluster or instance to lookup" ) if keyisset("secret", resource.lookup): lookup_rds_secret(resource, resource.lookup["secret"]) resource.generate_cfn_mappings_from_lookup_properties() resource.generate_outputs() settings.mappings[module.mapping_key].update( {resource.logical_name: resource.mappings} )