# File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/binauthz/containeranalysis.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for interacting with the binauthz API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
# API name handed to apis.GetClientInstance / apis.GetMessagesModule.
API_NAME = 'containeranalysis'

# Version identifiers of the containeranalysis API.
V1 = 'v1'
V1_BETA1 = 'v1beta1'
V1_ALPHA1 = 'v1alpha1'

# Version a Client falls back to when none is requested explicitly.
DEFAULT_VERSION = V1
class Client(object):
  """A client to access containeranalysis for binauthz purposes.

  Attributes:
    api_version: The containeranalysis API version this client was built for.
    client: The API client returned by `apis.GetClientInstance`.
    messages: The messages module returned by `apis.GetMessagesModule`.
  """

  def __init__(self, api_version=None):
    """Creates a ContainerAnalysisClient.

    Args:
      api_version: The containeranalysis API version to use. When None (or
        otherwise falsy), DEFAULT_VERSION is used.
    """
    self.api_version = api_version or DEFAULT_VERSION
    self.client = apis.GetClientInstance(API_NAME, self.api_version)
    self.messages = apis.GetMessagesModule(API_NAME, self.api_version)

  def YieldAttestations(
      self,
      note_ref=None,
      project_ref=None,
      artifact_digest=None,
      page_size=None,
      limit=None,
  ):
    """Yields occurrences associated with a given attestor Note or Project.

    This is a generator: no request is issued (and no argument is validated)
    until the result is first iterated.

    Args:
      note_ref: The Note reference that will be queried for attached
        occurrences. If None, then all occurrences from the given project will
        be listed. (containeranalysis.projects.notes Resource)
      project_ref: The Project reference that will be queried for occurrences
        if note_ref is None.
      artifact_digest: Digest of the artifact for which to fetch occurrences. If
        None, then all occurrences attached to the AA Note are returned.
      page_size: The number of occurrences to retrieve per request. (If None,
        a page size of 100 is used.)
      limit: The maximum number of attestations to yield. Occurrences of other
        kinds are skipped and do not count against the limit. (If None,
        unlimited.)

    Yields:
      ATTESTATION-kind occurrences bound to `note_ref` (or found in
      `project_ref` when `note_ref` is None) with matching `artifact_digest`
      (if passed).

    Raises:
      ValueError: If both `note_ref` and `project_ref` are None.
    """
    if note_ref is None and project_ref is None:
      # Fail with a clear message instead of an AttributeError on
      # `None.RelativeName()` below.
      raise ValueError('Either note_ref or project_ref must be provided.')

    artifact_filter = (
        'has_suffix(resourceUrl, "{}")'.format(artifact_digest)
        if artifact_digest is not None
        else ''
    )

    if note_ref is None:
      # No Note given: list every occurrence in the project.
      service = self.client.projects_occurrences
      request = self.messages.ContaineranalysisProjectsOccurrencesListRequest(
          parent=project_ref.RelativeName(), filter=artifact_filter
      )
    else:
      # List only the occurrences attached to the given Note.
      service = self.client.projects_notes_occurrences
      request = (
          self.messages.ContaineranalysisProjectsNotesOccurrencesListRequest(
              name=note_ref.RelativeName(), filter=artifact_filter
          )
      )

    # TODO(b/69380601): This should be handled by the filter parameter to
    # ListNoteOccurrences, but filtering isn't implemented yet for the fields
    # we care about.
    def MatchesFilter(occurrence):
      """Returns True iff the occurrence is of kind ATTESTATION."""
      return (
          occurrence.kind
          == self.messages.Occurrence.KindValueValuesEnum.ATTESTATION
      )

    # The kind check is handed to the pager as its predicate so that `limit`
    # counts only the attestations actually yielded. Filtering after the pager
    # would let skipped occurrences consume the limit and silently truncate
    # the result.
    occurrence_iter = list_pager.YieldFromList(
        service=service,
        request=request,
        field='occurrences',
        predicate=MatchesFilter,
        batch_size=page_size or 100,  # Default batch_size.
        batch_size_attribute='pageSize',
        limit=limit,
    )
    for occurrence in occurrence_iter:
      yield occurrence

  def CreateAttestationOccurrence(
      self,
      note_ref,
      project_ref,
      artifact_url,
      public_key_id,
      signature,
      plaintext,
      validation_callback=None,
  ):
    """Creates an Occurrence referencing given URL and Note.

    This only supports the AttestationOccurrence-type Occurrence, currently only
    present in the v1 version of the API.

    Args:
      note_ref: The Note reference that the created Occurrence will be bound to.
        (containeranalysis.projects.notes Resource)
      project_ref: The project ref where the Occurrence will be created.
        (cloudresourcemanager.projects Resource)
      artifact_url: URL of artifact to which the signature is associated.
        (string)
      public_key_id: The URI of the public key that will be used to verify the
        signature. (string)
      signature: The content artifact's signature as generated by the specified
        key's signing operation. (string)
      plaintext: The content that was signed. (string)
      validation_callback: If provided, a function to validate the Occurrence
        prior to creation. It is called with the fully built Occurrence just
        before the create request is sent. (Callable[[Occurrence], None])

    Returns:
      Created Occurrence.

    Raises:
      AssertionError: If this client was not created for the v1 API.
    """
    # Programmer-error guard: the message types used below are only expected
    # in the v1 messages module.
    assert self.api_version == V1, (
        'CreateAttestationOccurrence requires the v1 containeranalysis API.'
    )

    attestation = self.messages.AttestationOccurrence(
        serializedPayload=plaintext,
        signatures=[
            self.messages.Signature(
                publicKeyId=public_key_id, signature=signature
            ),
        ],
    )
    occurrence = self.messages.Occurrence(
        kind=self.messages.Occurrence.KindValueValuesEnum.ATTESTATION,
        resourceUri=artifact_url,
        noteName=note_ref.RelativeName(),
        attestation=attestation,
    )
    request = self.messages.ContaineranalysisProjectsOccurrencesCreateRequest(
        parent=project_ref.RelativeName(),
        occurrence=occurrence,
    )

    # Give the caller a chance to reject the Occurrence before any API call.
    if validation_callback is not None:
      validation_callback(occurrence)

    return self.client.projects_occurrences.Create(request)