HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/images/util.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the container images commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from contextlib import contextmanager
import re

from containerregistry.client import docker_creds
from containerregistry.client import docker_name
# We use distinct versions of the library for v2 and v2.2 because
# the schema of the JSON data returned is fairly different, and
# images addressed by digest must be accessed via the API version
# corresponding to how they are stored.
from containerregistry.client.v2 import docker_http as v2_docker_http
from containerregistry.client.v2 import docker_image as v2_image
from containerregistry.client.v2_2 import docker_http as v2_2_docker_http
from containerregistry.client.v2_2 import docker_image as v2_2_image
from containerregistry.client.v2_2 import docker_image_list
from googlecloudsdk.api_lib.container.images import container_analysis_data_util
from googlecloudsdk.api_lib.containeranalysis import filter_util
from googlecloudsdk.api_lib.containeranalysis import requests
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import resources
from googlecloudsdk.core import transports
from googlecloudsdk.core.credentials import store as c_store
from googlecloudsdk.core.docker import constants
from googlecloudsdk.core.docker import docker
from googlecloudsdk.core.util import times
import six
from six.moves import map
import six.moves.http_client


class UtilError(exceptions.Error):
  """Base class for util errors."""


class InvalidImageNameError(UtilError):
  """Raised when the user supplies an invalid image name."""


class UserRecoverableV2Error(UtilError):
  """Raised when a user-recoverable V2 API error is encountered."""


class TokenRefreshError(UtilError):
  """Raised when there's an error refreshing tokens."""


def IsFullySpecified(image_name):
  return ':' in image_name or '@' in image_name


def IsInvalidRegistry(registry):
  ar_pattern = '^([a-z0-9-]*)-docker.pkg.dev'
  ar_rep_pattern = 'docker.([a-z0-9-]*).rep.pkg.dev'
  gcr_pattern = '^([a-z0-9-]*)[.]?gcr.io'
  ar_prog = re.compile(ar_pattern)
  ar_rep_prog = re.compile(ar_rep_pattern)
  gcr_prog = re.compile(gcr_pattern)
  return (
      gcr_prog.match(registry) is None
      and ar_prog.match(registry) is None
      and ar_rep_prog.match(registry) is None
  )


def ValidateRepositoryPath(repository_path):
  """Validates the repository path.

  Args:
    repository_path: str, The repository path supplied by a user.

  Returns:
    The parsed docker_name.Repository object.

  Raises:
    InvalidImageNameError: If the image name is invalid.
    docker.UnsupportedRegistryError: If the path is valid, but belongs to a
      registry we don't support.
  """
  if IsFullySpecified(repository_path):
    raise InvalidImageNameError(
        'Image names must not be fully-qualified. Remove the tag or digest '
        'and try again.')
  if repository_path.endswith('/'):
    raise InvalidImageNameError('Image name cannot end with \'/\'. '
                                'Remove the trailing \'/\' and try again.')

  try:
    if repository_path in constants.MIRROR_REGISTRIES:
      repository = docker_name.Registry(repository_path)
    else:
      repository = docker_name.Repository(repository_path)
    if IsInvalidRegistry(repository.registry):
      raise docker.UnsupportedRegistryError(repository_path)
    return repository
  except docker_name.BadNameException as e:
    # Reraise with the proper base class so the message gets shown.
    raise InvalidImageNameError(six.text_type(e))


class CredentialProvider(docker_creds.Basic):
  """CredentialProvider is a class to refresh oauth2 creds during requests."""

  _USERNAME = '_token'

  def __init__(self):
    super(CredentialProvider, self).__init__(self._USERNAME, 'does not matter')

  @property
  def password(self):
    return c_store.GetAccessTokenIfEnabled()


def _TimeCreatedToDateTime(time_created_ms):
  # Convert to float.
  timestamp = float(time_created_ms)
  # Round the timestamp to whole seconds.
  timestamp = round(timestamp / 1000)
  try:
    return times.GetDateTimeFromTimeStamp(timestamp)
  except (ArithmeticError, times.DateTimeValueError):
    # Values like -62135596800000 have been observed, causing underflows.
    return None


def RecoverProjectId(repository):
  """Recovers the project-id from a GCR repository."""
  if repository.registry in constants.MIRROR_REGISTRIES:
    return constants.MIRROR_PROJECT
  if repository.registry in constants.LAUNCHER_REGISTRIES:
    return constants.LAUNCHER_PROJECT
  parts = repository.repository.split('/')
  if '.' not in parts[0]:
    return parts[0]
  elif len(parts) > 1:
    return parts[0] + ':' + parts[1]
  else:
    raise ValueError('Domain-scoped app missing project name: %s', parts[0])


def _UnqualifiedResourceUrl(repo):
  return 'https://{repo}@'.format(repo=six.text_type(repo))


def _ResourceUrl(repo, digest):
  return 'https://{repo}@{digest}'.format(
      repo=six.text_type(repo), digest=digest)


def _FullyqualifiedDigest(digest):
  return 'https://{digest}'.format(digest=digest)


def _MakeSummaryRequest(project_id, url_filter):
  """Helper function to make Summary request."""
  client = apis.GetClientInstance('containeranalysis', 'v1alpha1')
  messages = apis.GetMessagesModule('containeranalysis', 'v1alpha1')
  project_ref = resources.REGISTRY.Parse(
      project_id, collection='cloudresourcemanager.projects')

  req = (
      messages.
      ContaineranalysisProjectsOccurrencesGetVulnerabilitySummaryRequest(
          parent=project_ref.RelativeName(), filter=url_filter))
  return client.projects_occurrences.GetVulnerabilitySummary(req)


def TransformContainerAnalysisData(
    image_name, occurrence_filter=filter_util.ContainerAnalysisFilter()):
  """Transforms the occurrence data from Container Analysis API."""
  analysis_obj = container_analysis_data_util.ContainerAndAnalysisData(
      image_name)
  project_id = RecoverProjectId(image_name)
  occs = requests.ListOccurrences(project_id, occurrence_filter.GetFilter())
  for occ in occs:
    analysis_obj.add_record(occ)

  if 'DEPLOYMENT' in occurrence_filter.kinds:
    dep_filter = occurrence_filter.WithKinds(['DEPLOYMENT']).WithResources(
        [])
    dep_occs = requests.ListOccurrences(project_id, dep_filter.GetFilter())
    image_string = six.text_type(image_name)
    for occ in dep_occs:
      if not occ.deployment:
        continue
      if image_string in occ.deployment.resourceUri:
        analysis_obj.add_record(occ)

  analysis_obj.resolveSummaries()
  return analysis_obj


def FetchSummary(repository, resource_url):
  """Fetches the summary of vulnerability occurrences for some resource.

  Args:
    repository: A parsed docker_name.Repository object.
    resource_url: The URL identifying the resource.

  Returns:
    A GetVulnzOccurrencesSummaryResponse.
  """
  project_id = RecoverProjectId(repository)
  url_filter = 'resource_url = "{resource_url}"'.format(
      resource_url=resource_url)
  return requests.GetVulnerabilitySummary(project_id, url_filter)


def FetchOccurrences(repository, occurrence_filter):
  """Fetches the occurrences attached to the list of manifests."""
  project_id = RecoverProjectId(repository)
  occurrences_by_resources = {}
  occurrences = requests.ListOccurrencesWithFilters(
      project_id, occurrence_filter.GetChunkifiedFilters())
  for occ in occurrences:
    if occ.resourceUri not in occurrences_by_resources:
      occurrences_by_resources[occ.resourceUri] = []
    occurrences_by_resources[occ.resourceUri].append(occ)
  return occurrences_by_resources


def TransformManifests(manifests,
                       repository,
                       show_occurrences=False,
                       occurrence_filter=filter_util.ContainerAnalysisFilter()):
  """Transforms the manifests returned from the server."""
  if not manifests:
    return []

  # Map from resource url to the occurrence.
  occurrences = {}
  if show_occurrences:
    occurrences = FetchOccurrences(
        repository, occurrence_filter=occurrence_filter)

  # Attach each occurrence to the resource to which it applies.
  results = []
  for k, v in six.iteritems(manifests):
    result = {
        'digest': k,
        'tags': v.get('tag', []),
        'timestamp': _TimeCreatedToDateTime(v.get('timeCreatedMs'))
    }

    # Partition the (non-PACKAGE_VULNERABILITY) occurrences into different
    # columns by kind.
    for occ in occurrences.get(_ResourceUrl(repository, k), []):
      if occ.kind not in result:
        result[occ.kind] = []
      result[occ.kind].append(occ)

    if show_occurrences and occurrence_filter.resources:
      result['vuln_counts'] = {}
      # If this manifest is in the list of resource urls for which to show
      # summaries, query the API for the summary.
      resource_url = _ResourceUrl(repository, k)
      if resource_url not in occurrence_filter.resources:
        continue

      summary = FetchSummary(repository, resource_url)
      for severity_count in summary.counts:
        if severity_count.severity:
          result['vuln_counts'][str(severity_count.severity)] = (
              severity_count.totalCount)

    results.append(result)
  return results


def GetTagNamesForDigest(digest, http_obj):
  """Gets all of the tags for a given digest.

  Args:
    digest: docker_name.Digest, The digest supplied by a user.
    http_obj: http.Http(), The http transport.

  Returns:
    A list of all of the tags associated with the input digest.
  """
  repository_path = digest.registry + '/' + digest.repository
  repository = ValidateRepositoryPath(repository_path)
  with v2_2_image.FromRegistry(
      basic_creds=CredentialProvider(), name=repository,
      transport=http_obj) as image:
    if digest.digest not in image.manifests():
      return []
    manifest_value = image.manifests().get(digest.digest, {})
    return manifest_value.get('tag', [])  # digest tags


def GetDockerTagsForDigest(digest, http_obj):
  """Gets all of the tags for a given digest.

  Args:
    digest: docker_name.Digest, The digest supplied by a user.
    http_obj: http.Http(), The http transport.

  Returns:
    A list of all of the tags associated with the input digest.
  """
  repository_path = digest.registry + '/' + digest.repository
  repository = ValidateRepositoryPath(repository_path)
  tags = []
  tag_names = GetTagNamesForDigest(digest, http_obj)
  for tag_name in tag_names:  # iterate over digest tags
    try:
      tag = docker_name.Tag(six.text_type(repository) + ':' + tag_name)
    except docker_name.BadNameException as e:
      raise InvalidImageNameError(six.text_type(e))
    tags.append(tag)
  return tags


def ValidateImagePathAndReturn(digest_or_tag):
  # Repository should contain project/image_path.
  if '/' not in digest_or_tag.repository:
    raise InvalidImageNameError('Image name should start with '
                                '*.gcr.io/project_id/image_path. ')
  return digest_or_tag


def GetDockerImageFromTagOrDigest(image_name):
  """Gets an image object given either a tag or a digest.

  Args:
    image_name: Either a fully qualified tag or a fully qualified digest.
      Defaults to latest if no tag specified.

  Returns:
    Either a docker_name.Tag or a docker_name.Digest object.

  Raises:
    InvalidImageNameError: Given digest could not be resolved to a full digest.
  """
  if not IsFullySpecified(image_name):
    image_name += ':latest'

  try:
    return ValidateImagePathAndReturn(docker_name.Tag(image_name))
  except docker_name.BadNameException:
    pass

  parts = image_name.split('@', 1)
  if len(parts) == 2:
    if not parts[1].startswith('sha256:'):
      raise InvalidImageNameError(
          '[{0}] digest must be of the form "sha256:<digest>".'.format(
              image_name))

    # If the full digest wasn't specified, check if what was passed
    # in is a valid digest prefix.
    # 7 for 'sha256:' and 64 for the full digest
    if len(parts[1]) < 7 + 64:
      resolved = GetDockerDigestFromPrefix(image_name)
      if resolved == image_name:
        raise InvalidImageNameError(
            '[{0}] could not be resolved to a full digest.'.format(image_name))
      image_name = resolved
  try:
    return ValidateImagePathAndReturn(docker_name.Digest(image_name))
  except docker_name.BadNameException:
    raise InvalidImageNameError(
        '[{0}] digest must be of the form "sha256:<digest>".'.format(
            image_name))


def GetDigestFromName(image_name):
  """Gets a digest object given a repository, tag or digest.

  Args:
    image_name: A docker image reference, possibly underqualified.

  Returns:
    a docker_name.Digest object.

  Raises:
    InvalidImageNameError: If no digest can be resolved.
  """
  tag_or_digest = GetDockerImageFromTagOrDigest(image_name)

  # If we got a tag, resolve it to a digest.
  # If it was a digest - we check if resource exists and reconstruct it.
  def ResolveV2Tag(tag):
    with v2_image.FromRegistry(
        basic_creds=CredentialProvider(), name=tag,
        transport=Http()) as v2_img:
      if v2_img.exists():
        return v2_img.digest()
      return None

  def ResolveV22Tag(tag):
    with v2_2_image.FromRegistry(
        basic_creds=CredentialProvider(),
        name=tag,
        transport=Http(),
        accepted_mimes=v2_2_docker_http.SUPPORTED_MANIFEST_MIMES) as v2_2_img:
      if v2_2_img.exists():
        return v2_2_img.digest()
      return None

  def ResolveManifestListTag(tag):
    with docker_image_list.FromRegistry(
        basic_creds=CredentialProvider(), name=tag,
        transport=Http()) as manifest_list:
      if manifest_list.exists():
        return manifest_list.digest()
      return None

  # Resolve as manifest list, then v2.2, then v2.1 because for compatibility:
  # - manifest lists can be rewritten to v2.2 "default" images.
  # - v2.2 manifests can be rewritten to v2.1 manifests.
  sha256 = (
      ResolveManifestListTag(tag_or_digest) or ResolveV22Tag(tag_or_digest) or
      ResolveV2Tag(tag_or_digest))
  if not sha256:
    raise InvalidImageNameError(
        '[{0}] is not found or is not a valid name. Expected tag in the form '
        '"base:tag" or "tag" or digest in the form "sha256:<digest>"'.
        format(image_name))

  # Even though we were able to get the digest from the tag, we should warn
  # users against using tags. If they didn't.
  if not isinstance(tag_or_digest, docker_name.Digest):
    log.warning('Successfully resolved tag to sha256, but it is recommended to '
                'use sha256 directly.')

  return docker_name.Digest('{registry}/{repository}@{sha256}'.format(
      registry=tag_or_digest.registry,
      repository=tag_or_digest.repository,
      sha256=sha256))


def GetDockerDigestFromPrefix(digest):
  """Gets a full digest string given a potential prefix.

  Args:
    digest: The digest prefix

  Returns:
    The full digest, or the same prefix if no full digest is found.

  Raises:
    InvalidImageNameError: if the prefix supplied isn't unique.
  """
  repository_path, prefix = digest.split('@', 1)
  repository = ValidateRepositoryPath(repository_path)
  with v2_2_image.FromRegistry(
      basic_creds=CredentialProvider(), name=repository,
      transport=Http()) as image:
    matches = [d for d in image.manifests() if d.startswith(prefix)]

    if len(matches) == 1:
      return repository_path + '@' + matches.pop()
    elif len(matches) > 1:
      raise InvalidImageNameError(
          '{0} is not a unique digest prefix. Options are {1}.]'.format(
              prefix, ', '.join(map(str, matches))))
    return digest


@contextmanager
def WrapExpectedDockerlessErrors(optional_image_name=None):
  try:
    yield
  except (v2_docker_http.V2DiagnosticException,
          v2_2_docker_http.V2DiagnosticException) as err:
    if err.status in [
        six.moves.http_client.UNAUTHORIZED, six.moves.http_client.FORBIDDEN
    ]:
      raise UserRecoverableV2Error('Access denied: {}'.format(
          optional_image_name or six.text_type(err)))
    elif err.status == six.moves.http_client.NOT_FOUND:
      raise UserRecoverableV2Error('Not found: {}'.format(
          optional_image_name or six.text_type(err)))
    raise
  except (v2_docker_http.TokenRefreshException,
          v2_2_docker_http.TokenRefreshException) as err:
    raise TokenRefreshError(six.text_type(err))


def Http(timeout='unset'):
  """Gets an transport client for use with containerregistry.

  For now, this just calls into GetApitoolsTransport, but if we find that
  implementation does not satisfy our needs, we may need to fork it. This
  small amount of indirection will make that change a bit cleaner.

  Args:
    timeout: the http timeout in seconds

  Returns:
    1. A httplib2.Http-like object backed by httplib2 or requests.
  """
  return transports.GetApitoolsTransport(timeout=timeout)