Source code for ecs_composex.compose.compose_volumes
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
Class and functions to interact with the volumes: defined in compose files.
"""
from copy import deepcopy
from compose_x_common.compose_x_common import keyisset, set_else_none
from ecs_composex.common.logging import LOG
from ecs_composex.efs.efs_params import RES_KEY as EFS_KEY
from .helpers import evaluate_plugin_efs_properties
[docs]class ComposeVolume:
"""
Class to keep track of the Docker-compose Volumes
When properties are defined, the priority in evaluation goes
* x-efs
* driver
* driver_opts
Assumed local when none else defined.
"""
main_key = "volumes"
driver_key = "driver"
driver_opts_key = "driver_opts"
efs_defaults = {
"Encrypted": True,
"LifecyclePolicies": [{"TransitionToIA": "AFTER_14_DAYS"}],
"PerformanceMode": "generalPurpose",
}
def __init__(self, name, definition):
self.name = name
self.volume_name = name
self.autogenerated = False
self.definition = deepcopy(definition)
self.is_shared = False
self.services = []
self.parameters = {}
self.device = None
self.cfn_volume = None
self.efs_definition = {}
self.use = {}
self.lookup = {}
self.type = "volume"
self.driver = set_else_none("driver", definition, "local")
self.driver_opts = set_else_none(self.driver_opts_key, definition, {})
self.external = False
self.efs_definition = (
evaluate_plugin_efs_properties(self.definition, self.driver_opts_key)
if self.driver == "efs"
else {}
)
if self.efs_definition:
LOG.info(
f"volumes.{self.name} - Identified properties as defined by Docker Plugin"
)
self.type = "bind"
self.driver = "nfs"
else:
self.import_volume_from_definition()
[docs] def import_volume_from_definition(self):
efs_set = set_else_none(EFS_KEY, self.definition)
driver = set_else_none(self.driver_key, self.definition)
driver_opts = set_else_none(self.driver_opts_key, self.definition)
if efs_set:
self.efs_definition = self.definition[EFS_KEY]
self.import_from_x_efs_settings()
elif not efs_set and driver and not driver_opts:
self.import_local_volume()
else:
if driver == "local":
self.type = set_else_none("o", driver_opts, "volume")
else:
self.type = "volume"
self.driver = "local"
self.is_shared = False
[docs] def import_from_x_efs_settings(self):
self.driver = "nfs"
self.type = "bind"
self.is_shared = True
if keyisset("Lookup", self.efs_definition):
self.lookup = self.efs_definition["Lookup"]
else:
self.efs_definition = (
self.definition[EFS_KEY]["Properties"]
if keyisset("Properties", self.efs_definition)
else self.efs_defaults
)
self.parameters = (
self.definition[EFS_KEY]["MacroParameters"]
if keyisset("MacroParameters", self.definition[EFS_KEY])
else {}
)
[docs] def import_local_volume(self):
if self.definition[self.driver_key] == "local":
self.type = "volume"
self.driver = "local"
self.efs_definition = None
elif self.definition[self.driver_key] in ["nfs", "efs"]:
self.type = "bind"
self.is_shared = True
self.driver = "nfs"
def __repr__(self):
return self.name