Source code for ecs_composex.sqs.sqs_ecs_scaling

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Module to apply SQS settings onto ECS Services
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .sqs_stack import Queue
    from ecs_composex.common.settings import ComposeXSettings

from troposphere import FindInMap, GetAtt, Ref
from troposphere.cloudwatch import Alarm, MetricDimension

from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import (
    add_outputs,
    add_parameters,
    add_resource,
    add_update_mapping,
)
from ecs_composex.ecs.service_scaling.helpers import (
    generate_alarm_scaling_out_policy,
    reset_to_zero_policy,
)
from ecs_composex.sqs.sqs_params import SQS_NAME


[docs]def handle_service_scaling(resource: Queue, settings: ComposeXSettings = None) -> None: """ Function to define and prepare settings for scaling rules based for SQS Queues discovered through lookup :param ecs_composex.compose.x_resources.ServicesXResource resource: :param ecs_composex.common. settings: :raises KeyError: if the service name is not a listed service in docker-compose. :return: """ queue_id = resource.attributes_outputs[SQS_NAME] if resource.cfn_resource: queue_pointer = queue_id["ImportParameter"] elif resource.mappings: queue_pointer = queue_id["ImportValue"] else: raise AttributeError( resource.module.res_key, resource.name, "no mapping nor new resource" ) for target in resource.families_scaling: if ( not target[0].service_scaling.scalable_target or target[0].service_scaling.scalable_target not in target[0].template.resources.values() ): LOG.warning( f"No Scalable target defined for {target[0].name}." " You need to define `scaling.scaling_range` in x-configs first. No scaling applied" ) return scaling_out_policy = generate_alarm_scaling_out_policy( target[0].logical_name, target[0].template, target[1], scaling_source=resource.logical_name, ) scaling_in_policy = reset_to_zero_policy( target[0].logical_name, target[0].template, target[1], scaling_source=resource.logical_name, ) if isinstance(queue_pointer, Parameter): add_parameters(target[0].template, [queue_pointer]) target[0].stack.Parameters.update( {queue_pointer.title: queue_id["ImportValue"]} ) add_alarm_for_resource( resource, target, scaling_out_policy, scaling_in_policy, Ref(queue_pointer), ) else: add_update_mapping( target[0].template, resource.module.mapping_key, resource.module.mappings, ) add_alarm_for_resource( resource, target, scaling_out_policy, scaling_in_policy, queue_pointer, )
[docs]def add_alarm_for_resource( resource, target, scaling_out_policy, scaling_in_policy, resource_parameter ): """ Function to add the Alarm for SQS resource to the service template :param ecs_composex.common.compose_resources.XResource resource: :param tuple target: :param scaling_out_policy: :param scaling_in_policy: :param resource_parameter: :return: """ alarm = Alarm( f"SqsScalingAlarm{resource.logical_name}To{target[0].logical_name}", ActionsEnabled=True, AlarmActions=[Ref(scaling_out_policy)], AlarmDescription=f"MessagesProcessingWatchFor{resource.logical_name}To{target[0].logical_name}", ComparisonOperator="GreaterThanOrEqualToThreshold", DatapointsToAlarm=1, Dimensions=[ MetricDimension(Name="QueueName", Value=resource_parameter), ], EvaluationPeriods=1, InsufficientDataActions=[Ref(scaling_in_policy)], MetricName="ApproximateNumberOfMessagesVisible", Namespace="AWS/SQS", OKActions=[Ref(scaling_in_policy)], Period="60", Statistic="Sum", TreatMissingData="notBreaching", Threshold=float( scaling_out_policy.StepScalingPolicyConfiguration.StepAdjustments[ 0 ].MetricIntervalLowerBound ) + 1, ) add_resource(target[0].template, alarm, True)