Source code for ecs_composex.compose.compose_services.service_image

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>


from __future__ import annotations

import copy
import re
from typing import TYPE_CHECKING, Union

from boto3.session import Session
from botocore.exceptions import ClientError

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from . import ComposeService

import docker
import requests
import urllib3
from compose_x_common.aws import get_session
from compose_x_common.aws.ecr import PRIVATE_ECR_URI_RE, PUBLIC_ECR_URI_RE
from compose_x_common.compose_x_common import keyisset
from troposphere import Ref

from ecs_composex.common.cfn_params import Parameter
from ecs_composex.common.logging import LOG

from .ecr_helpers import define_service_image, interpolate_ecr_uri_tag_with_digest


def get_image_from_ssm_parameter(
    ssm_parameter: Parameter, session: Union[Session, None] = None
) -> Union[str, None]:
    """
    Read the value of the SSM parameter whose name is ``ssm_parameter.Default``.

    This is a best-effort lookup: any AWS client error results in ``None``
    rather than an exception.

    :param ssm_parameter: Parameter whose ``Default`` attribute is used as the SSM parameter name.
    :param session: Session handed to ``get_session``; may be None.
    :return: The parameter value, or None when the lookup failed.
    """
    session = get_session(session)
    client = session.client("ssm")
    try:
        return client.get_parameter(Name=ssm_parameter.Default)["Parameter"]["Value"]
    except ClientError as error:
        # The modeled client exceptions (InvalidKeyId, ParameterNotFound) derive
        # from ClientError, so a single handler covers all of them. The failure
        # is logged instead of being dropped silently.
        LOG.debug(
            f"Failed to retrieve SSM parameter {ssm_parameter.Default}: {error}"
        )
    return None
class ServiceImage:
    """
    Image settings of a compose service.

    :ivar _image: Either the image URI (str) or the Parameter used to reference the image.
    :ivar str image_uri: Image URI taken from the service definition; rewritten
        when the tag gets replaced with the image digest.
    """

    def __init__(self, service: ComposeService, image_param: Parameter = None):
        """
        :param service: The compose service; its definition must set ``image``.
        :param image_param: Optional pre-defined Parameter to use for the image.
            When not set, a ``<logical_name>ImageUrl`` String parameter is created
            and the image is kept as the plain URI from the definition.
        :raises KeyError: When the service definition has no ``image``.
        """
        if not keyisset("image", service.definition):
            raise KeyError(service.name, "You must define ``image``")
        self._service = service
        self._image = None
        self._image_digest = None
        self.image_uri = service.definition["image"]
        if not image_param:
            self._image_param = Parameter(
                f"{self.service.logical_name}ImageUrl", Type="String"
            )
            self.image = service.definition["image"]
        else:
            # _image_param must be assigned first: the image setter stores
            # self.image_param when it is given a Parameter.
            self._image_param = image_param
            self.image = image_param

    @property
    def image(self) -> Union[str, Ref, None]:
        """
        The image URI when stored as a string, a ``Ref`` to the parameter when
        stored as a Parameter, otherwise None.
        """
        if isinstance(self._image, str):
            return self._image
        if isinstance(self._image, Parameter):
            return Ref(self._image)
        return None

    @image.setter
    def image(self, value: Union[str, Parameter]):
        """
        :raises TypeError: When value is neither a str nor a Parameter.
        """
        if isinstance(value, str):
            self._image = value
        elif isinstance(value, Parameter):
            # The instance's own image parameter is stored, not ``value`` itself.
            self._image = self.image_param
        else:
            raise TypeError(
                self.service,
                "image must be one of",
                (str, Parameter),
                "Got",
                value,
                type(value),
            )

    @property
    def image_param(self) -> Parameter:
        """The Parameter associated with the service image."""
        return self._image_param

    @property
    def service(self) -> ComposeService:
        """The compose service this image belongs to."""
        return self._service

    @property
    def private_ecr(self) -> Union[re.Match, None]:
        """Match of ``image_uri`` against PRIVATE_ECR_URI_RE, None when it does not match."""
        return PRIVATE_ECR_URI_RE.match(self.image_uri)

    @property
    def public_ecr(self) -> Union[re.Match, None]:
        """Match of ``image_uri`` against PUBLIC_ECR_URI_RE, None when it does not match."""
        return PUBLIC_ECR_URI_RE.match(self.image_uri)

    @staticmethod
    def _replace_tag_with_digest(image: str, digest: str) -> str:
        """
        Return ``image`` with its tag and/or digest replaced by ``@<digest>``.

        Only a ``:tag`` found in the last path component is removed, so a
        registry port (``localhost:5000/app:1.0``) is preserved. An image
        without tag nor digest simply gets the digest appended.
        """
        without_digest = image.split("@", 1)[0]
        repository, slash, last_component = without_digest.rpartition("/")
        last_component = last_component.split(":", 1)[0]
        return f"{repository}{slash}{last_component}@{digest}"

    def _publish_image_uri(self) -> None:
        """Propagate ``image_uri`` to the family stack parameters, or to the service definition."""
        LOG.info(f"Update service {self.service.name} image to {self.image_uri}")
        if self.service.family:
            self.service.family.stack.Parameters.update(
                {self.image_param.title: self.image_uri}
            )
        else:
            self.service.definition["image"] = self.image_uri

    def private_ecr_digest(self, settings: ComposeXSettings):
        """
        Replace the image tag with the digest found via ``define_service_image``.

        Does nothing when the service has no ``x_ecr`` settings or when no
        ``imageDigest`` could be found for the image.

        :param settings: The execution settings, passed on to ``define_service_image``.
        """
        if not self.service.x_ecr:
            return
        service_image = define_service_image(self.service, settings)
        if not keyisset("imageDigest", service_image):
            LOG.info(
                f"{self.service.name} - Unable to find digest for {self.image_uri}"
            )
            return
        # imageDigest is known to be set from here on.
        if self.service.ecr_config and keyisset(
            "InterpolateWithDigest", self.service.x_ecr
        ):
            self.image_uri = interpolate_ecr_uri_tag_with_digest(
                self.image_uri, service_image["imageDigest"]
            )
            self._publish_image_uri()
            LOG.debug("ECR - ADDING IMAGE TAG TO LABELS")
            self.service.docker_labels.update(
                {"docker_image_tag": service_image["imageTag"]}
            )

    def interpolate_image_digest(self, settings: ComposeXSettings = None):
        """
        Replace the image tag with the image digest when requested.

        When settings are given, the service has ``x_ecr`` with
        ``InterpolateWithDigest`` set and the image URI matches a private ECR
        URI, the digest is resolved via :meth:`private_ecr_digest`. Otherwise,
        when ``x-docker_opts.InterpolateWithDigest`` is set in the service
        definition, the digest is resolved via :meth:`retrieve_image_digest`.

        :param settings: The execution settings; required for the ECR lookup.
        """
        if (
            settings
            and self.service.x_ecr
            and keyisset("InterpolateWithDigest", self.service.x_ecr)
            and self.private_ecr
        ):
            self.private_ecr_digest(settings)
            return
        docker_opts_set = keyisset("x-docker_opts", self.service.definition)
        if docker_opts_set and keyisset(
            "InterpolateWithDigest", self.service.definition["x-docker_opts"]
        ):
            try:
                # Best-effort guard kept from the original implementation.
                import docker  # noqa: F401

                self.retrieve_image_digest()
            except ImportError:
                LOG.warning("Unable to use docker to resolve image")

    def retrieve_image_digest(self):
        """
        Retrieves the docker images digest from the repository to use instead of the image tag.

        Does nothing when the image is a ``Ref``. Failures to reach the docker
        engine or the registry are logged and leave the image untouched. On
        success, the original tag (or digest) is stored in the
        ``__meta_image_tag`` docker label.

        :raises KeyError: When the distribution details contain no ``Descriptor``.
        :raises ValueError: When the descriptor ``mediaType`` is not a supported one.
        """
        if isinstance(self.image, Ref):
            return
        valid_media_types = [
            "application/vnd.oci.image.index.v1+json",
            "application/vnd.docker.distribution.manifest.v1+json",
            "application/vnd.docker.distribution.manifest.v2+json",
            "application/vnd.docker.distribution.manifest.v1+prettyjws",
            "application/vnd.docker.distribution.manifest.list.v2+json",
        ]
        original_image = self.image
        try:
            dkr_client = docker.APIClient()
            image_details = dkr_client.inspect_distribution(original_image)
            if not keyisset("Descriptor", image_details):
                raise KeyError(f"No information retrieved for {original_image}")
        except (docker.errors.APIError, docker.errors.DockerException) as error:
            LOG.error(
                f"Failed to retrieve the image digest for {original_image}: {error}"
            )
            image_details = {}
        except (
            FileNotFoundError,
            # Exception classes are required here: listing the ``exceptions``
            # modules themselves raises TypeError when the clause is evaluated.
            urllib3.exceptions.HTTPError,
            requests.exceptions.RequestException,
        ) as error:
            LOG.error(f"Failed to connect to any docker engine: {error}")
            image_details = {}
        if not image_details:
            LOG.warning(
                f"services.{self.service.name}: Failed to interpolate Docker image tag with digest"
            )
            return
        details = image_details["Descriptor"]
        if (
            keyisset("mediaType", details)
            and details["mediaType"] not in valid_media_types
        ):
            raise ValueError(
                "The mediaType is not valid. \nGot",
                details["mediaType"],
                "Expected one of",
                valid_media_types,
            )
        if not keyisset("digest", details):
            LOG.warning(
                "No digest found. This might be due to Registry API prior to V2"
            )
            return
        self.image_uri = self._replace_tag_with_digest(
            original_image, details["digest"]
        )
        self._publish_image_uri()
        # Keep track of the tag (or digest) the image was originally set with.
        image_tag_re = re.compile(r"(?<=[:@])(?P<tag_digest>[\w.\-_]+$)")
        tag_match = image_tag_re.search(original_image)
        if tag_match:
            self.service.docker_labels.update(
                {"__meta_image_tag": tag_match.group("tag_digest")}
            )