Source code for ecs_composex.kinesis_firehose.kinesis_firehose_template

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from troposphere import Template
    from .kinesis_firehose_stack import DeliveryStream

from troposphere import NoValue, Sub
from troposphere.firehose import DeliveryStream as CfnDeliveryStream
from troposphere.logs import LogGroup

from ecs_composex.common.cfn_params import STACK_ID_SHORT
from ecs_composex.common.logging import LOG
from ecs_composex.resources_import import (
    get_dest_resource_nested_property,
    import_record_properties,
)

from ..common.troposphere_tools import add_outputs, add_resource, build_template
from .kinesis_firehose_iam_helpers import set_replace_iam_role
from .kinesis_firehose_logging_helpers import (
    grant_log_group_access,
    set_replace_cw_logging,
)


[docs]def values_validation(stream: DeliveryStream) -> None: """ Simple function to do values validation based on errors / limits encountered """ properties_to_values_mapping = { "ExtendedS3DestinationConfiguration::BufferingHints::IntervalInSeconds": ">= 60", "ExtendedS3DestinationConfiguration::BufferingHints::SizeInMBs": ">= 64", } for property_path, expression in properties_to_values_mapping.items(): prop_attr = get_dest_resource_nested_property( property_path, stream.cfn_resource ) if not prop_attr: continue value = getattr(prop_attr[0], prop_attr[1]) if not isinstance(value, (str, int, float)): LOG.debug( f"{stream.name} - Not evaluating property {prop_attr[1]} - {type(value)}" ) continue if isinstance(expression, str): eval_str = f"{value} {expression}" if not eval(eval_str): raise ValueError( stream.module.res_key, stream.name, "Property", property_path.replace(r"::", r"."), "is invalid", value, "must be", expression, ) elif callable(expression): expression(stream, prop_attr)
[docs]def create_new_stream(stream: DeliveryStream) -> None: """ Imports the settings from CFN Definitions and define the CFN Resource from properties :param DeliveryStream stream: """ props = import_record_properties( stream.properties, CfnDeliveryStream, ignore_missing_required=True, ignore_missing_sub_required=True, ) stream.cfn_resource = CfnDeliveryStream(stream.logical_name, **props) stream.log_group = LogGroup( f"{stream.logical_name}LogGroup", LogGroupName=Sub( f"firehose/${{STACK_ID}}/{stream.name}", STACK_ID=STACK_ID_SHORT ), ) if ( stream.cfn_resource.DeliveryStreamType == "KinesisStreamAsSource" and stream.cfn_resource.DeliveryStreamEncryptionConfigurationInput != NoValue ): LOG.error( f"{stream.module.res_key}.{stream.name} -" " You can only have ServerSide encryption with DirectPut DeliveryStream. Removing." ) stream.cfn_resource.DeliveryStreamEncryptionConfigurationInput = NoValue set_replace_iam_role(stream) values_validation(stream) stream.init_outputs() stream.generate_outputs()
[docs]def create_streams_template(new_resources: list[DeliveryStream]) -> Template: """ Function to create the root template for Firehose DeliveryStream :param list[DeliveryStream] new_resources: :return: root template """ root_template = build_template("Root stack for ecs_composex.kinesis_firehose") for res in new_resources: create_new_stream(res) add_resource(root_template, res.cfn_resource) add_resource(root_template, res.iam_manager.service_linked_role) add_resource(root_template, res.log_group) add_resource(root_template, grant_log_group_access(res)) set_replace_cw_logging(res, root_template) add_outputs(root_template, res.outputs) res.ensure_iam_policies_dependencies() return root_template