HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/binauthz/attestors.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""API helpers for interacting with attestors."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.container.binauthz import apis
from googlecloudsdk.api_lib.container.binauthz import util
from googlecloudsdk.command_lib.container.binauthz import exceptions
from googlecloudsdk.command_lib.kms import maps as kms_maps


class Client(object):
  """A client for interacting with attestors."""

  def __init__(self, api_version=None):
    self.client = apis.GetClientInstance(api_version)
    self.messages = apis.GetMessagesModule(api_version)
    self.api_version = api_version

  def Get(self, attestor_ref):
    """Get the specified attestor."""
    return self.client.projects_attestors.Get(
        self.messages.BinaryauthorizationProjectsAttestorsGetRequest(
            name=attestor_ref.RelativeName(),
        ))

  def List(self, project_ref, limit=None, page_size=None):
    """List the attestors associated with the current project."""
    return list_pager.YieldFromList(
        self.client.projects_attestors,
        self.messages.BinaryauthorizationProjectsAttestorsListRequest(
            parent=project_ref.RelativeName(),),
        batch_size=page_size or 100,  # Default batch_size.
        limit=limit,
        field='attestors',
        batch_size_attribute='pageSize')

  def _GetNoteClass(self):
    return (self.messages.UserOwnedGrafeasNote
            if self.api_version == apis.V1 else
            self.messages.UserOwnedDrydockNote)

  def GetNotePropertyName(self):
    return ('userOwnedGrafeasNote'
            if self.api_version == apis.V1 else
            'userOwnedDrydockNote')

  def GetNoteAttr(self, attestor):
    """Return the Attestor's version-dependent Note attribute."""
    if self.api_version == apis.V1:
      return attestor.userOwnedGrafeasNote
    else:
      return attestor.userOwnedDrydockNote

  def Create(self, attestor_ref, note_ref, description=None):
    """Create an attestors associated with the current project."""
    project_ref = attestor_ref.Parent(util.PROJECTS_COLLECTION)
    return self.client.projects_attestors.Create(
        self.messages.BinaryauthorizationProjectsAttestorsCreateRequest(
            attestor=self.messages.Attestor(
                name=attestor_ref.RelativeName(),
                description=description,
                **{self.GetNotePropertyName(): self._GetNoteClass()(
                    noteReference=note_ref.RelativeName(),
                )}
            ),
            attestorId=attestor_ref.Name(),
            parent=project_ref.RelativeName(),
        ))

  def AddPgpKey(self, attestor_ref, pgp_pubkey_content, comment=None):
    """Add a PGP key to an attestor.

    Args:
      attestor_ref: ResourceSpec, The attestor to be updated.
      pgp_pubkey_content: The contents of the PGP public key file.
      comment: The comment on the public key.

    Returns:
      The added public key.

    Raises:
      AlreadyExistsError: If a public key with the same key content was found on
          the attestor.
    """
    attestor = self.Get(attestor_ref)

    existing_pub_keys = set(
        public_key.asciiArmoredPgpPublicKey
        for public_key in self.GetNoteAttr(attestor).publicKeys)
    if pgp_pubkey_content in existing_pub_keys:
      raise exceptions.AlreadyExistsError(
          'Provided public key already present on attestor [{}]'.format(
              attestor.name))

    existing_ids = set(
        public_key.id
        for public_key in self.GetNoteAttr(attestor).publicKeys)

    self.GetNoteAttr(attestor).publicKeys.append(
        self.messages.AttestorPublicKey(
            asciiArmoredPgpPublicKey=pgp_pubkey_content,
            comment=comment))

    updated_attestor = self.client.projects_attestors.Update(attestor)

    return next(
        public_key
        for public_key in self.GetNoteAttr(updated_attestor).publicKeys
        if public_key.id not in existing_ids)

  def AddPkixKey(self, attestor_ref, pkix_pubkey_content, pkix_sig_algorithm,
                 id_override=None, comment=None):
    """Add a key to an attestor.

    Args:
      attestor_ref: ResourceSpec, The attestor to be updated.
      pkix_pubkey_content: The PEM-encoded PKIX public key.
      pkix_sig_algorithm: The PKIX public key signature algorithm.
      id_override: If provided, the key ID to use instead of the API-generated
          one.
      comment: The comment on the public key.

    Returns:
      The added public key.

    Raises:
      AlreadyExistsError: If a public key with the same key content was found on
          the attestor.
    """
    attestor = self.Get(attestor_ref)

    existing_ids = set(
        public_key.id
        for public_key in self.GetNoteAttr(attestor).publicKeys)
    if id_override is not None and id_override in existing_ids:
      raise exceptions.AlreadyExistsError(
          'Public key with ID [{}] already present on attestor [{}]'.format(
              id_override, attestor.name))

    self.GetNoteAttr(attestor).publicKeys.append(
        self.messages.AttestorPublicKey(
            id=id_override,
            pkixPublicKey=self.messages.PkixPublicKey(
                publicKeyPem=pkix_pubkey_content,
                signatureAlgorithm=pkix_sig_algorithm),
            comment=comment))

    updated_attestor = self.client.projects_attestors.Update(attestor)

    return next(
        public_key
        for public_key in self.GetNoteAttr(updated_attestor).publicKeys
        if public_key.id not in existing_ids)

  def RemoveKey(self, attestor_ref, pubkey_id):
    """Remove a key on an attestor.

    Args:
      attestor_ref: ResourceSpec, The attestor to be updated.
      pubkey_id: The ID of the key to remove.

    Raises:
      NotFoundError: If an expected public key could not be located by ID.
    """
    attestor = self.Get(attestor_ref)

    existing_ids = set(
        public_key.id
        for public_key in self.GetNoteAttr(attestor).publicKeys)
    if pubkey_id not in existing_ids:
      raise exceptions.NotFoundError(
          'No matching public key found on attestor [{}]'.format(
              attestor.name))

    self.GetNoteAttr(attestor).publicKeys = [
        public_key
        for public_key in self.GetNoteAttr(attestor).publicKeys
        if public_key.id != pubkey_id]

    self.client.projects_attestors.Update(attestor)

  def UpdateKey(
      self, attestor_ref, pubkey_id, pgp_pubkey_content=None, comment=None):
    """Update a key on an attestor.

    Args:
      attestor_ref: ResourceSpec, The attestor to be updated.
      pubkey_id: The ID of the key to update.
      pgp_pubkey_content: The contents of the public key file.
      comment: The comment on the public key.

    Returns:
      The updated public key.

    Raises:
      NotFoundError: If an expected public key could not be located by ID.
      InvalidStateError: If multiple public keys matched the provided ID.
      InvalidArgumentError: If a non-PGP key is updated with pgp_pubkey_content.
    """
    attestor = self.Get(attestor_ref)

    existing_keys = [
        public_key
        for public_key in self.GetNoteAttr(attestor).publicKeys
        if public_key.id == pubkey_id]

    if not existing_keys:
      raise exceptions.NotFoundError(
          'No matching public key found on attestor [{}]'.format(
              attestor.name))
    if len(existing_keys) > 1:
      raise exceptions.InvalidStateError(
          'Multiple matching public keys found on attestor [{}]'.format(
              attestor.name))

    existing_key = existing_keys[0]
    if pgp_pubkey_content is not None:
      if not existing_key.asciiArmoredPgpPublicKey:
        raise exceptions.InvalidArgumentError(
            'Cannot update a non-PGP PublicKey with a PGP public key')
      existing_key.asciiArmoredPgpPublicKey = pgp_pubkey_content
    if comment is not None:
      existing_key.comment = comment

    updated_attestor = self.client.projects_attestors.Update(attestor)

    return next(
        public_key
        for public_key in self.GetNoteAttr(updated_attestor).publicKeys
        if public_key.id == pubkey_id)

  def Update(self, attestor_ref, description=None):
    """Update an attestor.

    Args:
      attestor_ref: ResourceSpec, The attestor to be updated.
      description: string, If provided, the new attestor description.

    Returns:
      The updated attestor.
    """
    attestor = self.Get(attestor_ref)

    if description is not None:
      attestor.description = description

    return self.client.projects_attestors.Update(attestor)

  def Delete(self, attestor_ref):
    """Delete the specified attestor."""
    req = self.messages.BinaryauthorizationProjectsAttestorsDeleteRequest(
        name=attestor_ref.RelativeName(),
    )

    self.client.projects_attestors.Delete(req)

  def ConvertFromKmsSignatureAlgorithm(self, kms_algorithm):
    """Convert a KMS SignatureAlgorithm into a Binauthz SignatureAlgorithm."""
    binauthz_enum = self.messages.PkixPublicKey.SignatureAlgorithmValueValuesEnum
    kms_enum = kms_maps.ALGORITHM_ENUM
    alg_map = {
        kms_enum.RSA_SIGN_PSS_2048_SHA256.name:
            binauthz_enum.RSA_PSS_2048_SHA256,
        kms_enum.RSA_SIGN_PSS_3072_SHA256.name:
            binauthz_enum.RSA_PSS_3072_SHA256,
        kms_enum.RSA_SIGN_PSS_4096_SHA256.name:
            binauthz_enum.RSA_PSS_4096_SHA256,
        kms_enum.RSA_SIGN_PSS_4096_SHA512.name:
            binauthz_enum.RSA_PSS_4096_SHA512,
        kms_enum.RSA_SIGN_PKCS1_2048_SHA256.name:
            binauthz_enum.RSA_SIGN_PKCS1_2048_SHA256,
        kms_enum.RSA_SIGN_PKCS1_3072_SHA256.name:
            binauthz_enum.RSA_SIGN_PKCS1_3072_SHA256,
        kms_enum.RSA_SIGN_PKCS1_4096_SHA256.name:
            binauthz_enum.RSA_SIGN_PKCS1_4096_SHA256,
        kms_enum.RSA_SIGN_PKCS1_4096_SHA512.name:
            binauthz_enum.RSA_SIGN_PKCS1_4096_SHA512,
        kms_enum.EC_SIGN_P256_SHA256.name:
            binauthz_enum.ECDSA_P256_SHA256,
        kms_enum.EC_SIGN_P384_SHA384.name:
            binauthz_enum.ECDSA_P384_SHA384,
    }
    try:
      return alg_map[kms_algorithm.name]
    except KeyError:
      raise exceptions.InvalidArgumentError(
          'Unsupported PkixPublicKey signature algorithm: "{}"'.format(
              kms_algorithm.name))