HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/container/binauthz/util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for Binary Authorization commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import json

from containerregistry.client import docker_name
from googlecloudsdk.core.exceptions import Error
import six
from six.moves import urllib

_BINAUTHZ_ATTESTATION_ANNOTATION_KEY = (
    'binaryauthorization.googleapis.com/attestations'
)


class BadImageUrlError(Error):
  """Raised when a container image URL cannot be parsed successfully."""


def ReplaceImageUrlScheme(image_url, scheme):
  """Returns the passed `image_url` with the scheme replaced.

  Args:
    image_url: The URL to replace (or strip) the scheme from. (string)
    scheme: The scheme of the returned URL.  If this is an empty string or
      `None`, the scheme is stripped and the leading `//` of the resulting URL
      will be stripped off.
  Raises:
    BadImageUrlError: `image_url` isn't valid.
  """
  scheme = scheme or ''
  parsed_url = urllib.parse.urlparse(image_url)

  # If the URL has a scheme but not a netloc, then it must have looked like
  # 'scheme:///foo/bar', which is invalid for the purpose of attestation.
  if parsed_url.scheme and not parsed_url.netloc:
    raise BadImageUrlError(
        "Image URL '{image_url}' is invalid because it does not have a host "
        'component.'.format(image_url=image_url))

  # If there is neither a scheme nor a netloc, this means that an unqualified
  # URL was passed, like 'gcr.io/foo/bar'.  In this case we canonicalize the URL
  # by prefixing '//', which will cause urlparse to correctly pick up the
  # netloc.
  if not parsed_url.netloc:
    parsed_url = urllib.parse.urlparse('//{}'.format(image_url))

  # Finally, we replace the scheme and generate the URL.  If we were stripping
  # the scheme, the result will be prefixed with '//', which we strip off.  If
  # the scheme is non-empty, the lstrip is a no-op.
  return parsed_url._replace(scheme=scheme).geturl().lstrip('/')


def MakeSignaturePayloadDict(container_image_url):
  """Creates a dict representing a JSON signature object to sign.

  Args:
    container_image_url: See `containerregistry.client.docker_name.Digest` for
      artifact URL validation and parsing details.  `container_image_url` must
      be a fully qualified image URL with a valid sha256 digest.

  Returns:
    Dictionary of nested dictionaries and strings, suitable for passing to
    `json.dumps` or similar.
  """
  url = ReplaceImageUrlScheme(image_url=container_image_url, scheme='')
  try:
    repo_digest = docker_name.Digest(url)
  except docker_name.BadNameException as e:
    raise BadImageUrlError(e)
  return {
      'critical': {
          'identity': {
              'docker-reference': six.text_type(repo_digest.as_repository()),
          },
          'image': {
              'docker-manifest-digest': repo_digest.digest,
          },
          'type': 'Google cloud binauthz container signature',
      },
  }


def MakeSignaturePayload(container_image_url):
  """Creates a JSON bytestring representing a signature object to sign.

  Args:
    container_image_url: See `containerregistry.client.docker_name.Digest` for
      artifact URL validation and parsing details.  `container_image_url` must
      be a fully qualified image URL with a valid sha256 digest.

  Returns:
    A bytestring representing a JSON-encoded structure of nested dictionaries
    and strings.
  """
  payload_dict = MakeSignaturePayloadDict(container_image_url)
  # `separators` is specified as a workaround to the native `json` module's
  # https://bugs.python.org/issue16333 which results in inconsistent
  # serialization in older versions of Python.
  payload = json.dumps(
      payload_dict,
      ensure_ascii=True,
      indent=2,
      separators=(',', ': '),
      sort_keys=True,
  )
  # NOTE: A newline is appended for backwards compatibility with the previous
  # payload serialization which relied on gcloud's default JSON serialization.
  return '{}\n'.format(payload).encode('utf-8')


def RemoveArtifactUrlScheme(artifact_url):
  """Ensures the given URL has no scheme (e.g.

  replaces "https://gcr.io/foo/bar" with "gcr.io/foo/bar" and leaves
  "gcr.io/foo/bar" unchanged).

  Args:
    artifact_url: A URL string.
  Returns:
    The URL with the scheme removed.
  """
  url_without_scheme = ReplaceImageUrlScheme(artifact_url, scheme='')
  try:
    # The validation logic in `docker_name` silently produces incorrect results
    # if the passed URL has a scheme.
    docker_name.Digest(url_without_scheme)
  except docker_name.BadNameException as e:
    raise BadImageUrlError(e)
  return url_without_scheme


def GetImageDigest(artifact_url):
  """Returns the digest of an image given its url.

  Args:
    artifact_url: An image url. e.g. "https://gcr.io/foo/bar@sha256:123"

  Returns:
    The image digest. e.g. "sha256:123"
  """
  url_without_scheme = ReplaceImageUrlScheme(artifact_url, scheme='')

  try:
    # The validation logic in `docker_name` silently produces incorrect results
    # if the passed URL has a scheme.
    digest = docker_name.Digest(url_without_scheme)
  except docker_name.BadNameException as e:
    raise BadImageUrlError(e)
  return digest.digest


def PaeEncode(dsse_type, body):
  """Pae encode input using the specified dsse type.

  Args:
    dsse_type: DSSE envelope type.
    body: payload string.

  Returns:
    Pae-encoded payload byte string.
  """
  dsse_type_bytes = dsse_type.encode('utf-8')
  body_bytes = body.encode('utf-8')
  return b' '.join([
      b'DSSEv1',
      b'%d' % len(dsse_type_bytes),
      dsse_type_bytes,
      b'%d' % len(body_bytes),
      body_bytes,
  ])


def GeneratePodSpecFromImages(images):
  """Creates a minimal PodSpec from a list of images.

  Args:
    images: list of images being evaluated.

  Returns:
    PodSpec object in JSON form.
  """
  spec = {
      'apiVersion': 'v1',
      'kind': 'Pod',
      'metadata': {
          'name': '',
      },
      'spec': {
          'containers': [{'image': image} for image in images],
      },
  }

  return spec


def AddInlineAttestationsToPodSpec(pod_spec, attestations):
  """Inlines attestations into a Kubernetes PodSpec.

  Args:
    pod_spec: The PodSpec provided by the user.
    attestations: List of attestations returned by the policy evaluator in comma
      separated DSSE form.

  Returns:
    Modified PodSpec with attestations inlined.
  """
  annotations = pod_spec['metadata'].get('annotations', {})
  existing_attestations = annotations.get(
      _BINAUTHZ_ATTESTATION_ANNOTATION_KEY, None
  )
  if existing_attestations:
    annotations[_BINAUTHZ_ATTESTATION_ANNOTATION_KEY] = ','.join(
        [existing_attestations] + attestations
    )
  else:
    annotations[_BINAUTHZ_ATTESTATION_ANNOTATION_KEY] = ','.join(attestations)
  pod_spec['metadata']['annotations'] = annotations
  return pod_spec


def AddInlineAttestationsToResource(resource, attestations):
  """Inlines attestations into a Kubernetes resource.

  Args:
    resource: The Kubernetes resource provided by the user.
    attestations: List of attestations returned by the policy evaluator in comma
      separated DSSE form.

  Returns:
    Modified Kubernetes resource with attestations inlined.
  """
  if resource['kind'] != 'Pod':
    resource['spec']['template'] = AddInlineAttestationsToPodSpec(
        resource['spec']['template'], attestations
    )
    return resource
  return AddInlineAttestationsToPodSpec(resource, attestations)