# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
from __future__ import annotations
from typing import TYPE_CHECKING
import ecs_composex.common.troposphere_tools
if TYPE_CHECKING:
from ecs_composex.mods_manager import XResourceModule
from ecs_composex.common.settings import ComposeXSettings
from compose_x_common.aws.elasticloadbalancing import LB_V2_LISTENER_ARN_RE
from compose_x_common.compose_x_common import keyisset, keypresent, set_else_none
from troposphere import AWS_NO_VALUE, AWS_STACK_NAME, GetAtt, Ref, Select, Sub, Tags
from troposphere.ec2 import EIP, SecurityGroup
from troposphere.elasticloadbalancingv2 import (
LoadBalancer,
LoadBalancerAttributes,
SubnetMapping,
)
from ecs_composex.common import NONALPHANUM
from ecs_composex.common.aws import find_aws_resource_arn_from_tags_api
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import ROOT_STACK_NAME, add_parameters
from ecs_composex.compose.x_resources.network_x_resources import NetworkXResource
from ecs_composex.elbv2.elbv2_ecs import MergedTargetGroup
from ecs_composex.elbv2.elbv2_params import (
LB_ARN,
LB_CLOUD_CONTROL_ATTRIBUTES,
LB_DNS_NAME,
LB_DNS_ZONE_ID,
LB_FULL_NAME,
LB_NAME,
LB_SG_ID,
MOD_KEY,
)
from ecs_composex.elbv2.elbv2_stack.elbv2_listener import ComposeListener
from ecs_composex.elbv2.elbv2_stack.elbv2_listener.lookup_listener import LookupListener
from ecs_composex.elbv2.elbv2_stack.helpers import (
LISTENER_TARGET_RE,
handle_cross_zone,
handle_desync_mitigation_mode,
handle_drop_invalid_headers,
handle_http2,
handle_timeout_seconds,
validate_listeners_duplicates,
)
from ecs_composex.ingress_settings import Ingress, set_service_ports
from ecs_composex.vpc.vpc_params import APP_SUBNETS, PUBLIC_SUBNETS, VPC_ID
[docs]class Elbv2(NetworkXResource):
"""
Class to handle ELBv2 creation and mapping to ECS Services
"""
subnets_param = APP_SUBNETS
def __init__(
    self, name, definition, module: XResourceModule, settings: ComposeXSettings
):
    """
    Set the LB defaults, run the parent initializer, then validate listeners.

    :param name: Name of the load balancer in the compose definition
    :param definition: Definition of the load balancer
    :param XResourceModule module: The module the resource belongs to
    :param ComposeXSettings settings: The execution settings
    :raises KeyError: When neither a new nor a looked-up listener is defined
    """
    # These attributes are assigned before the parent initializer runs.
    # NOTE(review): presumably the parent relies on some of them - confirm.
    self.lb_is_public = False
    self._lb_type = "application"
    self.ingress = None
    self.lb_sg = None
    self.lb_eips = []
    self.unique_service_lb = False
    self.lb = None
    self.new_listeners: list[ComposeListener] = []
    self.lookup_listeners: dict[int, LookupListener] = {}
    self.target_groups: list[MergedTargetGroup] = []

    super().__init__(name, definition, module, settings)

    if not self.lookup:
        # A new LB needs at least one listener of its own.
        if not keyisset("Listeners", definition):
            raise KeyError(
                "You must specify at least one new Listener for a new LB.", name
            )
    elif not (
        keyisset("Listeners", self.definition)
        or keyisset("Listeners", self.lookup)
    ):
        # A looked-up LB needs either a new listener or a looked-up one.
        raise KeyError(
            "When looking up LB, you must either create a new Listener or Lookup and existing one."
        )

    self.cloud_control_attributes_mapping = LB_CLOUD_CONTROL_ATTRIBUTES
    self.no_allocate_eips: bool = keyisset("NoAllocateEips", self.settings)
    self.retain_eips: bool = keyisset("RetainEips", self.settings)
    self.validate_services()
    self.sort_props()
    self.module_name = MOD_KEY
    self.ref_parameter = LB_ARN
@property
def lb_type(self) -> str:
    """
    Type of the load balancer.

    The ``Type`` found in the cloud control properties wins when it is set,
    otherwise the type stored in ``_lb_type`` is returned.
    """
    cc_properties = self.cloud_control_properties
    if not cc_properties or not keyisset("Type", cc_properties):
        return self._lb_type
    return cc_properties["Type"]
def init_outputs(self):
    """
    Define the output properties of the load balancer.

    The ARN is exposed through ``Ref``; the DNS name, DNS zone ID, name and
    full name are exposed through ``GetAtt`` on their ``return_value``.
    """
    outputs = {LB_ARN: (self.logical_name, self.cfn_resource, Ref, None)}
    for attribute in (LB_DNS_NAME, LB_DNS_ZONE_ID, LB_NAME, LB_FULL_NAME):
        outputs[attribute] = (
            f"{self.logical_name}{attribute.return_value}",
            self.cfn_resource,
            GetAtt,
            attribute.return_value,
        )
    self.output_properties = outputs
def set_listeners(self, template):
    """
    Method to define the new listeners of the LB and add them to the template.

    For each listener, targets that do not point to one of the services
    attached to this LB are dropped. A listener that ends up with neither
    targets nor default actions is not created and a warning is logged.

    :param template: The template to add the listeners to
    :raises KeyError: When no listener is defined and none is looked up
    :raises ValueError: When a target name does not match LISTENER_TARGET_RE
    """
    listeners: list[dict] = set_else_none("Listeners", self.definition, [])
    if not listeners and not self.lookup_listeners:
        raise KeyError(
            f"You must define at least one listener for LB {self.name}"
            " when not looking up existing ones."
        )
    ports = [listener["Port"] for listener in listeners]
    validate_listeners_duplicates(self.name, ports)
    for listener_def in listeners:
        targets: list[dict] = set_else_none("Targets", listener_def, [])
        if targets and self.services:
            # Build the list of targets to keep instead of removing items
            # from the list being iterated: removing in place skips the
            # element that follows each removed one, leaving it neither
            # validated nor filtered.
            kept_targets = []
            for target in targets:
                target_parts = LISTENER_TARGET_RE.match(target["name"])
                if not target_parts:
                    raise ValueError(
                        f"{self.module.res_key}.{self.name} - Listener {listener_def['Port']}"
                        f" - Target {target['name']} is not a valid value. Must match",
                        LISTENER_TARGET_RE.pattern,
                    )
                if (
                    f"{target_parts.group('family')}:{target_parts.group('container')}"
                    in self.services
                ):
                    kept_targets.append(target)
            # Update in place so the definition keeps the same list object.
            listener_def["Targets"][:] = kept_targets
        if keyisset("Targets", listener_def) or keyisset(
            "DefaultActions", listener_def
        ):
            new_listener = template.add_resource(
                ComposeListener(self, listener_def)
            )
            self.new_listeners.append(new_listener)
        else:
            LOG.warning(
                f"{self.module.res_key}.{self.name} - "
                f"Listener {listener_def['Port']} has no action or service. Not used."
            )
def find_lookup_listeners(self):
    """
    Create a LookupListener for each listener defined under the lookup
    definition and store it in ``lookup_listeners``, keyed by its port.
    """
    lookup_definitions: dict = set_else_none("Listeners", self.lookup, {})
    if not lookup_definitions:
        LOG.debug("No Listener to lookup.")
    for port, lookup_definition in lookup_definitions.items():
        found_listener: LookupListener = LookupListener(
            self, port, lookup_definition
        )
        self.lookup_listeners[port] = found_listener
def set_services_targets(self, settings):
    """
    Method to map services and families targets of the services defined.

    Each key of ``self.services`` is expected in the ``family:service`` form.
    Every match is stored in ``self.families_targets`` as the tuple
    (family, service, service definition, target name).

    :param ecs_composex.common.settings.ComposeXSettings settings:
    :raises ValueError: When the family is unknown, when the service is not
        one of the compose services, or when it is not found in the family
    """
    if not self.services:
        LOG.debug(f"{self.module.res_key}.{self.name} No Services defined.")
        return
    for family_combo_name, service_def in self.services.items():
        service_name = family_combo_name.split(":")[-1]
        family_name = NONALPHANUM.sub("", family_combo_name.split(":")[0])
        LOG.info(
            f"{self.module.res_key}.{self.name} - Adding target {family_name}:{service_name}"
        )
        if family_name not in settings.families:
            raise ValueError(
                f"{self.module.res_key}.{self.name} - Service family {family_name} is invalid. Defined families",
                settings.families.keys(),
            )
        for f_service in settings.families[family_name].ordered_services:
            if f_service.name != service_name:
                continue
            if f_service not in settings.services:
                # Fragments are now separated by spaces so that the message
                # no longer reads "names.You cannot ... labelsFound ...".
                raise ValueError(
                    f"{self.module.res_key}.{self.name} Please, use only the services names. "
                    "You cannot use the family name defined by deploy labels. "
                    f"Found {f_service}",
                    [s for s in settings.services],
                    [f for f in settings.families],
                )
            # NOTE(review): families_targets holds tuples, so this membership
            # test compares a service against tuples - confirm it guards
            # against duplicates as intended.
            if f_service not in self.families_targets:
                self.families_targets.append(
                    (
                        f_service.family,
                        f_service,
                        service_def,
                        f"{family_combo_name}{service_def['port']}",
                    )
                )
            break
        else:
            # No service of the family carries the requested name.
            raise ValueError(
                f"{self.module.res_key}.{self.name} - Could not find {service_name} in family {family_name}"
            )
    self.debug_families_targets()
def validate_services(self):
    """
    Flag the LB as ``unique_service_lb`` when exactly one service uses it.
    """
    if len(self.services.keys()) != 1:
        return
    LOG.info(
        f"LB {self.name} only has a unique service. LB will be deployed with the service stack."
    )
    self.unique_service_lb = True
def sort_props(self):
    """
    Derive the LB scheme and type from the properties, then set the
    security group accordingly.

    The LB is public only when ``Scheme`` is ``internet-facing``; the type
    defaults to ``application`` when ``Type`` is not set.
    """
    scheme = set_else_none("Scheme", self.properties)
    self.lb_is_public = scheme == "internet-facing"
    self._lb_type = set_else_none("Type", self.properties, "application")
    self.sort_sg()
def sort_sg(self):
    """
    Define the security group of the LB.

    A NLB and a looked-up ALB get ``AWS::NoValue``; a new ALB gets a new
    security group. Any other LB type leaves ``lb_sg`` untouched.
    """
    if self.is_nlb():
        self.lb_sg = Ref(AWS_NO_VALUE)
        return
    if not self.is_alb():
        return
    if self.lookup:
        self.lb_sg = Ref(AWS_NO_VALUE)
        return
    group_description = Sub(
        f"SG for LB {self.logical_name} in ${{{AWS_STACK_NAME}}}"
    )
    group_name = Sub(
        f"{self.logical_name}-{self.lb_type}-sg-${{{AWS_STACK_NAME}}}"
    )
    group_tags = Tags(
        Name=Sub(f"elbv2-{self.logical_name}-${{{AWS_STACK_NAME}}}")
    )
    self.lb_sg = SecurityGroup(
        f"{self.logical_name}SecurityGroup",
        GroupDescription=group_description,
        GroupName=group_name,
        VpcId=Ref(VPC_ID),
        Tags=group_tags,
    )
def sort_alb_ingress(self, settings, stack_template):
    """
    Method to handle Ingress to ALB

    Builds the ingress rules from the ``Ingress`` parameter for the ports
    of the new listeners and associates them to the stack template. For a
    LB created in this stack the rules target the LB security group; for a
    LB without a CFN resource, they target the imported security group ID.

    :param settings: The execution settings
    :param stack_template: The template to add parameters and rules to
    """
    if self.is_nlb():
        LOG.warning(
            "You defined ingress rules for a NLB. This is invalid. Define ingress rules at the service level."
        )
        return
    # keyisset is already False when parameters is empty or unset.
    if not keyisset("Ingress", self.parameters):
        LOG.warning(f"You did not define any Ingress rules for ALB {self.name}.")
        return
    # A looked-up LB may define only lookup listeners, in which case the
    # definition has no "Listeners" key: fall back to no new port.
    new_listeners = set_else_none("Listeners", self.definition, [])
    ports = set_service_ports([listener["Port"] for listener in new_listeners])
    self.ingress = Ingress(self.parameters["Ingress"], ports)
    if self.ingress and self.is_alb():
        if self.cfn_resource:
            security_group_id = GetAtt(self.lb_sg, "GroupId")
        else:
            # Import the security group ID of the existing LB as a
            # parameter of the stack.
            sg_import = self.attributes_outputs[LB_SG_ID]
            add_parameters(stack_template, [sg_import["ImportParameter"]])
            self.stack.Parameters.update(
                {sg_import["ImportParameter"].title: sg_import["ImportValue"]}
            )
            security_group_id = Ref(sg_import["ImportParameter"])
        self.ingress.set_aws_sources_ingress(
            settings, self.logical_name, security_group_id
        )
        self.ingress.set_ext_sources_ingress(self.logical_name, security_group_id)
        self.ingress.associate_aws_ingress_rules(stack_template)
        self.ingress.associate_ext_ingress_rules(stack_template)
def define_override_subnets(self, subnets, vpc_stack):
    """
    Method to define the subnets overrides to use for the LB

    :param subnets: The original subnets to replace
    :param ecs_composex.vpc.vpc_stack.VpcStack vpc_stack:
    :return: the subnet name to use
    :rtype: str
    :raises KeyError: When the override is not one of the VPC mappings
    """
    override = self.subnets_override
    if not override:
        # No override: use the original subnets, by name when given a Ref.
        return subnets.data["Ref"] if isinstance(subnets, Ref) else subnets
    known_subnets = vpc_stack.vpc_resource.mappings
    if override in known_subnets.keys():
        return override
    raise KeyError(
        f"The subnets indicated for {self.name} is not valid. Valid ones are",
        known_subnets.keys(),
    )
def set_eips(self, vpc_stack):
    """
    Create one EIP per public subnet (new VPC) or per AZ of the public
    subnets mapping (existing VPC). Only applies to a public NLB.

    :param ecs_composex.vpc.vpc_stack.VpcStack vpc_stack:
    :return:
    """
    if not (self.is_nlb() and self.lb_is_public):
        return
    vpc = vpc_stack.vpc_resource
    if vpc.cfn_resource:
        suffixes = [subnet.title for subnet in vpc.public_subnets[1]]
    elif vpc.mappings:
        subnets = self.define_override_subnets(PUBLIC_SUBNETS.title, vpc_stack)
        # Keep the last dash-separated part of the title-cased AZ name.
        suffixes = [
            az_name.title().split("-")[-1]
            for az_name in vpc.mappings[subnets]["Azs"]
        ]
    else:
        return
    deletion_policy = "Retain" if self.retain_eips else "Delete"
    for suffix in suffixes:
        self.lb_eips.append(
            EIP(
                f"{self.logical_name}Eip{suffix}",
                Domain="vpc",
                DeletionPolicy=deletion_policy,
            )
        )
def set_subnets(self, vpc_stack):
    """
    Method to define which subnets to use for the LB

    :param ecs_composex.vpc.vpc_stack.VpcStack vpc_stack:
    :return: Reference to the subnets, or AWS::NoValue for a public NLB
        that allocates EIPs
    :raises ValueError: When the VPC is created by Compose-X and the
        override is neither the public nor the app subnets
    """
    override = self.subnets_override
    # The VPC resource is only inspected when an override is defined.
    vpc = vpc_stack.vpc_resource if override else None
    if override and vpc.cfn_resource:
        allowed = [PUBLIC_SUBNETS.title, APP_SUBNETS.title]
        if override not in allowed:
            raise ValueError(
                "When Compose-X creates the VPC, the only subnets you can define to use are",
                [PUBLIC_SUBNETS.title, APP_SUBNETS.title],
            )
    if self.is_nlb() and self.lb_is_public and not self.no_allocate_eips:
        return Ref(AWS_NO_VALUE)
    if (
        override
        and not vpc.cfn_resource
        and vpc.mappings
        and override in vpc.mappings.keys()
    ):
        return Ref(override)
    if self.lb_is_public:
        return Ref(PUBLIC_SUBNETS)
    return Ref(APP_SUBNETS)
def set_subnet_mappings(self, vpc_stack):
    """
    For NLB, defines the EC2 EIP and Subnets Mappings to use.
    Determines the number of EIP to produce from the VPC Settings.

    For a public NLB allocating EIPs, one mapping is returned per EIP. In
    every other non-ALB case, the LB ``Subnets`` property is set instead.

    :param ecs_composex.vpc.vpc_stack.VpcStack vpc_stack:
    :return: The subnet mappings, or AWS::NoValue when none apply
    """
    if self.is_alb():
        return Ref(AWS_NO_VALUE)
    if self.lb_is_public and not self.no_allocate_eips:
        # Only create the EIPs once. When they already exist, the mappings
        # are rebuilt from them rather than implicitly returning None.
        if not self.lb_eips:
            self.set_eips(vpc_stack)
        subnets = self.define_override_subnets(PUBLIC_SUBNETS.title, vpc_stack)
        return [
            SubnetMapping(
                AllocationId=GetAtt(eip, "AllocationId"),
                SubnetId=Select(count, Ref(subnets)),
            )
            for count, eip in enumerate(self.lb_eips)
        ]
    # Private LB, or public LB that must not allocate EIPs.
    self.cfn_resource.Subnets = self.set_subnets(vpc_stack)
    return Ref(AWS_NO_VALUE)
def parse_attributes_settings(self):
    """
    Method to parse pre-defined settings for shortcuts

    A shortcut is used when its key is present in the parameters, its value
    has the expected type and it applies to the type of this LB.

    :return: the lb attributes mappings
    :rtype: list
    """
    for_alb = self.is_alb()
    for_nlb = self.is_nlb()
    # (parameter name, expected type, handler, applies to this LB type)
    shortcuts = (
        ("timeout_seconds", int, handle_timeout_seconds, for_alb),
        ("desync_mitigation_mode", str, handle_desync_mitigation_mode, for_alb),
        ("drop_invalid_header_fields", bool, handle_drop_invalid_headers, for_alb),
        ("http2", bool, handle_http2, for_alb),
        ("cross_zone", bool, handle_cross_zone, for_nlb),
    )
    mappings = []
    for key, expected_type, handler, applies in shortcuts:
        if not applies or not keypresent(key, self.parameters):
            continue
        value = self.parameters[key]
        if not isinstance(value, expected_type):
            continue
        if handler:
            mappings.append(handler(value))
        else:
            # Without a handler, the parameter is used as-is.
            mappings.append(LoadBalancerAttributes(Key=key, Value=str(value)))
    return mappings
def set_lb_attributes(self):
    """
    Method to define the LB attributes

    ``LoadBalancerAttributes`` from the properties take precedence;
    otherwise the shortcuts defined in the parameters are used.

    :return: List of LB Attributes, or AWS::NoValue when there is none
    """
    attributes = []
    if keyisset("LoadBalancerAttributes", self.properties):
        defined_attributes = self.properties["LoadBalancerAttributes"]
        attributes = [
            LoadBalancerAttributes(Key=key, Value=defined_attributes[key])
            for key in defined_attributes
        ]
    elif self.parameters and not keyisset(
        "LoadBalancerAttributes", self.definition
    ):
        attributes = self.parse_attributes_settings()
    return attributes if attributes else Ref(AWS_NO_VALUE)
def set_lb_definition(self):
    """
    Function to parse the LB settings and properties and build the LB object

    The LB is stored in both ``lb`` and ``cfn_resource``. ``Subnets``,
    ``SubnetMappings`` and ``Name`` are set to AWS::NoValue.
    """
    if isinstance(self.lb_sg, SecurityGroup):
        security_groups = [Ref(self.lb_sg)]
    else:
        security_groups = self.lb_sg
    self.lb = LoadBalancer(
        self.logical_name,
        IpAddressType=set_else_none("IpAddressType", self.properties, "ipv4"),
        Type=self.lb_type,
        Scheme="internet-facing" if self.lb_is_public else "internal",
        SecurityGroups=security_groups,
        Subnets=Ref(AWS_NO_VALUE),
        SubnetMappings=Ref(AWS_NO_VALUE),
        LoadBalancerAttributes=self.set_lb_attributes(),
        Tags=Tags(Name=Sub(f"${{{ROOT_STACK_NAME.title}}}{self.logical_name}")),
        Name=Ref(AWS_NO_VALUE),
    )
    self.cfn_resource = self.lb
def is_nlb(self):
    """Whether the LB type is ``network``."""
    return self.lb_type == "network"
def is_alb(self):
    """Whether the LB type is ``application``."""
    return self.lb_type == "application"
def associate_to_template(self, template):
    """
    Method to associate all resources to the template

    Adds the LB, its security group when one was created, and the EIPs,
    then generates the outputs.

    :param troposphere.Template template:
    :return:
    """
    # NOTE(review): the nesting below is inferred from a copy of the source
    # that lost its indentation - confirm that only the add_resource call
    # is guarded by cfn_resource.
    if self.cfn_resource:
        template.add_resource(self.lb)
    self.init_outputs()
    if self.lb_sg and isinstance(self.lb_sg, SecurityGroup):
        # Expose the ID of the security group created for the LB.
        self.output_properties[LB_SG_ID] = (
            f"{self.logical_name}{LB_SG_ID.return_value}",
            self.lb_sg,
            GetAtt,
            LB_SG_ID.return_value,
            None,
        )
        template.add_resource(self.lb_sg)
    for lb_eip in self.lb_eips:
        template.add_resource(lb_eip)
    self.generate_outputs()
def update_from_vpc(self, vpc_stack, settings=None):
    """
    Override to set the specific resources right once we have a VPC Definition

    An ALB, and a NLB that must not allocate EIPs, get their ``Subnets``
    set; any other NLB gets its ``SubnetMappings`` set.

    :param ecs_composex.vpc.vpc_stack.VpcStack vpc_stack:
    :param ecs_composex.common.settings.ComposeXSettings settings: Not used
        by this method.
    """
    if not (vpc_stack and vpc_stack.vpc_resource):
        return
    if self.is_alb() or (self.is_nlb() and self.no_allocate_eips):
        self.cfn_resource.Subnets = self.set_subnets(vpc_stack)
    elif self.is_nlb():
        self.cfn_resource.SubnetMappings = self.set_subnet_mappings(vpc_stack)