# Source code for ecs_composex.sqs.sqs_sqs

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
File to manage sqs to sqs dependency for dead letter queue
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings

    from .sqs_stack import Queue

from troposphere import GetAtt, Ref

from ecs_composex.sqs.sqs_params import SQS_ARN

from ..common.troposphere_tools import add_parameters, add_update_mapping


def update_target_queue_pointer(
    queue: Queue, source_queue: Queue, settings: ComposeXSettings
) -> None:
    """
    Updates the ``deadLetterTargetArn`` of the source queue redrive policy with
    the appropriate pointer to the target queue ARN.

    Three cases are handled, in this order:

    * ``queue`` has a CFN resource and is in a different stack than
      ``source_queue``: the ARN import parameter is added to the source stack
      template, its value is set in the source stack ``Parameters``, and the
      policy is set to a ``Ref`` to that parameter.
    * both queues are in the same stack: the policy is set to a ``GetAtt`` on
      the target queue CFN resource.
    * otherwise: the module mapping taken from ``settings.mappings`` is added to
      the source stack template and the policy is set to the ``ImportValue``
      entry of the ARN outputs.

    :param queue: The dead letter (target) queue.
    :param source_queue: The queue whose ``RedrivePolicy`` is updated in place.
    :param settings: Execution settings; only ``settings.mappings`` is read here.
    """
    arn_pointer = queue.attributes_outputs[SQS_ARN]

    if queue.stack != source_queue.stack and queue.cfn_resource:
        import_parameter = arn_pointer["ImportParameter"]
        add_parameters(source_queue.stack.stack_template, [import_parameter])
        source_queue.stack.Parameters.update(
            {import_parameter.title: arn_pointer["ImportValue"]}
        )
        redrive_policy = source_queue.cfn_resource.RedrivePolicy
        redrive_policy.deadLetterTargetArn = Ref(import_parameter)
    elif queue.stack == source_queue.stack:
        # NOTE(review): this branch does not check ``queue.cfn_resource``; if a
        # queue without a CFN resource can share the source queue's stack,
        # ``GetAtt`` would receive ``None`` -- confirm against callers.
        redrive_policy = source_queue.cfn_resource.RedrivePolicy
        redrive_policy.deadLetterTargetArn = GetAtt(
            queue.cfn_resource, SQS_ARN.return_value
        )
    else:
        mapping_key = queue.module.mapping_key
        add_update_mapping(
            source_queue.stack.stack_template,
            mapping_key,
            settings.mappings[mapping_key],
        )
        redrive_policy = source_queue.cfn_resource.RedrivePolicy
        redrive_policy.deadLetterTargetArn = arn_pointer["ImportValue"]
def sqs_to_sqs(queue: Queue, source_queue: Queue, settings: ComposeXSettings) -> None:
    """
    Links the redrive policy of ``source_queue`` to ``queue`` when the policy
    references ``queue`` by name.

    Nothing is done when ``source_queue`` has no CFN resource, when that resource
    has no (or an empty) ``RedrivePolicy``, or when ``deadLetterTargetArn`` is
    not a string starting with ``queue.module.res_key``. Otherwise the target
    name is the text following the last ``<res_key>::`` in that string and, if it
    equals ``queue.name``, ``update_target_queue_pointer`` is called.

    :param queue: The candidate dead letter queue.
    :param source_queue: The queue with redrive policy
    :param settings: Passed through to ``update_target_queue_pointer``.
    :raises AttributeError: If the redrive policy has no ``deadLetterTargetArn``
        attribute.
    """
    if not source_queue.cfn_resource:
        return

    redrive_policy = getattr(source_queue.cfn_resource, "RedrivePolicy", None)
    if not redrive_policy:
        return

    if not hasattr(redrive_policy, "deadLetterTargetArn"):
        raise AttributeError(
            source_queue.module.res_key,
            source_queue.name,
            "RedrivePolicy does not have deadLetterTargetArn defined",
        )

    target_arn = redrive_policy.deadLetterTargetArn
    # Only plain strings can be a ``<res_key>::<name>`` reference; any other
    # type of value is left untouched.
    if not isinstance(target_arn, str):
        return

    res_key = queue.module.res_key
    if not target_arn.startswith(res_key):
        return

    target_queue_name = target_arn.split(f"{res_key}::")[-1]
    if target_queue_name == queue.name:
        update_target_queue_pointer(queue, source_queue, settings)