Source code for ecs_composex.sqs.sqs_template

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from troposphere import Template
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.common.stacks import ComposeXStack
    from .sqs_stack import Queue

from itertools import chain

import boto3.session
from compose_x_common.compose_x_common import keyisset, set_else_none
from troposphere import MAX_OUTPUTS, AccountId, GetAtt, Ref, Sub
from troposphere.sqs import Queue as CfnQueue
from troposphere.sqs import QueuePolicy

from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.resources_import import import_record_properties
from ecs_composex.sqs.sqs_params import SQS_ARN

from ..common.troposphere_tools import add_outputs, add_resource, build_template


[docs]def add_queue_default_policy(queue: Queue): queue_policy_title = f"{queue.logical_name}Policy" queue.resource_policy = QueuePolicy( queue_policy_title, PolicyDocument={ "Version": "2008-10-17", "Id": "__default_policy", "Statement": [ { "Sid": "__owner_statement", "Effect": "Allow", "Principal": { "AWS": Sub("arn:${AWS::Partition}:iam::${AWS::AccountId}:root") }, "Action": "sqs:*", "Resource": GetAtt(queue.cfn_resource, SQS_ARN.return_value), } ], }, Queues=[Ref(queue.cfn_resource)], ) add_resource(queue.stack.stack_template, queue.resource_policy)
[docs]def define_queue_properties(queue): """ Function to parse the queue definition and generate the queue accordingly. Created the redrive policy if necessary :param ecs_composex.common.compose_resources.Queue queue: name of the queue :return: queue :rtype: troposphere.sqs.Queue """ name = set_else_none("QueueName", queue.properties) if keyisset("FifoQueue", queue.properties) and name and not name.endswith(".fifo"): queue.properties["QueueName"] = f"${name}.fifo" LOG.warning( f"QueueName was defined and FifoQueue set to true, but queue name was invalid. " f"Corrected to {queue.properties['QueueName']}" ) props = import_record_properties(queue.properties, CfnQueue) queue.cfn_resource = CfnQueue(queue.logical_name, **props) LOG.debug(queue.cfn_resource.title, queue.logical_name)
[docs]def add_aws_services_queue_policy(queue: Queue): if not queue.resource_policy: return aws_services = set_else_none("AwsPrincipalsAccess", queue.parameters) publish_consume = set_else_none("PublishConsume", aws_services, alt_value=[]) consume = [ _elem for _elem in set_else_none("Consume", aws_services, alt_value=[]) if _elem not in publish_consume ] publish = [ _elem for _elem in set_else_none("Publish", aws_services, alt_value=[]) if _elem not in publish_consume ] for service in chain(consume, publish, publish_consume): if service not in boto3.session.Session().get_available_services(): raise ValueError( queue.module.res_key, queue.name, f"Service {service} is not valid. Valid services", boto3.session.Session().get_available_services(), ) same_account_condition: dict = {"StringEquals": {"aws:SourceAccount": AccountId}} if consume: consume_statement = { "Sid": "__aws_services_consume", "Effect": "Allow", "Principal": { "Service": [ Sub(f"{_aws_service}.${{AWS::URLSuffix}}") for _aws_service in consume ] }, "NotAction": [ "sqs:TagQueue", "sqs:RemovePermission", "sqs:AddPermission", "sqs:UntagQueue", "sqs:PurgeQueue", "sqs:DeleteQueue", "sqs:CreateQueue", "sqs:SetQueueAttributes", "sqs:SendMessage", ], "Resource": GetAtt(queue.cfn_resource, SQS_ARN.return_value), "Condition": same_account_condition, } queue.resource_policy.PolicyDocument["Statement"].append(consume_statement) if publish: publish_statement = { "Sid": "__aws_services_publish", "Effect": "Allow", "Principal": { "Service": [ Sub(f"{_aws_service}.${{AWS::URLSuffix}}") for _aws_service in publish ] }, "Action": ["sqs:SendMessage"], "Resource": GetAtt(queue.cfn_resource, SQS_ARN.return_value), "Condition": same_account_condition, } queue.resource_policy.PolicyDocument["Statement"].append(publish_statement) if publish_consume: publish_consume_statement = { "Sid": "__aws_services_publish_consume", "Effect": "Allow", "Principal": { "Service": [ Sub(f"{_aws_service}.${{AWS::URLSuffix}}") for _aws_service in publish_consume ] }, "NotAction": [ "sqs:TagQueue", "sqs:RemovePermission", "sqs:AddPermission", "sqs:UntagQueue", "sqs:PurgeQueue", "sqs:DeleteQueue", "sqs:CreateQueue", "sqs:SetQueueAttributes", ], "Resource": GetAtt(queue.cfn_resource, SQS_ARN.return_value), "Condition": same_account_condition, } queue.resource_policy.PolicyDocument["Statement"].append( publish_consume_statement )
[docs]def render_new_queues( settings: ComposeXSettings, new_queues: list[Queue], xstack: ComposeXStack, template: Template, ): """ Sets the new SQS Queue properties. In case there are so many queues that there would not be enough outputs, split into 1 stack per queue :param settings: :param new_queues: :param xstack: :param template: """ mono_template = False output_per_resource = 3 if (len(new_queues) * output_per_resource) <= MAX_OUTPUTS: mono_template = True for queue in new_queues: queue.stack = xstack define_queue_properties(queue) if not queue.cfn_resource: continue queue.init_outputs() queue.generate_outputs() if mono_template: the_template = template queue.stack = xstack else: the_template = build_template( f"Template for SQS queue {queue.cfn_resource.title}", ) queue.stack = ComposeXStack(queue.logical_name, stack_template=the_template) add_resource(template, queue.stack) add_resource(the_template, queue.cfn_resource) add_outputs(the_template, queue.outputs) add_queue_default_policy(queue) if queue.parameters and keyisset("AwsPrincipalsAccess", queue.parameters): add_aws_services_queue_policy(queue)