Source code for ecs_composex.mods_manager

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
This module allows managing import of x-resources modules into ecs-composex dynamically and order the resources
processing based on the type of resource this is.

Priority order goes

* AWS Environment resources
* AWS API based resources (purely serverless resources)
* AWS Networking based resources (resources that require VPC)

"""

from __future__ import annotations

import sys
import warnings
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.compose.x_resources import XResource
    from ecs_composex.compose.x_resources.services_resources import ServicesXResource
    from ecs_composex.compose.x_resources.api_x_resources import ApiXResource
    from ecs_composex.compose.x_resources.environment_x_resources import (
        AwsEnvironmentResource,
    )
    from ecs_composex.compose.x_resources.network_x_resources import (
        NetworkXResource,
        DatabaseXResource,
    )

import re
from collections import OrderedDict
from copy import deepcopy
from importlib import import_module
from json import loads

from compose_x_common.compose_x_common import keyisset, set_else_none

from ecs_composex.common import NONALPHANUM
from ecs_composex.common.ecs_composex import X_KEY
from ecs_composex.common.logging import LOG
from ecs_composex.iam.import_sam_policies import import_and_cleanse_sam_policies


[docs]class XResourceModule: def __init__( self, res_key: str, x_class, posix_path, resource_class: ( XResource | ServicesXResource | ApiXResource | AwsEnvironmentResource | NetworkXResource | DatabaseXResource ) = None, definition: dict = None, ): if definition and not isinstance(definition, dict): raise TypeError("The module resources definition must be a dict/mapping") self._res_key = res_key self._xstack_class = x_class self._resource_class = resource_class self._stack = None self._path = posix_path self._mod_policies = {} self._json_schema = {} self.import_perms_definition() self.import_json_schema() self._resources: dict = {} self._definition: dict = {} self._original_definition: dict = {} self._mappings: dict = {} if definition: self.definition = definition self._original_definition = deepcopy(definition) self.module_deletion_policy: str = "Delete" def __del__(self): if hasattr(self, "_resources") and self._resources: self._resources.clear() @property def resource_class( self, ) -> ( XResource | ServicesXResource | ApiXResource | AwsEnvironmentResource | NetworkXResource | DatabaseXResource ): return self._resource_class @property def mappings(self) -> dict: _lookup_mappings: dict = {} for resource in self.lookup_resources: _lookup_mappings[resource.logical_name] = resource.mappings return _lookup_mappings @property def new_resources(self) -> list: """ Function to create a list of new resources. Check if empty resource is supported :return: list of resources to create :rtype: list[XResource] x_resources: """ new_resources = [] for resource in self.resources_list: if resource.lookup: continue if resource.uses_default and not resource.support_defaults: raise KeyError( f"{resource.module.res_key}.{resource.name} - " "Requires either or both Properties or MacroParameters. Got neither", resource.definition.keys(), ) else: new_resources.append(resource) return new_resources @property def lookup_resources(self) -> list: """ :return: list of resources to import from Lookup :rtype: list[XResource] x_resources: """ lookup_resources = [] for resource in self.resources_list: if resource.lookup: if resource.properties or resource.parameters: LOG.warning( f"{resource.module.res_key}.{resource.name} is set for Lookup" " but has other properties set. Voiding them" ) if resource.properties: resource.properties = {} lookup_resources.append(resource) return lookup_resources @property def resources( self, ) -> dict[ str, ( XResource | ServicesXResource | ApiXResource | AwsEnvironmentResource | NetworkXResource | DatabaseXResource ), ]: return self._resources @property def resources_list( self, ) -> list[ ( XResource | ServicesXResource | ApiXResource | AwsEnvironmentResource | NetworkXResource | DatabaseXResource ) ]: return [_res for _res in self._resources.values() if _res is not None] @property def definition(self) -> dict: return self._definition @definition.setter def definition(self, definition: dict): self._definition = definition @property def res_key(self): return self._res_key @property def mapping_key(self): return NONALPHANUM.sub("", re.sub(X_KEY, "", self._res_key)) @property def mod_key(self): return re.sub(X_KEY, "", self._res_key) @property def stack_class(self): return self._xstack_class @property def path(self): return str(self._path) @property def iam_policies(self) -> dict: if not self._mod_policies: self.import_perms_definition() sam_policies = import_and_cleanse_sam_policies() sam_policies.update(self._mod_policies) return sam_policies @property def json_schema(self): return self._json_schema def __repr__(self): return self.res_key
[docs] def import_perms_definition(self): perms_file_path = self._path.joinpath(f"{self.mod_key}_perms.json") try: with open(perms_file_path, encoding="utf-8-sig") as perms_fd: self._mod_policies = loads(perms_fd.read()) except OSError: pass
[docs] def import_json_schema(self): json_schema_file_path = self._path.joinpath(f"{self.res_key}.spec.json") try: with open(json_schema_file_path, encoding="utf-8-sig") as json_schema_fd: self._json_schema = loads(json_schema_fd.read()) except OSError: LOG.warning( f"{self.res_key} - JSON Schema not found for validation. Render may contain errors." ) pass
[docs] def set_resources(self, settings: ComposeXSettings): """ Method to define the ComposeXResource for each service. First updates the resources dict :param ecs_composex.common.settings.ComposeXSettings settings: """ if self._resources: warnings.warn("BEFORE SETTINGS RESOURCES, SOME WERE ALREADY FOUND") self._resources: dict = {} _resources = OrderedDict( sorted( settings.compose_content[self.res_key].items(), key=lambda item: item[0], ) ) if keyisset("DeletionPolicy", _resources): self.module_deletion_policy = _resources["DeletionPolicy"] del _resources["DeletionPolicy"] if not self._original_definition: self._original_definition = {self.res_key: dict(_resources)} for resource_name, resource_definition in _resources.items(): new_definition = self.resource_class( name=resource_name, definition=resource_definition, module=self, settings=settings, ) LOG.debug(type(new_definition)) LOG.debug(new_definition.__dict__) self.resources[resource_name] = new_definition
[docs]class ModManager: """ Class to manage the modules """ def __init__(self, settings: ComposeXSettings): self.modules = {} self.loaded_modules: list = [] for res_key, res_def in settings.compose_content.items(): if not res_def: continue self.load_module(res_key, res_def) def __del__(self): for module in self.modules.values(): if module: module.resources.clear() for module in self.loaded_modules: if module in sys.modules: del sys.modules[module] del module
[docs] def init_mods_resources(self, settings: ComposeXSettings): for module in self.modules.values(): if not module.resource_class or not isinstance( settings.compose_content[module.res_key], dict ): continue if module.definition: module.set_resources(settings) elif keyisset(module.res_key, settings.compose_content): module.definition = settings.compose_content[module.res_key] module.set_resources(settings)
[docs] def modules_repr(self): for key, module in self.modules.items(): print( "Loaded", module.res_key, module.mod_key, module.mapping_key, module.path, )
[docs] def import_resource_modules(self, res_key: str, module_path: str): py_module, mod_x_stack_modules = get_module(module_path) if mod_x_stack_modules: for module_res_key, module_def in mod_x_stack_modules.items(): self.modules[module_res_key] = module_def["Module"] for module_name, module in self.modules.items(): if module_name == res_key: self.loaded_modules.append(py_module) return module
[docs] def add_module_from_module_def(self, res_key: str, mod_key: str, module_name: str): module_path = f"{module_name}.{mod_key}_module" core_module = self.import_resource_modules(res_key, module_path) if core_module: return core_module module_name = f"ecs_composex_{mod_key}" extensions_modules_path = f"{module_name}.{mod_key}_module" extension_module = self.import_resource_modules( res_key, extensions_modules_path ) if extension_module: return extension_module
[docs] def load_module(self, res_key: str, res_def: dict | bool) -> XResourceModule | None: if not res_key.startswith(X_KEY): return mod_key = re.sub(X_KEY, "", res_key) module_name = f"ecs_composex.{mod_key}" module = self.add_module_from_module_def(res_key, mod_key, module_name) if not module: LOG.debug(f"{res_key} - Unable to load module definition") return if res_def and isinstance(res_def, dict): module.definition = res_def self.modules[res_key] = module return module
[docs]def get_module(module_name) -> tuple: """ Function to get the XResourceModule if it has been defined. :return: the_class, maps to the main class for the given x-module """ try: res_module = import_module(module_name) try: module = getattr(res_module, "COMPOSE_X_MODULES") return res_module, module except AttributeError: LOG.debug(f"No {module_name}.COMPOSE_X_MODULES found") except AttributeError as error: LOG.debug(error) return None, None except ImportError as error: LOG.debug(module_name) LOG.debug(error) return None, None