Source code for ecs_composex.ecs.ecs_firelens.helpers.firehose_helpers
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Module to help with common FireLens + FireHose configuration and settings
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ecs_composex.ecs.ecs_family import ComposeFamily
from ecs_composex.compose.compose_services import ComposeService
from ecs_composex.common.settings import ComposeXSettings
from compose_x_common.aws.kinesis import KINESIS_FIREHOSE_ARN_RE
from compose_x_common.compose_x_common import keyisset
from troposphere import FindInMap, Ref, Region
from ecs_composex.common.logging import LOG
from ecs_composex.kinesis_firehose.kinesis_firehose_params import (
FIREHOSE_ARN,
FIREHOSE_ID,
)
from ecs_composex.kinesis_firehose.kinesis_firehose_stack import DeliveryStream
[docs]def set_add_family_to_firehose(
resource: DeliveryStream, family: ComposeFamily, settings: ComposeXSettings
):
for target in resource.families_targets:
if target[0] == family:
LOG.info(
f"{family.name} is already a target of {resource.module.res_key}.{resource.name}"
)
break
else:
LOG.info(
f"{family.name} - adding as target to {resource.module.res_key}.{resource.name}"
)
if not resource.attributes_outputs:
resource.init_outputs()
resource.generate_outputs()
family_target = (family, True, family.services, "Producer")
resource.families_targets.append(family_target)
resource.to_ecs(
settings,
settings.mod_manager,
settings.root_stack,
targets_overrides=[family_target],
)
[docs]def add_firehose_delivery_stream_for_firelens(
firehose_stream: DeliveryStream,
env_vars: dict,
family: ComposeFamily,
settings: ComposeXSettings,
):
if not isinstance(firehose_stream, DeliveryStream):
raise TypeError(
"firehose_stream must be", DeliveryStream, "Got", type(firehose_stream)
)
set_add_family_to_firehose(firehose_stream, family, settings)
firehose_stream.add_parameter_to_family_stack(family, settings, FIREHOSE_ID)
if firehose_stream.cfn_resource:
firehose_pointer = Ref(
firehose_stream.attributes_outputs[FIREHOSE_ID]["ImportParameter"]
)
env_vars.update({firehose_stream.env_var_prefix: firehose_pointer})
else:
firehose_pointer = firehose_stream.attributes_outputs[FIREHOSE_ID][
"ImportValue"
]
env_vars.update({firehose_stream.env_var_prefix: firehose_pointer})
return firehose_pointer
[docs]def handle_x_kinesis_firehose(
family: ComposeFamily,
service: ComposeService,
settings: ComposeXSettings,
parameter_name: str,
config_value: str,
):
"""
Detects if delivery_stream is x-kinesis_firehose and interpolates the stream name
:param family:
:param service:
:param parameter_name:
:param config_value:
:param settings:
:return: The pointer to kinesis stream
"""
if not config_value.startswith("x-kinesis_firehose::"):
return config_value
delivery_stream = settings.find_resource(config_value)
pointer = add_firehose_delivery_stream_for_firelens(
delivery_stream, {}, family, settings
)
if not keyisset("region", service.logging.log_options):
if isinstance(pointer, Ref):
service.logging.log_options.update({"region": Region})
elif isinstance(pointer, FindInMap):
_arn = delivery_stream.mappings[FIREHOSE_ARN.title]
service.logging.log_options.update(
{"region": KINESIS_FIREHOSE_ARN_RE.match(_arn).group("region")}
)
return pointer