Source code for ecs_composex.ecs.service_networking.ingress_helpers

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

import ecs_composex.common.troposphere_tools

if TYPE_CHECKING:
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex.compose.compose_services import ComposeService
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.common.stacks import ComposeXStack
    from troposphere.ecs import PortMapping
    from ecs_composex.ecs_ingress.ecs_ingress_stack import XStack as EcsIngressStack

from json import dumps

from compose_x_common.compose_x_common import keyisset, keypresent, set_else_none
from troposphere import AWS_ACCOUNT_ID, GetAtt, NoValue, Ref, Sub
from troposphere.ec2 import SecurityGroupIngress
from troposphere.ecs import (
    ServiceConnectClientAlias,
    ServiceConnectConfiguration,
    ServiceConnectService,
)

from ecs_composex.cloudmap.cloudmap_params import RES_KEY as CLOUDMAP_KEY
from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_parameters, add_resource
from ecs_composex.ecs.ecs_params import SERVICE_NAME
from ecs_composex.ingress_settings import Ingress
from ecs_composex.resources_import import import_record_properties
from ecs_composex.vpc.vpc_params import SG_ID_TYPE


[docs]def handle_ext_sources(existing_sources: list, new_sources: list) -> None: """ Adds up external sources if they are not defined yet :param existing_sources: :param new_sources: """ set_ipv4_sources = [ s[Ingress.ipv4_key] for s in existing_sources if keyisset(Ingress.ipv4_key, s) ] for new_s in new_sources: if new_s[Ingress.ipv4_key] not in set_ipv4_sources: existing_sources.append(new_s)
[docs]def handle_aws_sources(existing_sources: list, new_sources: list) -> None: """ Function to handle merge of aws sources between two services for one family :param existing_sources: :param new_sources: :return: """ set_ids = [s["Id"] for s in existing_sources if keyisset("Id", s)] for new_s in new_sources: if keyisset("Id", new_s) and new_s["Id"] not in set_ids: existing_sources.append(new_s)
[docs]def handle_services(existing_sources: list, new_sources: list) -> None: """ Function to merge source services definitions :param list existing_sources: :param list new_sources: :return: """ set_ids = [s["Name"] for s in existing_sources if keyisset("Name", s)] for new_s in new_sources: if new_s["Name"] not in set_ids: existing_sources.append(new_s)
[docs]def handle_ingress_rules(source_config: dict, ingress_config: dict) -> None: valid_keys = [ ("Myself", bool, None), (Ingress.ext_sources_key, list, handle_ext_sources), (Ingress.aws_sources_key, list, handle_aws_sources), (Ingress.services_key, list, handle_services), ] for key in valid_keys: if key[1] is bool: ingress_config[key[0]] = set_else_none( key[0], source_config, alt_value=False, eval_bool=True ) elif keyisset(key[0], source_config) and key[2]: key[2](ingress_config[key[0]], source_config[key[0]])
[docs]def merge_family_network_setting( family, key: str, definition: dict, network: dict, network_config: dict ) -> None: """ Merges a network setting (key) and its definition (definition) with new definition (network) into network_config If the key is x-cloudmap, and is unset, set to value. If another service of the family comes in, comes second. :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param str key: :param dict definition: :param dict network: :param dict network_config: :return: """ if keyisset(key, network) and key == Ingress.master_key: handle_ingress_rules(network[key], network_config[key]) elif keyisset(key, network) and key == CLOUDMAP_KEY: if definition: LOG.warning( family.name, f"x-network.{CLOUDMAP_KEY}", "is already set to", definition, ) else: network_config[CLOUDMAP_KEY] = network[CLOUDMAP_KEY]
[docs]def merge_family_services_networking(family: ComposeFamily) -> dict: """ Merge the different services network configuration definitions :param ecs_composex.ecs.ecs_family.ComposeFamily family: :return: The family network definition :rtype: dict """ network_config = { Ingress.master_key: { "Myself": False, Ingress.ext_sources_key: [], Ingress.aws_sources_key: [], Ingress.services_key: [], }, CLOUDMAP_KEY: {}, } x_network_ingress = [s.x_network for s in family.ordered_services if s.x_network] for network in x_network_ingress: for key, definition in network_config.items(): merge_family_network_setting( family, key, definition, network, network_config ) LOG.debug(family.name) LOG.debug(dumps(network_config, indent=2)) return network_config
[docs]def set_compose_services_ingress( dst_family: ComposeFamily, families_sg_stack: EcsIngressStack, settings: ComposeXSettings, ) -> None: """ Function to crate SG Ingress between two families / services. Presently, the ingress rules are set after all services have been created """ target_family_services: list[ComposeService] = [] for _target_service_def in dst_family.service_networking.ingress.services: service_name = _target_service_def["Name"] for _service in settings.services: if service_name != _service.name: continue if _service.family == dst_family: continue target_family_services.append(_service) add_service_to_service_ingress_rules( dst_family, target_family_services, families_sg_stack )
[docs]def add_service_to_service_ingress_rules( dst_family: ComposeFamily, target_family_services: list[ComposeService], families_sg_stack: EcsIngressStack, ): """ For each identified service that wants to access the `dst_family` services For each port of the `dst_family` Create an SG Ingress rule that allows service-to-service communication """ for _service in target_family_services: if families_sg_stack.title not in _service.family.stack.DependsOn: _service.family.stack.DependsOn.append(families_sg_stack.title) for _service_port_def in dst_family.service_networking.ports: target_port = set_else_none( "target", _service_port_def, set_else_none("published", _service_port_def, None), ) if target_port is None: raise ValueError( "Wrong port definition value for security group ingress", _service_port_def, ) common_args = { "FromPort": target_port, "ToPort": target_port, "IpProtocol": _service_port_def["protocol"], "SourceSecurityGroupOwnerId": Ref(AWS_ACCOUNT_ID), "Description": Sub( f"From ${_service.family.name} to {dst_family.name} " f"on port {target_port}/{_service_port_def['protocol']}" ), } ingress_title: str = ( f"From{_service.family.logical_name}To{dst_family.logical_name}" f"On{target_port}{_service_port_def['protocol'].title()}" ) add_resource( families_sg_stack.stack_template, SecurityGroupIngress( ingress_title, SourceSecurityGroupId=GetAtt( _service.family.service_networking.security_group.cfn_resource, "GroupId", ), GroupId=GetAtt( dst_family.service_networking.security_group.cfn_resource, "GroupId", ), **common_args, ), )
[docs]def handle_str_cloudmap_config( family: ComposeFamily, family_mappings: dict, cloudmap_config: str, ports: list ) -> None: """ Handle cloudmap config when config is set as str :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param dict family_mappings: :param str cloudmap_config: :param list ports: """ if cloudmap_config not in family_mappings.keys(): family_mappings[cloudmap_config] = { "Port": ports[0], "Name": family.family_hostname, } else: LOG.warning( f"{family.name}.x-network.x-cloudmap - {cloudmap_config} is set multiple times. " f"Preserving {family_mappings[cloudmap_config]}" )
[docs]def handle_dict_cloudmap_config( family: ComposeFamily, family_mappings: dict, cloudmap_config: dict, ports: list ) -> None: """ Handles cloudmap config settings when set as a mapping/dict :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param dict family_mappings: :param dict cloudmap_config: :param list ports: """ for map_name, config in cloudmap_config.items(): if map_name in family_mappings.keys(): LOG.warning( f"{family.name}.x-network.x-cloudmap - {cloudmap_config} is set multiple times. " f"Preserving {family_mappings[map_name]}" ) else: if keyisset("Port", config): for port in ports: if port["target"] == config["Port"]: family_mappings[map_name] = { "Port": port, "Name": set_else_none( "Name", config, family.family_hostname ), } break else: family_mappings[map_name] = { "Port": ports[0], "Name": set_else_none("Name", config, family.family_hostname), }
[docs]def merge_cloudmap_settings(family: ComposeFamily, ports: list) -> dict: """ Function to merge the x_cloudmap from the service :param ecs_composex.ecs.ecs_family.ComposeFamily family: :param list[dict] ports: :return: The cloudmap config for the given family :rtype: dict """ cloudmap_configs = [ svc.x_cloudmap for svc in family.ordered_services if svc.x_cloudmap ] if not cloudmap_configs or not ports: return {} family_mappings = {} for cloudmap_config in cloudmap_configs: if isinstance(cloudmap_config, str): handle_str_cloudmap_config(family, family_mappings, cloudmap_config, ports) elif isinstance(cloudmap_config, dict): handle_dict_cloudmap_config(family, family_mappings, cloudmap_config, ports) return family_mappings
[docs]def find_namespace( family: ComposeFamily, namespace_id: str, settings: ComposeXSettings ): """Finds the x-cloudmap: namespace and returns the identifier to use for it""" x_resource_attribute: str = f"x-cloudmap::{namespace_id}::Arn" namespace, parameter = settings.get_resource_attribute(x_resource_attribute) return namespace.get_resource_attribute_value(parameter, family)[0]
[docs]def find_port_mapping(port_name, family: ComposeFamily) -> PortMapping | None: """ Goes over all the services/containers of the task definition and over each PortMapping of the container. If the port name matches, we have found the PortMapping """ for service in family.ordered_services: port_mappings = getattr(service.container_definition, "PortMappings", []) if not port_mappings: continue for _port_mapping in port_mappings: if _port_mapping.Name == port_name: return _port_mapping return
[docs]def set_ecs_connect_from_macro( family: ComposeFamily, service: ComposeService, macro: dict, settings: ComposeXSettings, ) -> ServiceConnectConfiguration: """ Based on the MacroParameters, creates the ServiceConnectConfiguration object. Configuration is in the `macro` parameter """ LOG.info(f"{family.name}.{service.name} - Setting up ecs-connect settings") service_aliases: list[ServiceConnectService] = [] props: dict = { "Enabled": True, "Namespace": find_namespace(family, macro["x-cloudmap"], settings), "Services": service_aliases, } if not keyisset("service_ports", macro): return ServiceConnectConfiguration(**props) for port_name, connect_config in macro["service_ports"].items(): for the_port in family.service_networking.ports: if keyisset("name", the_port) and the_port["name"] == port_name: break else: raise AttributeError( f"No port called {port_name} in family {family.name}", [_port["name"] for _port in family.service_networking.ports], ) dns_name = set_else_none("DnsName", connect_config, None) app_protocol = set_else_none("appProtocol", connect_config, NoValue) valid_protocols: list[str] = ["http", "http2", "grpc"] if ( not isinstance(app_protocol, str) and app_protocol not in valid_protocols ) and app_protocol is not NoValue: raise ValueError( "appProtocol must be one of", valid_protocols, "got", app_protocol ) if "appProtocol" not in the_port: the_port["appProtocol"] = app_protocol port_mapping: PortMapping = find_port_mapping(the_port["name"], family) setattr(port_mapping, "AppProtocol", app_protocol) client_aliases = NoValue if dns_name: client_aliases = [ ServiceConnectClientAlias(DnsName=dns_name, Port=the_port["target"]) ] services_props: dict = { "DiscoveryName": set_else_none( "CloudMapServiceName", connect_config, family.name ), "PortName": port_name, "Timeout": set_else_none("Timeout", connect_config, NoValue), "IngressPortOverride": set_else_none( "IngressPortOverride", connect_config, NoValue ), "ClientAliases": client_aliases, } config: ServiceConnectService = ServiceConnectService(**services_props) service_aliases.append(config) return ServiceConnectConfiguration(**props)
[docs]def process_ecs_connect_settings( family: ComposeFamily, service: ComposeService, settings: ComposeXSettings ) -> ServiceConnectConfiguration | Ref: """Determines whether to create the ECS Service connect from the Properties or MacroParameters""" if keyisset("Properties", service.x_ecs_connect): props = import_record_properties( service.x_ecs_connect["Properties"], ServiceConnectConfiguration ) connect_props = ServiceConnectConfiguration(**props) elif keyisset("MacroParameters", service.x_ecs_connect): connect_props = set_ecs_connect_from_macro( family, service, service.x_ecs_connect["MacroParameters"], settings ) else: raise KeyError( f"{family.name} - x-network.x-ecs_connect is not set correctly. " "One of Properties or MacroParameters is required" ) return connect_props
[docs]def import_set_ecs_connect_settings( family: ComposeFamily, settings: ComposeXSettings ) -> ServiceConnectConfiguration | None: if not family.service_networking.ports: LOG.warning(f"services.{family.name} - No ports defined: ignoring ECS Connect.") return x_ecs_configs: list[ComposeService] = [ service for service in family.ordered_services if service.x_ecs_connect ] if not x_ecs_configs: return None if len(x_ecs_configs) > 1: raise ValueError( f"{family.name} - x-network.x-ecs_connect can only be set once for all the services of the family." ) return process_ecs_connect_settings(family, x_ecs_configs[0], settings)