Source code for ecs_composex.neptune.neptune_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
AWS DocumentDB entrypoint for ECS ComposeX
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule, ModManager

from compose_x_common.aws.neptune import NEPTUNE_DB_CLUSTER_ARN_RE
from compose_x_common.compose_x_common import attributes_to_mapping, keyisset
from troposphere import AWS_ACCOUNT_ID, AWS_PARTITION, AWS_REGION, GetAtt, Ref, Sub
from troposphere.neptune import DBCluster as CfnDBCluster

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import build_template
from ecs_composex.compose.x_resources.network_x_resources import DatabaseXResource
from ecs_composex.neptune.neptune_params import (
    DB_CLUSTER_RESOURCES_ARN,
    DB_ENDPOINT,
    DB_PORT,
    DB_READ_ENDPOINT,
    DB_RESOURCE_ID,
)
from ecs_composex.rds.rds_params import DB_CLUSTER_ARN, DB_CLUSTER_NAME, DB_SG
from ecs_composex.rds_resources_settings import (
    handle_new_tcp_resource,
    import_dbs,
    lookup_rds_resource,
)
from ecs_composex.resource_settings import link_resource_to_services
from ecs_composex.vpc.vpc_params import STORAGE_SUBNETS, VPC_ID

from .neptune_template import create_neptune_template


[docs]def get_db_cluster_config(db, account_id, resource_id): client = db.lookup_session.client("neptune") try: db_config_r = client.describe_db_clusters( DBClusterIdentifier=db.arn, Filters=[ { "Name": "engine", "Values": [ "neptune", ], }, ], )["DBClusters"] db_cluster = db_config_r[0] except (client.exceptions.DBClusterNotFoundFault,) as error: LOG.error(f"{db.module.res_key}.{db.name} - Failed to retrieve configuration") LOG.error(error) raise if keyisset("VpcSecurityGroups", db_config_r): db_config_r["VpcSecurityGroups"] = [ sg for sg in db_config_r["VpcSecurityGroups"] if keyisset("Status", sg) and sg["Status"] == "active" ] attributes_mappings = { db.port_param: "Port", db.security_group_param: "VpcSecurityGroups::0::VpcSecurityGroupId", db.db_cluster_arn_parameter: DB_CLUSTER_ARN.title, db.db_cluster_ro_endpoint_param: DB_READ_ENDPOINT.return_value, db.db_cluster_endpoint_param: db.db_cluster_endpoint_param.return_value, db.ref_parameter: "DBClusterIdentifier", } config = attributes_to_mapping(db_cluster, attributes_mappings) config[DB_CLUSTER_RESOURCES_ARN] = ( f"{config[DB_CLUSTER_ARN].replace('rds', 'neptune-db', 1)}/*" ) return config
[docs]class NeptuneDBCluster(DatabaseXResource): """ Class to manage Neptune Cluster """ subnets_param = STORAGE_SUBNETS def __init__( self, name, definition, module: XResourceModule, settings: ComposeXSettings ): """ Init method :param str name: :param dict definition: :param ecs_composex.common.settings.ComposeXSettings settings: """ self.db_sg = None self.db_subnets_group = None super().__init__(name, definition, module, settings) self.set_override_subnets() self.security_group_param = DB_SG self.db_cluster_arn_parameter = DB_CLUSTER_ARN self.port_param = DB_PORT self.db_cluster_ro_endpoint_param = DB_READ_ENDPOINT self.db_cluster_endpoint_param = DB_ENDPOINT self.ref_parameter = DB_CLUSTER_NAME
[docs] def init_outputs(self): """ Method to init the DocDB output attributes """ self.output_properties = { DB_CLUSTER_NAME: (self.logical_name, self.cfn_resource, Ref, None), DB_RESOURCE_ID: ( f"{self.logical_name}{DB_RESOURCE_ID.return_value}", self.cfn_resource, GetAtt, DB_RESOURCE_ID.return_value, ), self.db_cluster_arn_parameter: ( f"{self.logical_name}{self.db_cluster_arn_parameter.title}", self.cfn_resource, Sub, f"arn:${{{AWS_PARTITION}}}:rds:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" f"${{{self.cfn_resource.title}}}", ), DB_CLUSTER_RESOURCES_ARN: ( f"{self.logical_name}{DB_CLUSTER_RESOURCES_ARN.title}", self.cfn_resource, Sub, f"arn:${{{AWS_PARTITION}}}:neptune-db:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:" f"${{{self.cfn_resource.title}.{DB_RESOURCE_ID.return_value}}}", ), DB_ENDPOINT: ( f"{self.logical_name}{DB_ENDPOINT.return_value}", self.cfn_resource, GetAtt, DB_ENDPOINT.return_value, ), DB_READ_ENDPOINT: ( f"{self.logical_name}{DB_READ_ENDPOINT.return_value}", self.cfn_resource, GetAtt, DB_READ_ENDPOINT.return_value, ), self.port_param: ( f"{self.logical_name}{DB_PORT.return_value}", self.cfn_resource, GetAtt, self.port_param.return_value, ), self.security_group_param: ( f"{self.logical_name}{DB_SG.return_value}", self.db_sg, GetAtt, self.security_group_param.return_value, "VpcSecurityGroupId", ), }
[docs] def lookup_resource( self, arn_re, native_lookup_function, cfn_resource_type, tagging_api_id, subattribute_key=None, use_arn_for_id: bool = False, ): """ Method to self-identify properties :return: """ lookup_rds_resource( self, arn_re, native_lookup_function, cfn_resource_type, tagging_api_id, subattribute_key, )
[docs] def to_ecs( self, settings: ComposeXSettings, modules: ModManager, root_stack: ComposeXStack = None, ) -> None: LOG.info(f"{self.module.res_key}.{self.name} - Linking to services") if not self.mappings and self.cfn_resource: handle_new_tcp_resource( self, port_parameter=DB_PORT, sg_parameter=DB_SG, settings=settings ) link_resource_to_services( settings, self, arn_parameter=DB_CLUSTER_RESOURCES_ARN, access_subkeys=["NeptuneDB"], ) link_resource_to_services( settings, self, arn_parameter=self.db_cluster_arn_parameter, access_subkeys=["DBCluster"], ) elif not self.cfn_resource and self.mappings: import_dbs(self, settings) link_resource_to_services( settings, self, arn_parameter=self.db_cluster_arn_parameter, access_subkeys=["DBCluster"], ) link_resource_to_services( settings, self, arn_parameter=DB_CLUSTER_RESOURCES_ARN, access_subkeys=["NeptuneDB"], )
[docs]class XStack(ComposeXStack): """ Class for the Stack of x-neptune """ def __init__( self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs ): if module.lookup_resources: if not keyisset(module.mapping_key, settings.mappings): settings.mappings[module.mapping_key] = {} for resource in module.lookup_resources: resource.lookup_resource( NEPTUNE_DB_CLUSTER_ARN_RE, get_db_cluster_config, CfnDBCluster.resource_type, "rds:cluster", ) resource.generate_cfn_mappings_from_lookup_properties() resource.generate_outputs() settings.mappings[module.mapping_key].update( {resource.logical_name: resource.mappings} ) if module.new_resources: stack_template = build_template( "Root template for Neptune by ComposeX", [VPC_ID, STORAGE_SUBNETS] ) super().__init__(title, stack_template, **kwargs) create_neptune_template( stack_template, module.new_resources, settings, self ) if not hasattr(self, "DeletionPolicy"): setattr(self, "DeletionPolicy", module.module_deletion_policy) else: self.is_void = True for resource in module.resources_list: resource.stack = self