HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/artifacts/version_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility for parsing Artifact Registry versions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64
import json

from apitools.base.protorpclite import protojson
from googlecloudsdk.api_lib.artifacts import filter_rewriter
from googlecloudsdk.api_lib.util import common_args
from googlecloudsdk.command_lib.artifacts import containeranalysis_util as ca_util
from googlecloudsdk.command_lib.artifacts import requests
from googlecloudsdk.command_lib.artifacts import util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources


def ShortenRelatedTags(response, unused_args):
  """Convert the tag resources into tag IDs."""
  tags = []
  for t in response.relatedTags:
    tag = resources.REGISTRY.ParseRelativeName(
        t.name, "artifactregistry.projects.locations.repositories.packages.tags"
    )
    tags.append(tag.tagsId)

  json_obj = json.loads(protojson.encode_message(response))
  json_obj.pop("relatedTags", None)
  if tags:
    json_obj["relatedTags"] = tags
  # Restore the display format of `metadata` after json conversion.
  if response.metadata is not None:
    json_obj["metadata"] = {
        prop.key: prop.value.string_value
        for prop in response.metadata.additionalProperties
    }
  return json_obj


def ListOccurrences(response, args):
  """Call CA APIs for vulnerabilities if --show-package-vulnerability is set."""
  if not args.show_package_vulnerability:
    return response

  # TODO(b/246801021) Assume all versions are mavenArtifacts until versions API
  # is aware of the package type.
  project, maven_resource = _GenerateMavenResourceFromResponse(response)

  metadata = ca_util.GetMavenArtifactOccurrences(project, maven_resource)

  if metadata.ArtifactsDescribeView():
    response.update(metadata.ArtifactsDescribeView())
  else:
    response.update(
        {"package_vulnerability_summary": "No vulnerability data found."}
    )

  return response


def ConvertFingerprint(response, unused_args):
  """Convert fingerprint and annotations to a dict."""
  if hasattr(response, "check_initialized"):
    # It's a protorpc message.
    resource = json.loads(protojson.encode_message(response))
  else:
    # It's a json already.
    resource = response

  if "fingerprints" in resource and resource["fingerprints"]:
    for h in resource["fingerprints"]:
      if isinstance(h.get("value"), str):
        # In dicts from tests, the value is base64 encoded string.
        h["value"] = base64.b64decode(h["value"]).hex()

  if "annotations" in resource and resource.get("annotations"):
    # The value from scenario test is a dict, not a message.
    if "additionalProperties" in resource["annotations"]:
      annotations = {}
      for p in resource["annotations"].get("additionalProperties", []):
        annotations[p["key"]] = p["value"]
      resource["annotations"] = annotations
  return resource


def _GenerateMavenResourceFromResponse(response):
  """Convert Versions Describe Response to maven artifact resource name."""
  r = resources.REGISTRY.ParseRelativeName(
      response["name"],
      "artifactregistry.projects.locations.repositories.packages.versions",
  )

  # mavenArtifacts is only present in the v1 API, not the default v1beta2 API
  registry = resources.REGISTRY.Clone()
  registry.RegisterApiByName("artifactregistry", "v1")

  maven_artifacts_id = r.packagesId + ":" + r.versionsId

  maven_resource = resources.Resource.RelativeName(
      registry.Create(
          "artifactregistry.projects.locations.repositories.mavenArtifacts",
          projectsId=r.projectsId,
          locationsId=r.locationsId,
          repositoriesId=r.repositoriesId,
          mavenArtifactsId=maven_artifacts_id,
      )
  )

  return r.projectsId, maven_resource


def ListVersions(args):
  """Lists package versions in a given package.

  Args:
    args: User input arguments.

  Returns:
    List of package versiions.
  """
  client = requests.GetClient()
  messages = requests.GetMessages()
  page_size = args.page_size
  repo = util.GetRepo(args)
  project = util.GetProject(args)
  location = args.location or properties.VALUES.artifacts.location.Get()
  package = args.package
  escaped_pkg = package.replace("/", "%2F").replace("+", "%2B")
  escaped_pkg = escaped_pkg.replace("^", "%5E")
  order_by = common_args.ParseSortByArg(args.sort_by)
  limit = args.limit
  _, server_filter = filter_rewriter.Rewriter().Rewrite(args.filter)

  if order_by is not None:
    if "," in order_by:
      # Multi-ordering is not supported yet on backend, fall back to client-side
      # sort-by.
      order_by = None

  if args.limit is not None and args.filter is not None:
    if server_filter is not None:
      # Apply limit to server-side page_size to improve performance when
      # server-side filter is used.
      page_size = args.limit
    else:
      # Fall back to client-side paging with client-side filtering.
      page_size = None
      limit = None

  pkg_path = resources.Resource.RelativeName(
      resources.REGISTRY.Create(
          "artifactregistry.projects.locations.repositories.packages",
          projectsId=project,
          locationsId=location,
          repositoriesId=repo,
          packagesId=escaped_pkg,
      )
  )

  server_args = {
      "client": client,
      "messages": messages,
      "pkg": pkg_path,
      "server_filter": server_filter,
      "page_size": page_size,
      "order_by": order_by,
      "limit": limit,
  }
  server_args_skipped, lversions = util.RetryOnInvalidArguments(
      requests.ListVersions, **server_args
  )

  if not server_args_skipped:
    # If server-side filter or sort-by is parsed correctly and the request
    # succeeds, remove the client-side filter and sort-by.
    if server_filter and server_filter == args.filter:
      args.filter = None
    if order_by:
      args.sort_by = None

  log.status.Print(
      "Listing items under project {}, location {}, repository {}, "
      "package {}.\n".format(project, location, repo, package)
  )
  return lversions