# Source code for ecs_composex.ecs.ecs_firelens.ecs_firelens_advanced

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Handles ``Rendered`` section of the FireLens configuration
"""
from __future__ import annotations

import json
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.ecs.ecs_family import ComposeFamily

import copy
from os import environ, path

from compose_x_common.compose_x_common import keyisset, set_else_none
from jinja2 import Environment, FileSystemLoader
from troposphere import Ref, Region

from ecs_composex.common.logging import LOG
from ecs_composex.compose.compose_services.service_logging import ServiceLogging
from ecs_composex.compose.compose_volumes import ComposeVolume

from .advanced_firehose import FireLensFirehoseManagedDestination
from .advanced_kinesis import FireLensKinesisManagedDestination
from .config_parameter import add_managed_ssm_parameter
from .firelens_config_sidecar import FluentBitConfig, render_config_sidecar_config


class FireLensFamilyManagedConfiguration:
    """
    Family-level manager of the rendered FireLens configuration.

    On creation it registers a ``log_router_preload`` :class:`FluentBitConfig`
    on ``family.logging.firelens_config_service``, sharing a dedicated volume
    mounted at :attr:`volume_mount`, and builds one
    :class:`FireLensServiceManagedConfiguration` per service of the family that
    defines ``logging.firelens_advanced``.

    :ivar dict services_configs: maps each service to its
        :class:`FireLensServiceManagedConfiguration`.
    """

    # Mount path of the shared volume; keeps its trailing slash because file
    # paths are built by plain string concatenation.
    volume_mount: str = "/compose_x_rendered/"
    # Name of the volume shared with the configuration sidecar.
    volume_name: str = "compose-x-firelens-rendering"
    # File name of the family-level rendered configuration.
    config_file_name: str = "firelens.conf"

    def __init__(self, family: ComposeFamily, settings: ComposeXSettings):
        """
        :param family: the family for which the configuration is managed.
        :param settings: the execution settings, forwarded to the objects created here.
        """
        self._config_volume = ComposeVolume(self.volume_name, {})
        self._family = family
        # Collected before the config sidecar is created below.
        _services_with_advanced = [
            (_svc, _svc.logging.firelens_advanced)
            for _svc in family.services
            if _svc.logging and _svc.logging.firelens_advanced
        ]
        self._ssm_parameter = None
        self._parser_files: dict = {}
        self.family.logging.firelens_config_service = FluentBitConfig(
            "log_router_preload",
            render_config_sidecar_config(
                family, self.volume_name, self.volume_mount, self.ssm_parameter_title
            ),
            family.logging.firelens_service,
            settings,
            shared_volume=self.config_volume,
            mount_path=self.volume_mount,
        )
        config_service = self.family.logging.firelens_config_service
        config_service.image.interpolate_image_digest(settings)
        # The sidecar logs with awslogs options pointing at the family log group.
        config_service.logging = ServiceLogging(
            config_service,
            {
                "awslogs-group": Ref(self.family.logging.family_log_group),
                "awslogs-region": Region,
                "awslogs-stream-prefix": config_service.name,
            },
        )
        self.services_configs: dict = {}
        for service, definition in _services_with_advanced:
            self.services_configs[service] = FireLensServiceManagedConfiguration(
                service, definition, family, settings
            )

    @property
    def family(self) -> ComposeFamily:
        """The family this configuration belongs to."""
        return self._family

    @property
    def ssm_parameter_title(self) -> str:
        """Title built from the family logical name, suffixed with ``FireLensConfigurationSsm``."""
        return f"{self.family.logical_name}FireLensConfigurationSsm"

    @property
    def config_volume(self) -> ComposeVolume:
        """The volume shared with the configuration sidecar."""
        return self._config_volume

    @property
    def extra_env_vars(self) -> dict:
        """
        Merges the ``extra_env_vars`` of every service configuration.

        When several services define the same key, the first one encountered wins.
        """
        _env_vars = {}
        for config in self.services_configs.values():
            for key, value in config.extra_env_vars.items():
                _env_vars.setdefault(key, value)
        return _env_vars

    def set_update_ssm_parameter(self, settings):
        """
        Creates the managed SSM parameter holding the rendered files.

        The content maps ``files`` to the family configuration file plus every
        parser file that has content. The parameter is only created when none
        was set yet.

        NOTE(review): despite the method name, there is no branch updating an
        already created parameter - confirm whether one is expected.

        :param settings: forwarded to ``add_managed_ssm_parameter``.
        """
        content: dict = {
            "files": {
                f"{self.volume_mount}{self.config_file_name}": {
                    "content": self.rendered_content
                }
            }
        }
        content["files"].update(self.parser_files)
        if not self._ssm_parameter:
            self._ssm_parameter = add_managed_ssm_parameter(
                self.family, settings, content
            )

    @property
    def parser_files(self) -> dict:
        """
        Parser files with content, keyed by their path under :attr:`volume_mount`.

        Rebuilt from scratch on every access. Files without a truthy ``content``
        are skipped, and the first definition found for a given path is kept.
        """
        self._parser_files = {}
        for _svc in self.services_configs.values():
            for _parser_file, _parser_file_def in _svc.parser_files.items():
                if not keyisset("content", _parser_file_def):
                    continue
                self._parser_files.setdefault(
                    f"{self.volume_mount}{_parser_file}", _parser_file_def
                )
        return self._parser_files

    @property
    def parser_files_names(self) -> list:
        """
        De-duplicated, ordered list of parser file paths.

        Files with content are listed under :attr:`volume_mount`; files without
        content keep the name they were given.
        """
        files_names: list = []
        for _svc in self.services_configs.values():
            for _parser_file, _parser_file_def in _svc.parser_files.items():
                if keyisset("content", _parser_file_def):
                    file_path = f"{self.volume_mount}{_parser_file}"
                    # Side effect kept from the original implementation: the
                    # private cache is refreshed while listing names.
                    self._parser_files[file_path] = _parser_file_def
                else:
                    file_path = _parser_file
                if file_path not in files_names:
                    files_names.append(file_path)
        return files_names

    @property
    def rendered_content(self):
        """
        Renders ``family_fluentbit_managed_config.j2``, loaded from this module's
        directory, with the rendered configuration of each service and the
        parser files names.
        """
        here = path.abspath(path.dirname(__file__))
        jinja_env = Environment(
            loader=FileSystemLoader(here),
        )
        template = jinja_env.get_template("family_fluentbit_managed_config.j2")
        content = template.render(
            env=environ,
            enable_health_check=self.family.logging.api_health_enabled,
            grace_period=self.family.logging.grace_period,
            services_content=[
                _svc.render_jinja_config_file()
                for _svc in self.services_configs.values()
            ],
            parser_files=self.parser_files_names,
        )
        return content
class FireLensServiceManagedConfiguration:
    """
    Service-level view of a ``firelens_advanced`` logging definition.

    Works on a deep copy of the definition and reads the ``SourceFile``,
    ``ParserFiles``, ``EnvironmentVariables``, ``FirelensConfiguration`` and
    ``ComposeXManagedAwsDestinations`` keys from it.
    """

    def __init__(self, service, definition: dict, family, settings: ComposeXSettings):
        """
        :param service: the service the definition belongs to.
        :param definition: the advanced FireLens definition; deep-copied, so the
            caller's dict is not shared with this object.
        :param family: the family of the service.
        :param settings: forwarded to the managed destinations created here.
        """
        self.service = service
        self._definition = copy.deepcopy(definition)
        self.family = family
        self.source_file = set_else_none("SourceFile", self.definition)
        self._parser_files = set_else_none("ParserFiles", self.definition, alt_value=[])
        self._env_vars = set_else_none("EnvironmentVariables", self.definition)
        self.firelens_config: dict = set_else_none(
            "FirelensConfiguration", self.definition, alt_value={}
        )
        self.managed_destinations = []
        self.extra_env_vars = set_else_none(
            "EnvironmentVariables", self.definition, alt_value={}
        )
        if keyisset("ComposeXManagedAwsDestinations", self.definition):
            for destination_definition in self.definition[
                "ComposeXManagedAwsDestinations"
            ]:
                # The destination type is selected by the first matching key.
                if keyisset("log_group_name", destination_definition):
                    self.managed_destinations.append(
                        FireLensCloudWatchManagedDestination(
                            destination_definition, self, settings
                        )
                    )
                elif keyisset("delivery_stream", destination_definition):
                    self.managed_destinations.append(
                        FireLensFirehoseManagedDestination(
                            destination_definition, self, settings
                        )
                    )
                elif keyisset("stream", destination_definition):
                    self.managed_destinations.append(
                        FireLensKinesisManagedDestination(
                            destination_definition, self, settings
                        )
                    )
                else:
                    # Invalid entries are reported and skipped, not raised.
                    LOG.error("Invalid definition for ComposeXManagedAwsDestinations")
                    LOG.error(destination_definition)

    @property
    def definition(self):
        """The private deep copy of the definition given at creation."""
        return self._definition

    @property
    def managed_firehose_destinations(self):
        """The managed destinations that are :class:`FireLensFirehoseManagedDestination`."""
        return [
            _managed_dest
            for _managed_dest in self.managed_destinations
            if isinstance(_managed_dest, FireLensFirehoseManagedDestination)
        ]

    @property
    def managed_data_streams_destinations(self):
        """The managed destinations that are :class:`FireLensKinesisManagedDestination`."""
        return [
            _managed_dest
            for _managed_dest in self.managed_destinations
            if isinstance(_managed_dest, FireLensKinesisManagedDestination)
        ]

    @property
    def parser_files(self) -> dict:
        """
        Reads the parser files listed in the definition.

        A readable file is stored under its base name as ``{"content": <text>}``.
        A file that cannot be opened is stored under the path exactly as given,
        with an empty definition, and a warning is logged.
        """
        files: dict = {}
        for _file in self._parser_files:
            file_name = path.basename(_file)
            try:
                with open(_file) as file_fd:
                    content = file_fd.read()
            except OSError:
                LOG.warning(
                    f"{self.family.name} - FireLens Advanced - {_file} not found."
                    " Assuming it already is in the image"
                )
                files[_file] = {}
            else:
                files[file_name] = {"content": content}
        return files

    @property
    def source_file_content(self) -> str:
        """
        Content of ``SourceFile``, or an empty string when none is set.

        Errors raised while opening or reading the file are not handled here.
        """
        if not self.source_file:
            return ""
        with open(path.abspath(self.source_file)) as config_fd:
            content = config_fd.read()
        return content

    def render_jinja_config_file(self):
        """
        Renders ``service_fluentbit_managed_config.j2``, loaded from this module's
        directory, for the service.

        :return: the rendered content, also logged at debug level.
        """
        here = path.abspath(path.dirname(__file__))
        jinja_env = Environment(
            loader=FileSystemLoader(here),
        )
        template = jinja_env.get_template("service_fluentbit_managed_config.j2")
        content = template.render(
            env=environ,
            firelens_firehose_destinations=self.managed_firehose_destinations,
            firelens_data_streams_destinations=self.managed_data_streams_destinations,
            source_file=self.source_file_content,
            service_match=f"{self.service.name}-firelens*",
        )
        LOG.debug(content)
        return content
class FireLensCloudWatchManagedDestination:
    """
    Managed destination selected by the ``log_group_name`` key of a definition.
    """

    def __init__(
        self,
        definition: dict,
        advanced_config: FireLensServiceManagedConfiguration,
        settings: ComposeXSettings,
    ):
        """
        :param definition: the destination definition; kept by reference, not copied.
        :param advanced_config: the service configuration owning this destination.
        :param settings: accepted for signature parity with the other managed
            destinations; not used here.
        """
        self._definition = definition
        self.output_definition = None
        self.parent = advanced_config

    @property
    def log_group_name(self) -> str:
        """
        The ``log_group_name`` value of the definition.

        :raises KeyError: when the definition has no ``log_group_name`` key.
        """
        return self._definition["log_group_name"]