Source code for ecs_composex.compose.compose_volumes

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Class and functions to interact with the volumes: defined in compose files.
"""

from copy import deepcopy

from compose_x_common.compose_x_common import keyisset, set_else_none

from ecs_composex.common.logging import LOG
from ecs_composex.efs.efs_params import RES_KEY as EFS_KEY

from .helpers import evaluate_plugin_efs_properties


[docs]class ComposeVolume: """ Class to keep track of the Docker-compose Volumes When properties are defined, the priority in evaluation goes * x-efs * driver * driver_opts Assumed local when none else defined. """ main_key = "volumes" driver_key = "driver" driver_opts_key = "driver_opts" efs_defaults = { "Encrypted": True, "LifecyclePolicies": [{"TransitionToIA": "AFTER_14_DAYS"}], "PerformanceMode": "generalPurpose", } def __init__(self, name, definition): self.name = name self.volume_name = name self.autogenerated = False self.definition = deepcopy(definition) self.is_shared = False self.services = [] self.parameters = {} self.device = None self.cfn_volume = None self.efs_definition = {} self.use = {} self.lookup = {} self.type = "volume" self.driver = set_else_none("driver", definition, "local") self.driver_opts = set_else_none(self.driver_opts_key, definition, {}) self.external = False self.efs_definition = ( evaluate_plugin_efs_properties(self.definition, self.driver_opts_key) if self.driver == "efs" else {} ) if self.efs_definition: LOG.info( f"volumes.{self.name} - Identified properties as defined by Docker Plugin" ) self.type = "bind" self.driver = "nfs" else: self.import_volume_from_definition()
[docs] def import_volume_from_definition(self): efs_set = set_else_none(EFS_KEY, self.definition) driver = set_else_none(self.driver_key, self.definition) driver_opts = set_else_none(self.driver_opts_key, self.definition) if efs_set: self.efs_definition = self.definition[EFS_KEY] self.import_from_x_efs_settings() elif not efs_set and driver and not driver_opts: self.import_local_volume() else: if driver == "local": self.type = set_else_none("o", driver_opts, "volume") else: self.type = "volume" self.driver = "local" self.is_shared = False
[docs] def import_from_x_efs_settings(self): self.driver = "nfs" self.type = "bind" self.is_shared = True if keyisset("Lookup", self.efs_definition): self.lookup = self.efs_definition["Lookup"] else: self.efs_definition = ( self.definition[EFS_KEY]["Properties"] if keyisset("Properties", self.efs_definition) else self.efs_defaults ) self.parameters = ( self.definition[EFS_KEY]["MacroParameters"] if keyisset("MacroParameters", self.definition[EFS_KEY]) else {} )
[docs] def import_local_volume(self): if self.definition[self.driver_key] == "local": self.type = "volume" self.driver = "local" self.efs_definition = None elif self.definition[self.driver_key] in ["nfs", "efs"]: self.type = "bind" self.is_shared = True self.driver = "nfs"
def __repr__(self): return self.name