Source code for ecs_composex.vpc.helpers

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>


from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.mods_manager import XResourceModule
    from ecs_composex.vpc.vpc_stack import XStack as VpcStack

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import add_resource, add_update_mapping
from ecs_composex.compose.x_resources.network_x_resources import NetworkXResource


[docs]def update_network_resources_vpc_config( settings: ComposeXSettings, vpc_stack: VpcStack ): """ Iterate over the settings.x_resources, over the root stack nested stacks. If the nested stack has x_resources that depend on VPC, update the stack parameters with the vpc stack settings Although the first if should never be true, setting condition in case for safety. """ from ecs_composex.vpc.vpc_params import VPC_ID_T nested_stacks: dict[str, ComposeXStack] = { resource: resource_def for resource, resource_def in settings.root_stack.stack_template.resources.items() if ( isinstance(resource_def, ComposeXStack) or issubclass(type(resource_def), ComposeXStack) ) } for stack_def in nested_stacks.values(): if VPC_ID_T in stack_def.Parameters: if vpc_stack.vpc_resource.mappings: stack_def.set_vpc_params_from_vpc_lookup(vpc_stack, settings) else: stack_def.set_vpc_parameters_from_vpc_stack(vpc_stack, settings) for resource in settings.x_resources: if not resource: print("NONED???", resource) continue if ( resource and not resource.requires_vpc or (resource and resource.mappings) or (hasattr(resource, "stack") and resource.stack == vpc_stack) ): LOG.debug( f"{resource.module.res_key}.{resource.name} - Lookup resource need no VPC Settings." ) continue if not issubclass(type(resource), NetworkXResource): LOG.debug( f"{resource.module.res_key}.{resource.name} - Not a NetworkXResource" ) if vpc_stack.vpc_resource.mappings: resource.stack.set_vpc_params_from_vpc_lookup(vpc_stack, settings) else: resource.stack.set_vpc_parameters_from_vpc_stack(vpc_stack, settings) if resource.requires_vpc and hasattr(resource, "update_from_vpc"): resource.update_from_vpc(vpc_stack, settings)
[docs]def define_vpc_settings( settings: ComposeXSettings, vpc_module: XResourceModule, vpc_stack: ComposeXStack ): """ Function to deal with vpc stack settings """ if settings.requires_vpc() and not vpc_stack.vpc_resource: LOG.info( f"{settings.name} - Services or x-Resources need a VPC to function. Creating default one" ) vpc_stack.create_new_default_vpc("vpc", vpc_module, settings) add_resource(settings.root_stack.stack_template, vpc_stack) vpc_stack.vpc_resource.generate_outputs() elif ( vpc_stack.is_void and vpc_stack.vpc_resource and vpc_stack.vpc_resource.mappings ): vpc_stack.vpc_resource.generate_outputs() add_update_mapping( settings.root_stack.stack_template, "Network", vpc_stack.vpc_resource.mappings, ) elif ( vpc_stack.vpc_resource and vpc_stack.vpc_resource.cfn_resource and vpc_stack.title not in settings.root_stack.stack_template.resources.keys() ): add_resource(settings.root_stack.stack_template, vpc_stack) LOG.info(f"{settings.name}.x-vpc - VPC stack added. A new VPC will be created.") vpc_stack.vpc_resource.generate_outputs()