HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/binauthz/containeranalysis.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for interacting with the binauthz API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis

# Name of the API whose client and messages modules are loaded by Client.
API_NAME = 'containeranalysis'

# Version identifiers that can be passed to Client as `api_version`.
V1_ALPHA1 = 'v1alpha1'
V1_BETA1 = 'v1beta1'
V1 = 'v1'
# Version used when Client is constructed without an explicit api_version.
DEFAULT_VERSION = V1


class Client(object):
  """Wrapper around the containeranalysis API used for binauthz operations."""

  def __init__(self, api_version=None):
    """Loads the API client and messages module for the requested version.

    Args:
      api_version: The containeranalysis API version to use. When falsy,
        DEFAULT_VERSION is used instead.
    """
    version = api_version or DEFAULT_VERSION
    self.api_version = version

    self.client = apis.GetClientInstance(API_NAME, version)
    self.messages = apis.GetMessagesModule(API_NAME, version)

  def YieldAttestations(
      self,
      note_ref=None,
      project_ref=None,
      artifact_digest=None,
      page_size=None,
      limit=None,
  ):
    """Lazily lists attestation occurrences for an attestor Note or a Project.

    Args:
      note_ref: The Note reference whose attached occurrences are listed. If
        None, occurrences are listed from `project_ref` instead.
        (containeranalysis.projects.notes Resource)
      project_ref: The Project reference to list occurrences from. Only
        consulted when `note_ref` is None.
      artifact_digest: Digest of the artifact to restrict the listing to. If
        None, an empty filter is sent with the list request.
      page_size: The number of occurrences to request per page. (If None or
        otherwise falsy, 100 is used.)
      limit: The maximum number of occurrences to retrieve, passed through to
        the pager. (If None, unlimited.)

    Yields:
      Occurrences returned by the list request whose kind is ATTESTATION.
    """
    if artifact_digest is None:
      artifact_filter = ''
    else:
      artifact_filter = 'has_suffix(resourceUrl, "{}")'.format(artifact_digest)

    if note_ref is not None:
      # List only the occurrences attached to the given Note.
      service = self.client.projects_notes_occurrences
      request_cls = (
          self.messages.ContaineranalysisProjectsNotesOccurrencesListRequest
      )
      request = request_cls(
          name=note_ref.RelativeName(), filter=artifact_filter
      )
    else:
      # No Note given: list occurrences across the whole project.
      service = self.client.projects_occurrences
      request_cls = self.messages.ContaineranalysisProjectsOccurrencesListRequest
      request = request_cls(
          parent=project_ref.RelativeName(), filter=artifact_filter
      )

    batch_size = page_size or 100  # Default batch_size.
    pager = list_pager.YieldFromList(
        service=service,
        request=request,
        field='occurrences',
        batch_size=batch_size,
        batch_size_attribute='pageSize',
        limit=limit,
    )

    # TODO(b/69380601): This should be handled by the filter parameter to
    # ListNoteOccurrences, but filtering isn't implemented yet for the fields
    # we care about.
    for candidate in pager:
      if (
          candidate.kind
          != self.messages.Occurrence.KindValueValuesEnum.ATTESTATION
      ):
        # Skip occurrences of any other kind.
        continue
      yield candidate

  def CreateAttestationOccurrence(
      self,
      note_ref,
      project_ref,
      artifact_url,
      public_key_id,
      signature,
      plaintext,
      validation_callback=None,
  ):
    """Creates an attestation Occurrence bound to the given URL and Note.

    Only the AttestationOccurrence-type Occurrence is supported, so this
    method asserts that the client was created for the v1 API.

    Args:
      note_ref: The Note reference that the created Occurrence will be bound to.
        (containeranalysis.projects.notes Resource)
      project_ref: The project ref where the Occurrence will be created.
        (cloudresourcemanager.projects Resource)
      artifact_url: URL of the artifact that the signature is associated with.
        (string)
      public_key_id: The URI of the public key that will be used to verify the
        signature. (string)
      signature: The content artifact's signature as generated by the specified
        key's signing operation. (string)
      plaintext: The content that was signed. (string)
      validation_callback: If provided, a function called with the Occurrence
        before the create request is sent. (Callable[[Occurrence], None])

    Returns:
      Created Occurrence.
    """
    assert self.api_version == V1
    msgs = self.messages

    attestation = msgs.AttestationOccurrence(
        serializedPayload=plaintext,
        signatures=[
            msgs.Signature(publicKeyId=public_key_id, signature=signature),
        ],
    )
    occurrence = msgs.Occurrence(
        kind=msgs.Occurrence.KindValueValuesEnum.ATTESTATION,
        resourceUri=artifact_url,
        noteName=note_ref.RelativeName(),
        attestation=attestation,
    )
    # The request is assembled before validation runs, so a bad project_ref
    # fails ahead of the callback.
    create_request = msgs.ContaineranalysisProjectsOccurrencesCreateRequest(
        parent=project_ref.RelativeName(),
        occurrence=occurrence,
    )

    if validation_callback is not None:
      validation_callback(occurrence)

    return self.client.projects_occurrences.Create(create_request)