File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/container/binauthz/containeranalysis.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for interacting with the binauthz API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
API_NAME = 'containeranalysis'
V1_ALPHA1 = 'v1alpha1'
V1_BETA1 = 'v1beta1'
V1 = 'v1'
DEFAULT_VERSION = V1
class Client(object):
  """A client to access containeranalysis for binauthz purposes."""
  def __init__(self, api_version=None):
    """Creates a ContainerAnalysisClient.
    Args:
      api_version: The containeranalysis API version to use.
    """
    self.api_version = api_version or DEFAULT_VERSION
    self.client = apis.GetClientInstance(API_NAME, self.api_version)
    self.messages = apis.GetMessagesModule(API_NAME, self.api_version)
  def YieldAttestations(
      self,
      note_ref=None,
      project_ref=None,
      artifact_digest=None,
      page_size=None,
      limit=None,
  ):
    """Yields occurrences associated with a given attestor Note or Project.
    Args:
      note_ref: The Note reference that will be queried for attached
        occurrences. If None, then all occurrences from the given project will
        be listed. (containeranalysis.projects.notes Resource)
      project_ref: The Project referenece that will be queried for occurrences
        if note_ref is None.
      artifact_digest: Digest of the artifact for which to fetch occurrences. If
        None, then all occurrences attached to the AA Note are returned.
      page_size: The number of attestations to retrieve per request. (If None,
        use the default page size.)
      limit: The maxium number of attestations to retrieve. (If None,
        unlimited.)
    Yields:
      Occurrences bound to `note_ref` with matching `artifact_digest` (if
      passed).
    """
    artifact_filter = (
        'has_suffix(resourceUrl, "{}")'.format(artifact_digest)
        if artifact_digest is not None
        else ''
    )
    if note_ref is None:
      service = self.client.projects_occurrences
      request = self.messages.ContaineranalysisProjectsOccurrencesListRequest(
          parent=project_ref.RelativeName(), filter=artifact_filter
      )
    else:
      service = self.client.projects_notes_occurrences
      request = (
          self.messages.ContaineranalysisProjectsNotesOccurrencesListRequest(
              name=note_ref.RelativeName(), filter=artifact_filter
          )
      )
    occurrence_iter = list_pager.YieldFromList(
        service=service,
        request=request,
        field='occurrences',
        batch_size=page_size or 100,  # Default batch_size.
        batch_size_attribute='pageSize',
        limit=limit,
    )
    # TODO(b/69380601): This should be handled by the filter parameter to
    # ListNoteOccurrences, but filtering isn't implemented yet for the fields
    # we care about.
    def MatchesFilter(occurrence):
      if (
          occurrence.kind
          != self.messages.Occurrence.KindValueValuesEnum.ATTESTATION
      ):
        return False
      return True
    for occurrence in occurrence_iter:
      if MatchesFilter(occurrence):
        yield occurrence
  def CreateAttestationOccurrence(
      self,
      note_ref,
      project_ref,
      artifact_url,
      public_key_id,
      signature,
      plaintext,
      validation_callback=None,
  ):
    """Creates an Occurrence referencing given URL and Note.
    This only supports the AttestationOccurrence-type Occurrence, currently only
    present in the v1 version of the API.
    Args:
      note_ref: The Note reference that the created Occurrence will be bound to.
        (containeranalysis.projects.notes Resource)
      project_ref: The project ref where the Occurrence will be created.
        (cloudresourcemanager.projects Resource)
      artifact_url: URL of artifact to which the signature is associated.
        (string)
      public_key_id: The URI of the public key that will be used to verify the
        signature. (string)
      signature: The content artifact's signature as generated by the specified
        key's signing operation. (string)
      plaintext: The content that was signed. (string)
      validation_callback: If provided, a function to validate the Occurrence
        prior to creation. (Callable[[Occurrence], None])
    Returns:
      Created Occurrence.
    """
    assert self.api_version == V1
    attestation = self.messages.AttestationOccurrence(
        serializedPayload=plaintext,
        signatures=[
            self.messages.Signature(
                publicKeyId=public_key_id, signature=signature
            ),
        ],
    )
    occurrence = self.messages.Occurrence(
        kind=self.messages.Occurrence.KindValueValuesEnum.ATTESTATION,
        resourceUri=artifact_url,
        noteName=note_ref.RelativeName(),
        attestation=attestation,
    )
    request = self.messages.ContaineranalysisProjectsOccurrencesCreateRequest(
        parent=project_ref.RelativeName(),
        occurrence=occurrence,
    )
    if validation_callback is not None:
      validation_callback(occurrence)
    return self.client.projects_occurrences.Create(request)