Source code for ecs_composex.kms.kms_s3
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Handle x-kms in S3 buckets
"""
from troposphere import Ref
from ..common.troposphere_tools import add_parameters
from .kms_params import KMS_KEY_ID
KEY = "KMSMasterKeyID"
[docs]def assign_kms_key_to_bucket(kms_key, bucket_rule, bucket_stack):
"""
Assigns the KMS Key pointer to the bucket property
:param ecs_composex.kms.kms_stack.KmsKey kms_key:
:param troposphere.s3.ServerSideEncryptionRule bucket_rule:
:param ecs_composex.s3.s3_stack.XStack bucket_stack:
:return:
"""
kms_key_id = kms_key.attributes_outputs[KMS_KEY_ID]
add_parameters(bucket_stack.stack_template, [kms_key_id["ImportParameter"]])
setattr(
bucket_rule.ServerSideEncryptionByDefault,
"KMSMasterKeyID",
Ref(kms_key_id["ImportParameter"]),
)
bucket_stack.Parameters.update(
{kms_key_id["ImportParameter"].title: kms_key_id["ImportValue"]}
)
setattr(bucket_rule.ServerSideEncryptionByDefault, "SSEAlgorithm", "aws:kms")
[docs]def handle_bucket_kms(kms_key, bucket, bucket_stack, settings):
"""
Goes over the properties of the bucket and if the KMSMasterKeyID points to the kms_key,
assigns the value accordingly in the template
:param ecs_composex.kms.kms_stack.KmsKey kms_key:
:param ecs_composex.s3.s3_bucket.Bucket bucket:
:param ecs_composex.s3.s3_stack.XStack bucket_stack:
:param ecs_composex.common.settings.ComposeXSettings settings: unused
:return:
"""
if not bucket.cfn_resource:
LOG.debug(
f"{bucket.module.res_key}.{bucket.name} - Not a new resource. Skipping"
)
return
if not hasattr(bucket.cfn_resource, "BucketEncryption"):
return
bucket_encryption = bucket.cfn_resource.BucketEncryption
if not hasattr(bucket_encryption, "ServerSideEncryptionConfiguration"):
return
sse_config = bucket_encryption.ServerSideEncryptionConfiguration
for rule in sse_config:
if (
hasattr(rule, "ServerSideEncryptionByDefault")
and hasattr(rule.ServerSideEncryptionByDefault, "KMSMasterKeyID")
and isinstance(rule.ServerSideEncryptionByDefault.KMSMasterKeyID, str)
):
key_parts = rule.ServerSideEncryptionByDefault.KMSMasterKeyID.split(
r"x-kms::"
)
if not key_parts or not key_parts[-1] == kms_key.name:
continue
assign_kms_key_to_bucket(kms_key, rule, bucket_stack)