# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ecs_composex.elbv2 import Elbv2
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.ecs.ecs_family import ComposeFamily
from ecs_composex.compose.compose_services import ComposeService
import re
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import AWS_NO_VALUE, GetAtt, Output, Ref, Sub
from troposphere.ecs import LoadBalancer as EcsLb
from troposphere.elasticloadbalancingv2 import (
Matcher,
TargetGroup,
TargetGroupAttribute,
)
from ecs_composex.common import NONALPHANUM
from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import (
add_outputs,
add_parameters,
add_resource,
)
from ecs_composex.ecs.ecs_params import ELB_GRACE_PERIOD
from ecs_composex.elbv2.elbv2_params import (
LB_SG_ID,
TGT_FULL_NAME,
TGT_GROUP_ARN,
TGT_GROUP_NAME,
)
from ecs_composex.vpc.vpc_params import VPC_ID
[docs]class MergedTargetGroup(TargetGroup):
"""Class for TargetGroup merged among more than one service"""
def __init__(
self,
name: str,
definition: dict,
elbv2: Elbv2,
stack: ComposeXStack,
port: int,
**kwargs,
):
self.name = name
self._definition = definition
self.families: list[ComposeFamily] = []
self.stack: ComposeXStack = stack
self.outputs = []
self.elbv2: Elbv2 = elbv2
self.output_properties = {}
self.attributes_outputs = {}
super().__init__(NONALPHANUM.sub("", name), **kwargs)
@property
def definition(self) -> dict:
return self._definition
[docs] def init_outputs(self):
self.output_properties = {
TGT_GROUP_ARN: (self.title, self, Ref, None),
TGT_GROUP_NAME: (
f"{self.title}{TGT_GROUP_NAME.return_value}",
self,
GetAtt,
TGT_GROUP_NAME.return_value,
None,
),
TGT_FULL_NAME: (
f"{self.title}{TGT_FULL_NAME.return_value}",
self,
GetAtt,
TGT_FULL_NAME.return_value,
None,
),
}
[docs] def generate_outputs(self):
for (
attribute_parameter,
output_definition,
) in self.output_properties.items():
output_name = f"{self.title}{attribute_parameter.title}"
value = self.set_new_resource_outputs(output_definition)
self.attributes_outputs[attribute_parameter] = {
"Name": output_name,
"Output": Output(output_name, Value=value),
"ImportParameter": Parameter(
output_name,
return_value=attribute_parameter.return_value,
Type=attribute_parameter.Type,
),
"ImportValue": GetAtt(
self.stack,
f"Outputs.{output_name}",
),
"Original": attribute_parameter,
}
for attr in self.attributes_outputs.values():
if keyisset("Output", attr):
self.outputs.append(attr["Output"])
[docs] def set_new_resource_outputs(self, output_definition):
"""
Method to define the outputs for the resource when new
"""
if output_definition[2] is Ref:
value = Ref(output_definition[1])
elif output_definition[2] is GetAtt:
value = GetAtt(output_definition[1], output_definition[3])
elif output_definition[2] is Sub:
value = Sub(output_definition[3])
else:
raise TypeError(
f"3rd argument for {output_definition[0]} must be one of",
(Ref, GetAtt, Sub),
"Got",
output_definition[2],
)
return value
[docs] def associate_families(self, settings: ComposeXSettings):
for _family in self.definition["Services"]:
_family_name, _service_name = _family["Name"].split(r":")
for family in settings.families.values():
if family.name == _family_name:
break
else:
raise KeyError(
f"{self.elbv2.module.res_key}.{self.elbv2.name} - TargetGroup {self.name} - Service Family {_family_name} is not set in services"
)
for _f_service in family.services:
if _f_service.name == _service_name:
break
else:
raise KeyError(
f"{self.elbv2.module.res_key}.{self.elbv2.name} - TargetGroup {self.name} - Family {_family_name} does not have a container named {_service_name}"
)
if self not in family.target_groups:
family.target_groups.append(self)
tgt_parameter = self.attributes_outputs[TGT_GROUP_ARN]["ImportParameter"]
add_parameters(family.template, [tgt_parameter])
family.stack.Parameters.update(
{
tgt_parameter.title: self.attributes_outputs[TGT_GROUP_ARN][
"ImportValue"
],
}
)
service_lb = EcsLb(
ContainerPort=self.Port,
ContainerName=_f_service.name,
TargetGroupArn=Ref(tgt_parameter),
)
family.ecs_service.lbs.append(service_lb)
add_parameters(family.template, [ELB_GRACE_PERIOD])
family.ecs_service.ecs_service.HealthCheckGracePeriodSeconds = Ref(
ELB_GRACE_PERIOD
)
handle_sg_lb_ingress_to_service(self.elbv2, family, self.elbv2.stack)
[docs]class ComposeTargetGroup(TargetGroup):
"""
Class to manage Target Groups
"""
def __init__(
self,
title: str,
elbv2: Elbv2,
family: ComposeFamily,
service: ComposeService,
stack: ComposeXStack,
port: int,
**kwargs,
):
self.family: ComposeFamily = family
self.service: ComposeService = service
self.stack: ComposeXStack = stack
self.port: int = port
self.outputs = []
self.elbv2: Elbv2 = elbv2
self.output_properties = {}
self.attributes_outputs = {}
super().__init__(title, **kwargs)
[docs] def init_outputs(self):
self.output_properties = {
TGT_GROUP_ARN: (self.title, self, Ref, None),
TGT_GROUP_NAME: (
f"{self.title}{TGT_GROUP_NAME.return_value}",
self,
GetAtt,
TGT_GROUP_NAME.return_value,
None,
),
TGT_FULL_NAME: (
f"{self.title}{TGT_FULL_NAME.return_value}",
self,
GetAtt,
TGT_FULL_NAME.return_value,
None,
),
}
[docs] def generate_outputs(self):
for (
attribute_parameter,
output_definition,
) in self.output_properties.items():
output_name = f"{self.title}{attribute_parameter.title}"
value = self.set_new_resource_outputs(output_definition)
self.attributes_outputs[attribute_parameter] = {
"Name": output_name,
"Output": Output(output_name, Value=value),
"ImportParameter": Parameter(
output_name,
return_value=attribute_parameter.return_value,
Type=attribute_parameter.Type,
),
"ImportValue": GetAtt(
self.stack,
f"Outputs.{output_name}",
),
"Original": attribute_parameter,
}
for attr in self.attributes_outputs.values():
if keyisset("Output", attr):
self.outputs.append(attr["Output"])
[docs] def set_new_resource_outputs(self, output_definition):
"""
Method to define the outputs for the resource when new
"""
if output_definition[2] is Ref:
value = Ref(output_definition[1])
elif output_definition[2] is GetAtt:
value = GetAtt(output_definition[1], output_definition[3])
elif output_definition[2] is Sub:
value = Sub(output_definition[3])
else:
raise TypeError(
f"3rd argument for {output_definition[0]} must be one of",
(Ref, GetAtt, Sub),
"Got",
output_definition[2],
)
return value
[docs]def handle_ping_settings(props, ping_raw):
"""
Function to setup the "ping" settings
:param dict props:
:param str ping_raw:
:return:
"""
ping_re = re.compile(r"^([\d]|10):([\d]|10):([\d]{1,3}):([\d]{1,3})$")
groups = ping_re.match(ping_raw).groups()
ping_mapping = (
("HealthyThresholdCount", (2, 10)),
("UnhealthyThresholdCount", (2, 10)),
("HealthCheckIntervalSeconds", (5, 300)),
("HealthCheckTimeoutSeconds", (2, 120)),
)
for count, value in enumerate(groups):
if not min(ping_mapping[count][1]) <= int(value) <= max(ping_mapping[count][1]):
LOG.error(
f"Value for {ping_mapping[count][0]} is not valid. Must be in range of {ping_mapping[count][1]}"
)
props[ping_mapping[count][0]] = int(value)
[docs]def handle_path_settings(props, path_raw):
"""
Function to set the path and codes properties
:param dict props:
:param str path_raw:
:return:
"""
path_re = re.compile(
r"(/[\S][^:]+.$)|(/[\S]+)(?::)((?:[\d]{1,4},?){1,}.$)|((?:[\d]{1,4},?){1,}.$)"
)
groups = path_re.search(path_raw).groups()
if not groups:
LOG.debug("No PATH or ReturnCodes set.")
return
path = groups[0] or groups[1]
codes = groups[2] or groups[3]
if path:
props["HealthCheckPath"] = path
if codes:
props["Matcher"] = Matcher(HttpCode=codes)
if props["HealthCheckProtocol"] not in ["HTTP", "HTTPS"] and codes:
raise ValueError(
groups,
"Protocol and return codes are only valid for HTTP and HTTPS HealthCheck",
)
[docs]def set_healthcheck_definition(
props, target_definition, healtheck_keyword: str = "healthcheck"
):
"""
:param dict props:
:param dict target_definition:
:return:
"""
healthcheck_props = {
"HealthCheckEnabled": Ref(AWS_NO_VALUE),
"HealthCheckIntervalSeconds": Ref(AWS_NO_VALUE),
"HealthCheckPath": Ref(AWS_NO_VALUE),
"HealthCheckPort": Ref(AWS_NO_VALUE),
"HealthCheckProtocol": Ref(AWS_NO_VALUE),
"HealthCheckTimeoutSeconds": Ref(AWS_NO_VALUE),
"HealthyThresholdCount": Ref(AWS_NO_VALUE),
}
required_mapping = (
"HealthCheckPort",
"HealthCheckProtocol",
)
required_rex = re.compile(r"^([\d]{2,5}):(HTTPS|HTTP|TCP_UDP|TCP|TLS|UDP)$")
healthcheck_reg = re.compile(
r"(^(?:[\d]{2,5}):(?:HTTPS|HTTP|TCP_UDP|TCP|TLS|UDP)):?"
r"((?:[\d]{1}|10):(?:[\d]{1}|10):[\d]{1,3}:[\d]{1,3})?:"
r"?((?:/[\S][^:]+.$)|(?:/[\S]+)(?::)(?:(?:[\d]{1,4},?){1,}.$)|(?:(?:[\d]{1,4},?){1,}.$))?"
)
healthcheck_definition = set_else_none(healtheck_keyword, target_definition)
if isinstance(healthcheck_definition, str):
groups = healthcheck_reg.search(healthcheck_definition).groups()
if not groups[0]:
raise ValueError(
f"You need to define at least the Protocol and port for {healtheck_keyword}"
)
for count, value in enumerate(required_rex.match(groups[0]).groups()):
healthcheck_props[required_mapping[count]] = value
if groups[1]:
handle_ping_settings(healthcheck_props, groups[1])
if groups[2]:
try:
handle_path_settings(healthcheck_props, groups[2])
except ValueError:
LOG.error(target_definition["name"], target_definition["healthcheck"])
raise
elif isinstance(healthcheck_definition, dict):
healthcheck_props.update(healthcheck_definition)
if keyisset("Matcher", healthcheck_definition):
healthcheck_props["Matcher"] = Matcher(**healthcheck_definition["Matcher"])
else:
raise TypeError(
healthcheck_definition,
type(healthcheck_definition),
"must be one of",
(str, dict),
)
props.update(healthcheck_props)
[docs]def validate_props_and_service_definition(props, service):
"""
Function to validate that the defined settings are valid according to the service definition.
:param props:
:param ecs_composex.common.compose_services.ComposeService service:
:return:
"""
valid_tcp = ["HTTP", "HTTPS", "TLS", "TCP_UDP", "TCP"]
valid_udp = ["UDP", "TCP_UDP"]
if not props["Port"] in [p["target"] for p in service.ports]:
raise ValueError(
f"Defined TargetGroup port {props['Port']} is not defined for {service.name}."
" Valid ports are",
[
_port["published"]
for _port in service.ports
if keyisset("published", _port)
],
)
chosen_port = [p for p in service.ports if p["target"] == props["Port"]]
if (chosen_port[0]["protocol"] == "tcp" and props["Protocol"] not in valid_tcp) or (
chosen_port[0]["protocol"] == "udp" and props["Protocol"] not in valid_udp
):
raise ValueError(
f"The protocol defined for TargetGroup {props['Protocol']} "
f"does not match the service protocol {chosen_port[0]['protocol']}"
)
[docs]def handle_sg_lb_ingress_to_service(resource, family, resources_stack):
"""
Function to add ingress from the LB to Target if using ALB
:param resource:
:param ecs_composex.ecs.ecs_family.ComposeFamily family:
:param resources_stack:
:return:
"""
if resource.is_nlb():
return
if resource.cfn_resource and not resource.attributes_outputs:
resource.init_outputs()
resource.generate_outputs()
lb_sg_param = resource.attributes_outputs[LB_SG_ID]["ImportParameter"]
add_parameters(family.template, [lb_sg_param])
family.service_networking.add_lb_ingress(
lb_name=resource.logical_name, lb_sg_ref=Ref(lb_sg_param)
)
family.stack.Parameters.update(
{lb_sg_param.title: resource.attributes_outputs[LB_SG_ID]["ImportValue"]}
)
if resources_stack.title not in family.stack.DependsOn:
family.stack.DependsOn.append(resources_stack.title)
[docs]def validate_target_group_attributes(target_attributes, validation, lb_type):
"""
Function to ensure that each attribute set is compatible with elbv2.type == application
:param list[TargetGroupAttribute] target_attributes:
:param dict validation:
:param str lb_type:
:raises: ValueError
"""
for attr in target_attributes:
if attr.Key not in validation.keys():
raise ValueError(
f"Attribute {attr.Key} is not compatible with {lb_type}. Valid ones",
validation.keys(),
)
evaluation = validation[attr.Key]
if not evaluation(attr.Value):
raise ValueError(f"{attr.Key} value {attr.Value} is not valid.")
[docs]def import_target_group_attributes(props, target_def, elbv2):
attributes_key = "TargetGroupAttributes"
if not keyisset(attributes_key, target_def):
props[attributes_key] = [
TargetGroupAttribute(Key="deregistration_delay.timeout_seconds", Value="60")
]
else:
if isinstance(target_def[attributes_key], list):
props[attributes_key] = [
TargetGroupAttribute(Key=attr["Key"], Value=str(attr["Value"]))
for attr in target_def[attributes_key]
]
elif isinstance(target_def[attributes_key], dict):
props[attributes_key] = [
TargetGroupAttribute(Key=key, Value=str(value))
for key, value in target_def[attributes_key].items()
]
if not keyisset(attributes_key, props):
props[attributes_key] = [
TargetGroupAttribute(Key="deregistration_delay.timeout_seconds", Value="60")
]
return
if "deregistration_delay.timeout_seconds" not in [
attr.Key for attr in props[attributes_key]
]:
props[attributes_key].append(
TargetGroupAttribute(Key="deregistration_delay.timeout_seconds", Value="60")
)
nlb_valid = {
"deregistration_delay.connection_termination.enabled": lambda x: x
in ("true", "false"),
"preserve_client_ip.enabled": lambda x: x in ("true", "false"),
"proxy_protocol_v2.enabled": lambda x: x in ("true", "false"),
"stickiness.type": lambda x: x == "source_ip",
"deregistration_delay.timeout_seconds": lambda x: 0 <= int(x) <= 3600,
"stickiness.enabled": lambda x: x in ("true", "false"),
}
alb_valid = {
"stickiness.enabled": lambda x: x in ("true", "false"),
"stickiness.type": lambda x: x in ("lb_cookie", "app_cookie"),
"stickiness.app_cookie.cookie_name": lambda x: isinstance(x, str)
and not re.match(r"^AWSALB.*$|^AWSALBAPP.*|^AWSALBTG.*$", x),
"stickiness.app_cookie.duration_seconds": lambda x: 1 <= int(x) <= 604800,
"stickiness.lb_cookie.duration_seconds": lambda x: 1 <= int(x) <= 604800,
"deregistration_delay.timeout_seconds": lambda x: 0 <= int(x) <= 3600,
"load_balancing.algorithm.type": lambda x: x
in ("round_robin", "least_outstanding_requests"),
"slow_start.duration_seconds": lambda x: 30 <= int(x) <= 900,
}
# pragma: ignore use-case for now "lambda.multi_value_headers.enabled": lambda x: x in ("true", "false"),
if elbv2.lb_type == "application":
validate_target_group_attributes(
props[attributes_key], alb_valid, elbv2.lb_type
)
if elbv2.lb_type == "network":
validate_target_group_attributes(
props[attributes_key], nlb_valid, elbv2.lb_type
)
[docs]def define_service_target_group(
resource: Elbv2,
family: ComposeFamily,
service: ComposeService,
resources_root_stack: ComposeXStack,
target_definition: dict,
) -> ComposeTargetGroup:
"""
Function to create the elbv2 target group
"""
props = {}
set_healthcheck_definition(props, target_definition)
props["Port"] = target_definition["port"]
props["Protocol"] = (
props["HealthCheckProtocol"]
if not keyisset("protocol", target_definition)
else target_definition["protocol"]
)
props["TargetType"] = "ip"
import_target_group_attributes(props, target_definition, resource)
validate_props_and_service_definition(props, service)
target_group_name = f"Tgt{resource.logical_name}{family.logical_name}{service.logical_name}{props['Port']}"
target_group = ComposeTargetGroup(
target_group_name,
elbv2=resource,
family=family,
service=service,
stack=resource.stack,
port=int(target_definition["port"]),
VpcId=Ref(VPC_ID),
**props,
)
if target_group.title not in resources_root_stack.stack_template.resources:
resources_root_stack.stack_template.add_resource(target_group)
else:
target_group = resources_root_stack.stack_template.resources[target_group.title]
target_group.init_outputs()
target_group.generate_outputs()
add_outputs(resources_root_stack.stack_template, target_group.outputs)
if target_group not in family.target_groups:
family.target_groups.append(target_group)
tgt_parameter = target_group.attributes_outputs[TGT_GROUP_ARN]["ImportParameter"]
add_parameters(family.template, [tgt_parameter])
family.stack.Parameters.update(
{
tgt_parameter.title: target_group.attributes_outputs[TGT_GROUP_ARN][
"ImportValue"
],
}
)
service_lb = EcsLb(
ContainerPort=props["Port"],
ContainerName=service.name,
TargetGroupArn=Ref(tgt_parameter),
)
family.ecs_service.lbs.append(service_lb)
add_parameters(family.template, [ELB_GRACE_PERIOD])
family.ecs_service.ecs_service.HealthCheckGracePeriodSeconds = Ref(ELB_GRACE_PERIOD)
handle_sg_lb_ingress_to_service(resource, family, resources_root_stack)
return target_group
[docs]def define_service_target_group_definition(
resource: Elbv2,
family: ComposeFamily,
service: ComposeService,
target_def: dict,
resources_root_stack: ComposeXStack,
) -> ComposeTargetGroup:
"""
Function to create the new service TGT Group for a given combination of family, service and port.
"""
if resource.logical_name not in family.stack.DependsOn:
family.stack.DependsOn.append(resources_root_stack.title)
LOG.info(
f"{resource.module.res_key}.{resource.name} - Adding {family.logical_name} {service.name}"
)
return define_service_target_group(
resource,
family,
service,
resources_root_stack,
target_def,
)
[docs]def handle_services_association(
load_balancer: Elbv2, res_root_stack: ComposeXStack, settings: ComposeXSettings
) -> None:
"""
Function to handle association of listeners and targets to the LB
"""
template = res_root_stack.stack_template
load_balancer.set_listeners(template)
load_balancer.associate_to_template(template)
add_outputs(template, load_balancer.outputs)
identified = []
for target in load_balancer.families_targets:
family: ComposeFamily = target[0]
print("TARGET?", target)
if target[1].launch_type == "EXTERNAL":
LOG.error(
f"x-elbv2.{load_balancer.name} - Target family {family.name} uses EXTERNAL launch type. Ignoring"
)
continue
tgt_group = define_service_target_group_definition(
load_balancer, family, target[1], target[2], res_root_stack
)
for service_name, service in load_balancer.services.items():
target_name = f"{family.name}:{target[1].name}"
if target_name not in service_name:
continue
if target_name == service_name and tgt_group.Port == int(service["port"]):
service["target_arn"] = Ref(tgt_group)
identified.append(True)
break
if not identified:
LOG.error(
f"{load_balancer.module.res_key}.{load_balancer.name} - No services found as targets. Skipping association"
)
return
for listener in load_balancer.new_listeners:
listener.map_lb_target_groups_service_to_listener_targets(load_balancer)
for listener_port, listener in load_balancer.lookup_listeners.items():
listener.map_lb_target_groups_service_to_listener_targets(load_balancer)
listener.handle_cognito_pools(settings, res_root_stack)
listener.define_new_rules(load_balancer, template)
for listener in load_balancer.new_listeners:
listener.handle_certificates(settings, res_root_stack)
listener.handle_cognito_pools(settings, res_root_stack)
listener.define_default_actions(load_balancer, template)
[docs]def handle_target_groups_association(
load_balancer: Elbv2, res_root_stack: ComposeXStack, settings: ComposeXSettings
) -> None:
"""
Function to create TargetGroups based on the `TargetGroups` defined on the ELB rather than the services.
This allows to associate more than one ECS service to a single TargetGroup.
"""
template = res_root_stack.stack_template
load_balancer.set_listeners(template)
load_balancer.associate_to_template(template)
add_outputs(template, load_balancer.outputs)
_targets = set_else_none("TargetGroups", load_balancer.definition, {})
if not _targets:
print("NO TARGET GROUPS")
return
for _target_name, _target_def in _targets.items():
props = {}
set_healthcheck_definition(props, _target_def, "HealthCheck")
props["Port"] = _target_def["Port"]
props["Protocol"] = _target_def["Protocol"]
props["TargetType"] = "ip"
import_target_group_attributes(props, _target_def, load_balancer)
_tgt_group = MergedTargetGroup(
_target_name,
_target_def,
load_balancer,
load_balancer.stack,
int(_target_def["Port"]),
VpcId=Ref(VPC_ID),
**props,
)
_tgt_group.init_outputs()
_tgt_group.generate_outputs()
add_resource(template, _tgt_group)
add_outputs(template, _tgt_group.outputs)
load_balancer.target_groups.append(_tgt_group)
_tgt_group.associate_families(settings)
for listener in load_balancer.new_listeners:
listener.map_target_group_to_listener(_tgt_group)
for listener in load_balancer.lookup_listeners.values():
print("MAPPING TARGET TO LISTENER", _tgt_group, listener)
listener.map_target_group_to_listener(_tgt_group)
for listener_port, listener_def in load_balancer.lookup_listeners.items():
print(listener_port, listener_def)
for listener in load_balancer.new_listeners:
listener.handle_certificates(settings, res_root_stack)
listener.handle_cognito_pools(settings, res_root_stack)
listener.define_default_actions(load_balancer, template)
[docs]def elbv2_to_ecs(resources, services_stack, res_root_stack, settings):
"""
Entrypoint function to map services, targets, listeners and ACM together
:param dict resources:
:param ecs_composex.common.stacks.ComposeXStack services_stack:
:param ecs_composex.common.stacks.ComposeXStack res_root_stack:
:param ecs_composex.common.settings.ComposeXSettings settings:
:return:
"""
for resource_name, resource in resources.items():
if resource.cfn_resource:
if keyisset("TargetGroups", resource.definition):
LOG.info(
f"{resource.module.res_key}.{resource_name} - Linking to TargetGroups"
)
handle_target_groups_association(resource, res_root_stack, settings)
else:
LOG.info(
f"{resource.module.res_key}.{resource_name} - Linking to Services"
)
handle_services_association(resource, res_root_stack, settings)
elif resource.mappings:
if keyisset("TargetGroups", resource.definition):
LOG.info(
f"{resource.module.res_key}.{resource_name} (Lookup) - Linking to TargetGroups"
)
handle_target_groups_association(resource, res_root_stack, settings)
else:
LOG.info(
f"{resource.module.res_key}.{resource_name} (Lookup) - Linking to Services"
)
handle_services_association(resource, res_root_stack, settings)