# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module for the XStack SSM
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.mods_manager import XResourceModule
from compose_x_common.aws import get_account_id
from compose_x_common.aws.wafv2 import WAF_V2_WEB_ACL_ARN_RE, WAF_V2_WEB_ACL_REF_RE
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import GetAtt, Ref
from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import (
add_resource,
add_update_mapping,
build_template,
)
from ecs_composex.compose.x_resources.environment_x_resources import (
AwsEnvironmentResource,
)
from ecs_composex.wafv2_webacl.wafv2_webacl_params import (
CONTROL_CLOUD_ATTR_MAPPING,
WEB_ACL_ARN,
WEB_ACL_ID,
WEB_ACL_NAMESPACE,
WEB_ACL_REF,
WEB_ACL_WCU,
)
from ecs_composex.wafv2_webacl.wafv2_webacl_template import render_new_web_acls
[docs]class WebACL(AwsEnvironmentResource):
"""
Class to represent a WebACL
"""
def __init__(
self,
name: str,
definition: dict,
module: XResourceModule,
settings: ComposeXSettings,
):
super().__init__(name, definition, module, settings)
self.ref_parameter = WEB_ACL_REF
self.arn_parameter = WEB_ACL_ARN
self.cloud_control_attributes_mapping = CONTROL_CLOUD_ATTR_MAPPING
[docs] def init_outputs(self):
self.output_properties = {
WEB_ACL_REF: (self.logical_name, self.cfn_resource, Ref, None),
WEB_ACL_ARN: (
f"{self.logical_name}{WEB_ACL_ARN.return_value}",
self.cfn_resource,
GetAtt,
WEB_ACL_ARN.return_value,
),
WEB_ACL_ID: (
f"{self.logical_name}{WEB_ACL_ID.return_value}",
self.cfn_resource,
GetAtt,
WEB_ACL_ID.return_value,
),
WEB_ACL_NAMESPACE: (
f"{self.logical_name}{WEB_ACL_NAMESPACE.return_value}",
self.cfn_resource,
GetAtt,
WEB_ACL_NAMESPACE.return_value,
),
# WEB_ACL_WCU: (
# f"{self.logical_name}{WEB_ACL_WCU.return_value}",
# self.cfn_resource,
# GetAtt,
# WEB_ACL_WCU.return_value,
# ),
}
[docs] def lookup_resource(
self,
arn_re,
native_lookup_function,
cfn_resource_type,
tagging_api_id: str = None,
subattribute_key=None,
use_arn_for_id: bool = False,
):
"""
Method to self-identify properties. It will try to use AWS Cloud Control API if possible, otherwise fallback
to using boto3 descriptions functions to create a mapping of the attributes.
For WAFv2 given these do not have Tags, using CloudControl only.
"""
self.init_outputs()
wafv2_webacl_id = None
lookup_attributes = self.lookup
if subattribute_key is not None:
lookup_attributes = self.lookup[subattribute_key]
if keyisset("Identifier", lookup_attributes):
if not WAF_V2_WEB_ACL_REF_RE.match(lookup_attributes["Identifier"]):
raise ValueError(
"Identifier {} is invalid. Must match {}".format(
lookup_attributes["Identifier"], WAF_V2_WEB_ACL_REF_RE.pattern
)
)
wafv2_webacl_id = lookup_attributes["Identifier"]
elif keyisset("Arn", lookup_attributes):
LOG.info(f"{self.module.res_key}.{self.name} - Lookup via ARN")
LOG.debug(
f"{self.module.res_key}.{self.name} - ARN is {lookup_attributes['Arn']}"
)
arn_parts = arn_re.match(lookup_attributes["Arn"])
if not arn_parts:
raise KeyError(
f"{self.module.res_key}.{self.name} - ARN {lookup_attributes['Arn']} is not valid. Must match",
arn_re.pattern,
)
self.arn = lookup_attributes["Arn"]
wafv2_webacl_id: str = "|".join(
[
arn_parts.group("name"),
arn_parts.group("id"),
arn_parts.group("scope").upper(),
]
)
elif keyisset("Tags", lookup_attributes):
raise AttributeError("Tags do not apply to AWS::WAFv2::WebACL")
else:
raise KeyError(f"{self.module.res_key}.{self.name} - You must specify Arn")
if wafv2_webacl_id is None:
raise LookupError(
f"{self.module.res_key}.{self.name} - Failed to find the WAFv2 WebACL from either Identifier or ARN"
)
LOG.debug(
"arn: %s - waf_id: %s",
self.arn,
wafv2_webacl_id,
)
(
props,
self.cloud_control_properties,
) = self.cloud_control_attributes_mapping_lookup(
cfn_resource_type, wafv2_webacl_id
)
self.lookup_properties = props
self.generate_cfn_mappings_from_lookup_properties()
self.generate_outputs()
[docs] def init_stack(self, root_stack, settings: ComposeXSettings) -> None:
"""
Initialize a XStack for resource associated with Lookup resources
:param ComposeXStack root_stack: The root stack
"""
if self.stack.is_void:
stack_template = build_template("WAFv2 WebACL stack - Compose-X")
super(XStack, self.stack).__init__("wafv2_webacl", stack_template)
self.stack.is_void = False
add_update_mapping(
self.stack.stack_template,
self.module.mapping_key,
settings.mappings[self.module.mapping_key],
)
add_resource(root_stack.stack_template, self.stack)
[docs] def handle_x_dependencies(self, settings, root_stack) -> None:
"""
WIll go over all the new resources to create in the execution and search for properties that can be updated
with itself
:param ecs_composex.common.settings.ComposeXSettings settings:
:param ComposeXStack root_stack: The root stack
"""
from ecs_composex.elbv2 import Elbv2
from ecs_composex.wafv2_webacl.wafv2_webacl_elbv2 import handle_elbv2
load_balancers = set_else_none("LoadBalancers", self.definition)
if not load_balancers:
return
for resource in settings.get_x_resources(include_mappings=False):
if not resource.cfn_resource:
continue
resource_stack = resource.stack
if not resource_stack:
LOG.debug(
f"resource {resource.name} has no `stack` attribute defined. Skipping"
)
continue
mappings = [
(Elbv2, handle_elbv2),
]
for target in mappings:
if isinstance(resource, target[0]) or issubclass(
type(resource), target[0]
):
if resource.name not in load_balancers:
continue
if (
self.mappings
and self.stack
and not self.stack.is_void
and self.stack.stack_template
):
add_update_mapping(
self.stack.stack_template,
self.module.mapping_key,
settings.mappings[self.module.mapping_key],
)
target[1](
self, self.stack, resource, resource_stack, settings, root_stack
)
[docs]def resolve_lookup(lookup_resources, settings, module: XResourceModule):
"""
Lookup of the AWS resources and setting the mappings for the resource type
"""
from troposphere.wafv2 import WebACL as CfnWebACL
if not keyisset(module.mapping_key, settings.mappings):
settings.mappings[module.mapping_key] = {}
for resource in lookup_resources:
resource.lookup_resource(
WAF_V2_WEB_ACL_ARN_RE,
None,
CfnWebACL.resource_type,
)
LOG.info(f"{module.res_key}.{resource.name} - Matched to {resource.arn}")
settings.mappings[module.mapping_key].update(
{resource.logical_name: resource.mappings}
)
[docs]class XStack(ComposeXStack):
def __init__(
self, title, settings: ComposeXSettings, module: XResourceModule, **kwargs
):
if module.lookup_resources:
resolve_lookup(module.lookup_resources, settings, module)
if module.new_resources:
template = build_template(f"WAFv2 WebAcl for {settings.name}")
super().__init__(module.mapping_key, stack_template=template, **kwargs)
render_new_web_acls(module.new_resources, self)
else:
self.is_void = True
for resource in module.resources_list:
resource.stack = self