# Source code for ecs_composex.kinesis.kinesis_kinesis_firehose
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Updates x-kinesis_firehose fields and properties, IAM policies for Firehose::DeliveryStream
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .kinesis_params import STREAM_ARN
if TYPE_CHECKING:
from ecs_composex.common.settings import ComposeXSettings
from troposphere import Ref
from ecs_composex.common.logging import LOG
from ecs_composex.iam.import_sam_policies import get_access_types
from ecs_composex.resource_settings import map_x_resource_perms_to_resource
from ecs_composex.resources_import import get_dest_resource_nested_property, skip_if
from ..common.troposphere_tools import add_parameters, add_update_mapping
# Keys are the property paths looked up on the delivery stream resource;
# values are the stream parameter used to index ``attributes_outputs``.
FIREHOSE_PROPERTIES = {
    "KinesisStreamSourceConfiguration::KinesisStreamARN": STREAM_ARN,
}
def kinesis_to_firehose(
    resource, dest_resource, dest_resource_stack, settings: ComposeXSettings
) -> None:
    """
    Point a Firehose delivery stream at a Kinesis stream and grant it access.

    For every property path listed in ``FIREHOSE_PROPERTIES``, the matching
    property of ``dest_resource.cfn_resource`` is set to the stream ARN, and
    the ``kinesisSource`` permissions are mapped onto ``dest_resource``.

    :param resource: The source x-resource (presumably the Kinesis stream)
        whose ``attributes_outputs`` provide the ARN.
    :param dest_resource: The delivery stream resource to update.
    :param dest_resource_stack: Stack whose template receives the import
        parameter when ``resource`` is created by this deployment.
    :param settings: Execution settings; ``settings.mappings`` is read when
        ``resource`` comes from mappings.
    :raises ValueError: When ``resource`` has neither a ``cfn_resource`` nor
        ``mappings``.
    """
    if not dest_resource.cfn_resource:
        # NOTE(review): execution continues after logging. Presumably
        # skip_if() filters out the missing property below - confirm, or
        # return early here.
        LOG.error(
            f"{dest_resource.module.res_key}.{dest_resource.name} - Not a new resource"
        )
    for prop_path, resource_param in FIREHOSE_PROPERTIES.items():
        # prop_attr is indexed as (object, attribute name) further down.
        prop_attr = get_dest_resource_nested_property(
            prop_path, dest_resource.cfn_resource
        )
        if skip_if(resource, prop_attr):
            continue
        resource_id = resource.attributes_outputs[resource_param]
        if resource.cfn_resource:
            # The stream has its own CFN resource: pass the ARN to the
            # destination stack through a parameter.
            import_param = resource_id["ImportParameter"]
            add_parameters(dest_resource_stack.stack_template, [import_param])
            arn_pointer = Ref(import_param)
            setattr(prop_attr[0], prop_attr[1], arn_pointer)
            dest_resource.stack.Parameters.update(
                {import_param.title: resource_id["ImportValue"]}
            )
        elif resource.mappings:
            # No CFN resource but mappings exist: register the module
            # mapping on the destination template and use the import value.
            add_update_mapping(
                dest_resource.stack.stack_template,
                resource.module.mapping_key,
                settings.mappings[resource.module.mapping_key],
            )
            arn_pointer = resource_id["ImportValue"]
            setattr(prop_attr[0], prop_attr[1], arn_pointer)
        else:
            raise ValueError(
                "Unable to determine if the Kinesis stream is new or lookup"
            )
        map_x_resource_perms_to_resource(
            dest_resource,
            arn_value=arn_pointer,
            access_definition="kinesisSource",
            access_subkey="kinesis_firehose",
            resource_policies=get_access_types(resource.module.mod_key),
            resource_mapping_key=resource.module.mapping_key,
        )
        dest_resource.ensure_iam_policies_dependencies()