# Source code for ecs_composex.s3.s3_kinesis_firehose
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Updates the S3 bucket ARN properties of x-kinesis_firehose Firehose::DeliveryStream resources
and maps the matching IAM policies to them.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .s3_params import S3_BUCKET_ARN
if TYPE_CHECKING:
from ecs_composex.common.settings import ComposeXSettings
from .s3_bucket import Bucket
from ecs_composex.kinesis_firehose.kinesis_firehose_stack import DeliveryStream
from troposphere import Ref
from ecs_composex.common.logging import LOG
from ecs_composex.iam.import_sam_policies import get_access_types
from ecs_composex.resource_settings import map_x_resource_perms_to_resource
from ecs_composex.resources_import import get_dest_resource_nested_property, skip_if
from ..common.troposphere_tools import add_parameters, add_update_mapping
# Maps each ``::``-separated path to a ``BucketARN`` property of a delivery stream
# to the bucket parameter used to fill it in. Every path uses the bucket ARN,
# so the mapping is built from the ordered list of paths.
FIREHOSE_PROPERTIES = dict.fromkeys(
    (
        "S3DestinationConfiguration::BucketARN",
        "ExtendedS3DestinationConfiguration::BucketARN",
        "ExtendedS3DestinationConfiguration::S3BackupConfiguration::BucketARN",
        "RedshiftDestinationConfiguration::S3BackupConfiguration::BucketARN",
        "ElasticsearchDestinationConfiguration::S3BackupConfiguration::BucketARN",
        "AmazonopensearchserviceDestinationConfiguration::S3BackupConfiguration::BucketARN",
        "SplunkDestinationConfiguration::S3BackupConfiguration::BucketARN",
        "HttpEndpointDestinationConfiguration::S3BackupConfiguration::BucketARN",
    ),
    S3_BUCKET_ARN,
)
def s3_to_firehose(
    resource: Bucket,
    dest_resource: DeliveryStream,
    dest_resource_stack,
    settings: ComposeXSettings,
) -> None:
    """
    Sets the ``BucketARN`` properties of a delivery stream to the bucket ARN and
    maps the bucket permissions to the delivery stream.

    Each path of ``FIREHOSE_PROPERTIES`` is resolved against
    ``dest_resource.cfn_resource``; paths for which ``skip_if`` returns a truthy
    value are ignored. For the remaining ones the property is set to either a
    ``Ref`` to a stack parameter (bucket with a ``cfn_resource``) or to the
    ``ImportValue`` of the bucket output (bucket with ``mappings`` only).

    :param Bucket resource: The bucket whose ARN is written into the delivery stream.
    :param DeliveryStream dest_resource: The delivery stream to update.
    :param dest_resource_stack: Stack whose template receives the import parameter
        when the bucket has a ``cfn_resource``.
    :param ComposeXSettings settings: Execution settings, read for the bucket
        module mappings when the bucket has no ``cfn_resource``.
    :raises ValueError: When the bucket has neither a ``cfn_resource`` nor ``mappings``.
    :return: None
    """
    if not dest_resource.cfn_resource:
        # NOTE(review): this only logs, the properties below are still evaluated
        # against a falsy cfn_resource - confirm that carrying on is intended.
        LOG.error(
            f"{dest_resource.module.res_key}.{dest_resource.name} - Not a new resource"
        )

    for property_path, arn_parameter in FIREHOSE_PROPERTIES.items():
        located = get_dest_resource_nested_property(
            property_path, dest_resource.cfn_resource
        )
        if skip_if(resource, located):
            continue

        arn_output = resource.attributes_outputs[arn_parameter]
        is_new_bucket = bool(resource.cfn_resource)

        # Neither created in this execution nor described through mappings.
        if not is_new_bucket and not resource.mappings:
            raise ValueError(
                resource.module.mapping_key,
                resource.name,
                "Unable to determine if new or lookup",
            )

        if is_new_bucket:
            # Pass the bucket ARN to the destination stack as a parameter and
            # reference that parameter from the delivery stream property.
            add_parameters(
                dest_resource_stack.stack_template, [arn_output["ImportParameter"]]
            )
            setattr(located[0], located[1], Ref(arn_output["ImportParameter"]))
            dest_resource.stack.Parameters.update(
                {arn_output["ImportParameter"].title: arn_output["ImportValue"]}
            )
            bucket_arn = Ref(arn_output["ImportParameter"])
        else:
            # Make the bucket module mappings available to the destination
            # template and use the import value directly.
            add_update_mapping(
                dest_resource.stack.stack_template,
                resource.module.mapping_key,
                settings.mappings[resource.module.mapping_key],
            )
            setattr(located[0], located[1], arn_output["ImportValue"])
            bucket_arn = arn_output["ImportValue"]

        map_x_resource_perms_to_resource(
            dest_resource,
            arn_value=bucket_arn,
            access_definition="s3destination",
            access_subkey="kinesis_firehose",
            resource_policies=get_access_types(resource.module.mod_key),
            resource_mapping_key=resource.module.mapping_key,
        )
        dest_resource.ensure_iam_policies_dependencies()