# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module to help with defining the network settings for the ECS Service based on the family services definitions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from troposphere import AWSHelperFn
from ecs_composex.common.settings import ComposeXSettings
import re
from copy import deepcopy
from ipaddress import IPv4Interface
from json import dumps
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import AWS_ACCOUNT_ID, AWS_NO_VALUE, Ref, Sub
from troposphere.ec2 import SecurityGroupIngress
from ecs_composex.common import NONALPHANUM
from ecs_composex.common.aws import (
define_lookup_role_from_info,
find_aws_resource_arn_from_tags_api,
)
from ecs_composex.common.logging import LOG
[docs]def flatten_ip(ip_str):
"""
Function to remove all non alphanum characters from IP CIDR notation
:param ip_str:
:rtype: str
"""
return str(ip_str.replace(".", "").split("/")[0].strip())
[docs]def generate_security_group_props(allowed_source):
"""
Function to parse the allowed source and create the SG Opening options accordingly.
:param dict allowed_source: The allowed source defined in configs
:return: security group ingress properties
:rtype: dict
"""
props = {
"CidrIp": (
allowed_source[Ingress.ipv4_key]
if keyisset(Ingress.ipv4_key, allowed_source)
else Ref(AWS_NO_VALUE)
),
"CidrIpv6": (
allowed_source[Ingress.ipv6_key]
if keyisset(Ingress.ipv6_key, allowed_source)
else Ref(AWS_NO_VALUE)
),
}
if keyisset("CidrIp", props) and isinstance(props["CidrIp"], str):
try:
IPv4Interface(props["CidrIp"])
except Exception as error:
LOG.error(f"Falty IP Address: {allowed_source}")
raise ValueError("Not a valid IPv4 CIDR notation", props["CidrIp"], error)
return props
[docs]def define_protocol(port_string):
"""
Function to define the port protocol. Defaults to TCP if not specified otherwise
:param port_string: the port string to parse from the ports list in the compose file
:type port_string: str
:return: protocol, ie. udp or tcp
:rtype: str
"""
protocols = ["tcp", "udp"]
protocol = "tcp"
if port_string.find("/"):
protocol_found = port_string.split("/")[-1].strip()
if protocol_found in protocols:
return protocol_found
return protocol
[docs]def set_port_from_str(port: str):
"""
Function to filter out port string and define published port, target port and protocol
:param str port:
:return: the ports parameters
:rtype: tuple
"""
if r"/" in port:
protocol = port.split(r"/")[-1]
if protocol not in ["udp", "tcp"]:
raise ValueError(
"Protocol", protocol, "is not valid. Must be one of", ["udp", "tcp"]
)
port = port.split(r"/")[0]
else:
protocol = "tcp"
if r":" in port:
published = port.split(r":")[0]
target = port.split(r":")[1]
else:
target = port
published = port
if r"-" in target or r"-" in published:
raise ValueError(
"Range ports not supported for exposure in AWS ECS with AWSVPC mode"
)
numbers_only = re.compile(r"^\d+$")
if not numbers_only.match(target):
raise ValueError("target port is not valid", numbers_only.pattern)
if not numbers_only.match(published):
raise ValueError("published port is not valid", numbers_only.pattern)
if not (1 <= int(target) < (2**16)):
raise ValueError(f"target port {target} is not between 1 and 65535")
if not (1 <= int(published) < (2**16)):
raise ValueError(f"published port {published} is not between 1 and 65535")
return published, target, protocol
[docs]def set_service_ports(ports):
"""Function to define common structure to ports
:return: list of ports the ecs_service uses formatted according to dict
:rtype: list
"""
service_ports = []
for port in ports:
if not isinstance(port, (str, dict, int)):
raise TypeError(
"ports must be of types", dict, "or", list, "got", type(port)
)
if isinstance(port, str):
parts = set_port_from_str(port)
service_ports.append(
{
"protocol": parts[2],
"published": int(parts[0]),
"target": int(parts[1]),
}
)
elif isinstance(port, dict):
service_ports.append(port)
elif isinstance(port, int):
service_ports.append(
{
"protocol": "tcp",
"published": port,
"target": port,
}
)
return service_ports
[docs]def lookup_security_group(settings: ComposeXSettings, lookup: dict | list) -> str:
"""Function to fetch the security group ID based on lookup details"""
sg_re = re.compile(
r"^arn:aws(?:-[a-z]+)?:ec2:[a-z0-9-]+:\d{12}:security-group/([\S]+)$"
)
ec2_types = {
"ec2:security-group": {"regexp": sg_re.pattern},
}
lookup_session = define_lookup_role_from_info(lookup, settings.session)
sg_arn = find_aws_resource_arn_from_tags_api(
lookup,
lookup_session,
"ec2:security-group",
types=ec2_types,
)
if not sg_arn:
raise LookupError("Failed to identify EC2 SecurityGroup based on tags")
return sg_re.match(sg_arn).groups()[0]
[docs]class Ingress:
"""
Class to group the configuration for Service network settings
"""
defined = True
master_key = "Ingress"
aws_sources_key = "AwsSources"
ext_sources_key = "ExtSources"
services_key = "Services"
ipv4_key = "IPv4"
ipv6_key = "IPv6"
network_settings = [master_key, "UseCloudmap", "IsPublic"]
def __init__(self, definition, ports):
"""
Initialize network settings for the family ServiceConfig
"""
self.definition = deepcopy(definition)
self.aws_sources = (
self.definition[self.aws_sources_key]
if keyisset(self.aws_sources_key, self.definition)
else []
)
self.ext_sources = (
self.definition[self.ext_sources_key]
if keyisset(self.ext_sources_key, self.definition)
else []
)
self.ext_sources = []
if keyisset(self.ext_sources_key, self.definition):
cidrs = []
for ext_source in self.definition[self.ext_sources_key]:
source_cidr = set_else_none(
self.ipv4_key,
ext_source,
set_else_none(self.ipv6_key, ext_source, None),
)
if source_cidr and source_cidr not in cidrs:
self.ext_sources.append(ext_source)
else:
LOG.warning(
f"Ingress source {source_cidr} already defined in a previous Ingress rule."
)
self.services = (
self.definition[self.services_key]
if keyisset(self.services_key, self.definition)
else []
)
self.ports = ports
self.aws_ingress_rules = []
self.ext_ingress_rules = []
self.to_self_rules = []
def __repr__(self):
return dumps(self.definition, indent=2)
[docs] def handle_security_group_source(
self,
source,
common_args: dict,
destination_title: str,
target_port: int,
settings,
) -> None:
"""
Method to handle SecurityGroup sources
It updates the list of AWS sources ingress rules that will later be added to the stack template of the family
"""
if keyisset("Id", source):
sg_id = source["Id"]
elif keyisset("Lookup", source):
sg_id = lookup_security_group(settings, source["Lookup"])
else:
raise KeyError(
"Information missing to identify the SecurityGroup. Requires either Id or Lookup"
)
common_args.update(
{
"Description": Sub(
f"From {sg_id} to {destination_title} on port {target_port}"
)
}
)
self.aws_ingress_rules.append(
SecurityGroupIngress(
f"From{NONALPHANUM.sub('', sg_id)}ToServiceOn{target_port}",
SourceSecurityGroupId=sg_id,
SourceSecurityGroupOwnerId=set_else_none(
"AccountOwner", source, Ref(AWS_ACCOUNT_ID)
),
**common_args,
)
)
[docs] def set_aws_sources_ingress(
self, settings: ComposeXSettings, destination_title: str, sg_ref: AWSHelperFn
) -> None:
"""Method to define AWS Sources ingresses"""
for source in self.aws_sources:
for port in self.ports:
if (
keyisset("Ports", source)
and keyisset("published", port)
and port["published"] not in source["Ports"]
):
continue
target_port = set_else_none(
"published", port, alt_value=set_else_none("target", port, None)
)
if target_port is None:
raise ValueError(
"Wrong port definition value for security group ingress", port
)
common_args = {
"FromPort": target_port,
"ToPort": target_port,
"IpProtocol": port["protocol"],
"GroupId": sg_ref,
}
if source["Type"] == "SecurityGroup":
self.handle_security_group_source(
source, common_args, destination_title, target_port, settings
)
elif source["Type"] == "PrefixList":
self.aws_ingress_rules.append(
SecurityGroupIngress(
f"From{NONALPHANUM.sub('', source['Id'])}ToServiceOn{target_port}",
SourcePrefixListId=source["Id"],
**common_args,
)
)
[docs] def create_ext_sources_ingress_rule(
self, destination_title, allowed_source, security_group: AWSHelperFn, **props
) -> None:
"""
Creates the Security Ingress rule for a CIDR based rule
:param str destination_title:
:param dict allowed_source:
:param security_group:
:param dict props:
"""
for port in self.ports:
target_port = set_else_none(
"published", port, alt_value=set_else_none("target", port, None)
)
if target_port is None:
raise ValueError(
"Wrong port definition value for security group ingress", port
)
if (
keyisset("Ports", allowed_source)
and target_port not in allowed_source["Ports"]
):
continue
if keyisset("Name", allowed_source):
name = NONALPHANUM.sub("", allowed_source["Name"])
title = f"From{name.title()}To{target_port}{port['protocol']}"
description = Sub(
f"From {name.title()} "
f"To {target_port}{port['protocol']} for {destination_title}"
)
else:
title = (
f"From{flatten_ip(allowed_source[self.ipv4_key])}"
f"To{target_port}{port['protocol']}"
)
description = Sub(
f"Public {target_port}{port['protocol']}"
f" for {destination_title}"
)
self.ext_ingress_rules.append(
SecurityGroupIngress(
title,
Description=(
description
if not keyisset("Description", allowed_source)
else allowed_source["Description"]
),
GroupId=security_group,
IpProtocol=port["protocol"],
FromPort=target_port,
ToPort=target_port,
**props,
)
)
[docs] def set_ext_sources_ingress(self, destination_tile, security_group):
"""
Method to add ingress rules from external sources to a given Security Group (ie. ALB Security Group).
If a list of IPs is found in the config['ext_sources'] part of the network section of configs for the service,
then it will use that. If no IPv4 source is indicated, it will by default allow traffic from 0.0.0.0/0
:param str destination_tile: The name of the destination for description
:param security_group: security group (object or title string) to add the rules to
:type security_group: str or troposphere.ec2.SecurityGroup or troposphere.Ref or Troposphere.GetAtt
"""
if not self.ext_sources:
LOG.debug("No external rules defined. Skipping.")
return
for allowed_source in self.ext_sources:
if not keyisset(self.ipv4_key, allowed_source) and not keyisset(
self.ipv6_key, allowed_source
):
LOG.warning(f"No {self.ipv4_key} or {self.ipv6_key} set. Skipping")
continue
props = generate_security_group_props(allowed_source)
if props:
LOG.debug(f"Adding {allowed_source} for ingress")
self.create_ext_sources_ingress_rule(
destination_tile, allowed_source, security_group, **props
)
[docs] def associate_aws_ingress_rules(self, template):
"""
Method to associate AWS ingress rules to a specific template
:param troposphere.Template template:
:return:
"""
for ingress_rule in self.aws_ingress_rules:
if ingress_rule.title not in template.resources:
template.add_resource(ingress_rule)
[docs] def associate_ext_ingress_rules(self, template):
"""
Method to associate External ingress rules to a specific template
:param troposphere.Template template:
:return:
"""
for ingress_rule in self.ext_ingress_rules:
if ingress_rule.title not in template.resources:
template.add_resource(ingress_rule)