Source code for ecs_composex.common.compose_networks

#  -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille <john@compose-x.io>

"""
Class and functions to interact with the networks: defined in compose files.
"""

from compose_x_common.compose_x_common import keyisset

from ecs_composex.common import LOG


[docs]def match_networks_services_config(service, net_config, networks): """ Function to map network config in services and top-level networks :param service: :param dict net_config: :param list networks: :raises LookupError: """ for network in networks: if network.name == net_config["source"]: network.services.append(service) net_config["network"] = network service.networks.append(net_config) LOG.info(f"Mapped {network.name} to {service.name}") return raise LookupError( f"Volume {net_config['source']} was not found in {[vol.name for vol in networks]}" )
[docs]class ComposeNetwork(object): """ Class to keep track of the Docker-compose Volumes """ main_key = "networks" driver_opts_key = "driver" def __init__(self, name, definition, subnets_list): self.name = name self.subnet_name = name if keyisset("name", definition): self.subnet_name = definition["name"] elif ( not keyisset("name", definition) and keyisset("x-vpc", definition) and isinstance(definition["x-vpc"], str) ): self.subnet_name = definition["x-vpc"] subnet_names = [subnet.title for subnet in subnets_list] if self.subnet_name not in subnet_names: raise KeyError( f"No subnet {self.name} defined. Valid options are", subnet_names, )