Source code for ecs_composex.elasticache.elasticache_aws

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module to look up existing ElastiCache clusters and replication groups for Lookup of x-elasticache
"""

import re

from compose_x_common.compose_x_common import keyisset

from ecs_composex.common.aws import (
    define_lookup_role_from_info,
    find_aws_resource_arn_from_tags_api,
)
from ecs_composex.common.logging import LOG
from ecs_composex.elasticache import elasticache_params


def get_cluster_config(resource, cluster_name, session):
    """
    Describe a standalone cache cluster and collect its endpoint settings.

    As a side effect, ``resource.port_attr`` is set to the port parameter
    matching the cluster engine (memcached or redis).

    :param resource: The resource whose ``port_attr`` gets set.
    :param str cluster_name: The cache cluster ID passed to ``describe_cache_clusters``.
    :param session: Object providing ``client("elasticache")``
        (a boto3 session, per the sibling functions' documentation).
    :return: Mapping of parameter titles to the cluster address, port and
        security group ID list. ``None`` when the cluster is not found or its
        engine is neither ``memcached`` nor ``redis``.
    :rtype: dict or None
    :raises LookupError: When a redis cluster reports a ``ReplicationGroupId``.
    """
    ec_client = session.client("elasticache")
    try:
        response = ec_client.describe_cache_clusters(
            CacheClusterId=cluster_name,
            ShowCacheClustersNotInReplicationGroups=True,
            ShowCacheNodeInfo=True,
        )
        details = response["CacheClusters"][0]
        engine = details["Engine"]

        if engine == "memcached":
            resource.port_attr = elasticache_params.CLUSTER_MEMCACHED_PORT
            # Memcached settings are read from the cluster-level configuration endpoint.
            return {
                elasticache_params.CLUSTER_MEMCACHED_ADDRESS.title: details[
                    "ConfigurationEndpoint"
                ]["Address"],
                elasticache_params.CLUSTER_MEMCACHED_PORT.title: details[
                    "ConfigurationEndpoint"
                ]["Port"],
                elasticache_params.CLUSTER_SG.title: [
                    details["SecurityGroups"][0]["SecurityGroupId"]
                ],
            }

        if engine != "redis":
            # Any other engine is not handled: no settings to return.
            return None

        if keyisset("ReplicationGroupId", details):
            raise LookupError(
                "The Cluster identified is part of a replication group."
            )
        resource.port_attr = elasticache_params.CLUSTER_REDIS_PORT
        # Redis settings are read from the endpoint of the first cache node.
        return {
            elasticache_params.CLUSTER_REDIS_PORT.title: details["CacheNodes"][0][
                "Endpoint"
            ]["Port"],
            elasticache_params.CLUSTER_REDIS_ADDRESS.title: details["CacheNodes"][0][
                "Endpoint"
            ]["Address"],
            elasticache_params.CLUSTER_SG.title: [
                details["SecurityGroups"][0]["SecurityGroupId"]
            ],
        }
    except ec_client.exceptions.CacheClusterNotFoundFault:
        LOG.error(f"Could not find the configurations for cluster {cluster_name}")
    return None
def get_replica_group_config(resource, cluster_name, session):
    """
    Describe a replication group and collect its endpoint settings.

    The security group ID is read from the first member cluster of the group,
    and the endpoints from the first node group. As a side effect,
    ``resource.port_attr`` is set to the replica primary port parameter.

    :param resource: The resource whose ``port_attr`` gets set.
    :param str cluster_name: The ID passed to ``describe_replication_groups``.
    :param session: Object providing ``client("elasticache")``
        (a boto3 session, per the sibling functions' documentation).
    :return: Mapping of parameter titles to the primary address/port, the
        reader endpoint address/port lists and the security group ID list.
        ``None`` when the replication group is not found.
    :rtype: dict or None
    """
    ec_client = session.client("elasticache")
    try:
        groups_r = ec_client.describe_replication_groups(
            ReplicationGroupId=cluster_name
        )
        group = groups_r["ReplicationGroups"][0]
        # The replication group description is not used for security groups here;
        # they are taken from the first member cluster instead.
        member_r = ec_client.describe_cache_clusters(
            CacheClusterId=group["MemberClusters"][0]
        )
        security_group_id = member_r["CacheClusters"][0]["SecurityGroups"][0][
            "SecurityGroupId"
        ]
        resource.port_attr = elasticache_params.REPLICA_PRIMARY_PORT
        node_group = group["NodeGroups"][0]
        return {
            elasticache_params.REPLICA_PRIMARY_ADDRESS.title: node_group[
                "PrimaryEndpoint"
            ]["Address"],
            elasticache_params.REPLICA_PRIMARY_PORT.title: node_group[
                "PrimaryEndpoint"
            ]["Port"],
            elasticache_params.REPLICA_READ_ENDPOINT_ADDRESSES.title: [
                node_group["ReaderEndpoint"]["Address"]
            ],
            elasticache_params.REPLICA_READ_ENDPOINT_PORTS.title: [
                node_group["ReaderEndpoint"]["Port"]
            ],
            elasticache_params.CLUSTER_SG.title: [security_group_id],
        }
    except ec_client.exceptions.ReplicationGroupNotFoundFault as error:
        LOG.error(f"Could not fetch information about {cluster_name}")
        LOG.error(error)
        return None
def return_cluster_config(resource, cluster_arn, session):
    """
    Resolve the ARN(s) found by the lookup into cluster or replication group settings.

    A single ARN (``str``) is handled as a standalone cache cluster: the ARN
    prefix is stripped and the remainder is used as the cluster ID.
    A list of ARNs is handled as members of one replication group: the group
    ID is derived from the first ARN by dropping its trailing ``-<digits>`` suffix.

    :param ecs_composex.elasticache.elasticache_stack.CacheCluster resource:
    :param cluster_arn: One cluster ARN, or a list of member cluster ARNs.
    :param session:
    :type cluster_arn: str or list
    :type session: boto3.session.Session
    :return: The settings returned by ``get_cluster_config`` or
        ``get_replica_group_config``; ``None`` for any other ``cluster_arn`` type.
    :raises ValueError: When ``cluster_arn`` is a list that is empty or whose
        first item does not look like a replication group member cluster ARN.
    """
    arn_prefix = r"(?:^arn:aws(?:-[a-z]+)?:elasticache:[\w-]+:[0-9]{12}:cluster:)"

    if isinstance(cluster_arn, str):
        cluster_name = re.sub(arn_prefix, "", cluster_arn)
        return get_cluster_config(resource, cluster_name, session)

    if isinstance(cluster_arn, list):
        # Match once and test the match object itself: calling .groups() on a
        # failed match (None) raised AttributeError instead of the ValueError below.
        # NOTE(review): only one trailing -<digits> suffix is dropped; confirm this
        # yields the group ID for every member cluster naming scheme in use.
        member_match = (
            re.match(arn_prefix + r"([\S]+)(?:-[0-9]+)$", cluster_arn[0])
            if cluster_arn
            else None
        )
        if member_match is None:
            raise ValueError("Could not match the ARN to a specific Replica Group")
        return get_replica_group_config(resource, member_match.group(1), session)

    # Neither a single ARN nor a list of ARNs: nothing to resolve.
    return None
def lookup_cluster_resource(resource, session):
    """
    Find the ElastiCache cluster ARN(s) for ``resource.lookup`` and resolve their settings.

    The ARN search is delegated to ``find_aws_resource_arn_from_tags_api`` with
    ``allow_multi=True``, using the session returned by
    ``define_lookup_role_from_info`` for both the search and the describe calls.

    :param resource: The resource providing the ``lookup`` definition.
    :param boto3.session.Session session: Boto3 session for clients
    :return: The settings from ``return_cluster_config``, or ``None`` when no ARN was found.
    """
    res_type = "elasticache:cluster"
    arn_patterns = {
        res_type: {
            "regexp": r"(?:^arn:aws(?:-[a-z]+)?:elasticache:[\w-]+:[0-9]{12}:cluster:)([\S]+)$"
        }
    }
    lookup_session = define_lookup_role_from_info(resource.lookup, session)
    found_arn = find_aws_resource_arn_from_tags_api(
        resource.lookup,
        lookup_session,
        res_type,
        types=arn_patterns,
        allow_multi=True,
    )
    if found_arn:
        config = return_cluster_config(resource, found_arn, lookup_session)
        LOG.debug(config)
        return config
    return None