HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/container/binauthz/sigstore_image.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tool and utils for creating Sigstore attestations stored as a Docker images."""

import base64
import binascii
import collections
import copy
import json
from typing import List, Optional, Text

from containerregistry.client import docker_creds
from containerregistry.client import docker_name
from containerregistry.client.v2_2 import docker_digest
from containerregistry.client.v2_2 import docker_http
from containerregistry.client.v2_2 import docker_image
from containerregistry.client.v2_2 import docker_session
from containerregistry.transform.v2_2 import metadata
from googlecloudsdk.api_lib.container.images import util
from googlecloudsdk.command_lib.container.binauthz import util as binauthz_util
from googlecloudsdk.core.exceptions import Error
import httplib2


DSSE_PAYLOAD_TYPE = 'application/vnd.dsse.envelope.v1+json'
BINAUTHZ_CUSTOM_PREDICATE = (
    'https://binaryauthorization.googleapis.com/policy_verification/v0.1'
)


def _RemovePrefix(text, prefix):
  if text.startswith(prefix):
    return text[len(prefix):]
  return text


def AddMissingBase64Padding(encoded):
  return encoded + '==='[: -len(encoded) % 4]


def StandardOrUrlsafeBase64Decode(encoded):
  try:
    return base64.b64decode(encoded)
  except binascii.Error:
    # Urlsafe encodings may or may not be padded.
    return base64.urlsafe_b64decode(AddMissingBase64Padding(encoded))


def AttestationToImageUrl(attestation):
  """Extract the image url from a DSSE of predicate type https://binaryauthorization.googleapis.com/policy_verification/*.

  This is a helper function for mapping attestations back to their respective
  images. Do not use this for signature verification.

  Args:
    attestation: The attestation in base64 encoded string form.

  Returns:
    The image url referenced in the attestation.
  """
  # The DSSE spec permits either standard or URL-safe base64 encoding.
  deser_att = json.loads(StandardOrUrlsafeBase64Decode(attestation))

  deser_payload = json.loads(
      StandardOrUrlsafeBase64Decode(deser_att['payload'])
  )
  return '{}@sha256:{}'.format(
      deser_payload['subject'][0]['name'],
      deser_payload['subject'][0]['digest']['sha256'],
  )


def UploadAttestationToRegistry(
    image_url, attestation, use_docker_creds=None, docker_config_dir=None
):
  """Uploads a DSSE attestation to the registry.

  Args:
    image_url: The image url of the target image.
    attestation: The attestation referencing the target image in JSON DSSE form.
    use_docker_creds: Whether to use the Docker configuration file for
      authenticating to the registry.
    docker_config_dir: Directory where Docker saves authentication settings.
  """
  http_obj = httplib2.Http()
  # The registry name is deduced by splitting on the first '/' char. This will
  # not work properly if the image url has a scheme.
  target_image = docker_name.Digest(
      binauthz_util.ReplaceImageUrlScheme(image_url, scheme='')
  )
  # Sigstore scheme for tag based discovery.
  attestation_tag = docker_name.Tag(
      '{}/{}:sha256-{}.att'.format(
          target_image.registry,
          target_image.repository,
          _RemovePrefix(target_image.digest, 'sha256:'),
      )
  )

  creds = None
  if use_docker_creds:
    keychain = docker_creds.DefaultKeychain
    if docker_config_dir:
      keychain.setCustomConfigDir(docker_config_dir)
    creds = keychain.Resolve(docker_name.Registry(target_image.registry))
  if creds is None or isinstance(creds, docker_creds.Anonymous):
    creds = util.CredentialProvider()

  # Check if attestation image already exists and if so append a new layer.
  # Only check for Image Manifest Version 2, Schema 2 images since that format
  # predates Sigstore.
  with docker_image.FromRegistry(
      attestation_tag,
      creds,
      http_obj,
      accepted_mimes=docker_http.SUPPORTED_MANIFEST_MIMES,
  ) as v2_2_image:
    if v2_2_image.exists():
      new_image = SigstoreAttestationImage([attestation], v2_2_image)
      # TODO(b/310721968): Use etags to mitigate against read-modify-update race
      # conditions.
      docker_session.Push(attestation_tag, creds, http_obj).upload(new_image)
      return

  # Otherwise create a new image.
  new_image = SigstoreAttestationImage([attestation])
  docker_session.Push(attestation_tag, creds, http_obj).upload(new_image)


class SigstoreAttestationImage(docker_image.DockerImage):
  """Creates a new image or appends a layers on top of an existing image.

  Adheres to the Sigstore Attestation spec:
  https://github.com/sigstore/cosign/blob/main/specs/ATTESTATION_SPEC.md.
  """

  def __init__(
      self,
      additional_blobs: List[bytes],
      base: Optional[docker_image.DockerImage] = None,
  ):
    """Creates a new Sigstore style image or extends a base image.

    Args:
      additional_blobs: additional attestations to be appended to the image.
      base: optional base DockerImage.
    """
    self._additional_blobs = collections.OrderedDict(
        (docker_digest.SHA256(blob), blob) for blob in additional_blobs
    )
    if base is not None:
      self._base = base
      self._base_manifest = json.loads(self._base.manifest())
      self._base_config_file = json.loads(self._base.config_file())
    else:
      self._base = None
      self._base_manifest = {
          'mediaType': docker_http.OCI_MANIFEST_MIME,
          'schemaVersion': 2,
          'config': {
              'digest': '',  # Populated below.
              'mediaType': docker_http.CONFIG_JSON_MIME,
              'size': 0,
          },
          'layers': [],  # Populated below.
      }
      self._base_config_file = dict()

  def add_layer(self, blob: bytes) -> None:
    self._additional_blobs[docker_digest.SHA256(blob)] = blob

  def config_file(self) -> Text:
    """Override."""
    config_file = self._base_config_file
    overrides = metadata.Overrides()
    overrides = overrides.Override(created_by=docker_name.USER_AGENT)

    layers = [
        _RemovePrefix(blob_sum, 'sha256:')
        for blob_sum in self._additional_blobs.keys()
    ]

    overrides = overrides.Override(
        layers=layers,
    )

    # Override makes a deep copy of the base config file before modifying it.
    config_file = metadata.Override(
        config_file,
        options=overrides,
        architecture='',
        operating_system='',
    )

    return json.dumps(config_file, sort_keys=True)

  def manifest(self) -> Text:
    """Override."""
    manifest = copy.deepcopy(self._base_manifest)
    for blob_sum, blob in self._additional_blobs.items():
      manifest['layers'].append({
          'digest': blob_sum,
          'mediaType': DSSE_PAYLOAD_TYPE,
          'size': len(blob),
          'annotations': {
              'dev.cosignproject.cosign/signature': '',
              'predicateType': BINAUTHZ_CUSTOM_PREDICATE,
          },
      })

    config_file = self.config_file()
    utf8_encoded_config = config_file.encode('utf8')
    manifest['config']['digest'] = docker_digest.SHA256(utf8_encoded_config)
    manifest['config']['size'] = len(utf8_encoded_config)
    return json.dumps(manifest, sort_keys=True)

  def blob(self, digest: Text) -> bytes:
    """Override. Returns uncompressed blob."""
    if digest in self._additional_blobs:
      return self._additional_blobs[digest]
    if self._base:
      return self._base.blob(digest)
    raise Error('Digest not found: {}'.format(digest))

  # __enter__ and __exit__ allow use as a context manager.
  def __enter__(self):
    """Override."""
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    """Override."""
    return