HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/secrets/api.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared utilities to access the Google Secret Manager API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.iam import iam_util


def GetClient(version=None):
  """Return a 'secretmanager' API client instance.

  Args:
    version: API version to use. When falsy, the version is obtained from
      apis.ResolveVersion('secretmanager').

  Returns:
    Whatever apis.GetClientInstance returns for that version.
  """
  api_version = version if version else apis.ResolveVersion('secretmanager')
  return apis.GetClientInstance('secretmanager', api_version)


def GetMessages(version=None):
  """Return the 'secretmanager' messages module.

  Args:
    version: API version to use. When falsy, the version is obtained from
      apis.ResolveVersion('secretmanager').

  Returns:
    Whatever apis.GetMessagesModule returns for that version.
  """
  api_version = version if version else apis.ResolveVersion('secretmanager')
  return apis.GetMessagesModule('secretmanager', api_version)


def _FormatUpdateMask(update_mask):
  """Join the entries of `update_mask` into one comma-separated string."""
  separator = ','
  return separator.join(update_mask)


def _MakeReplicationMessage(messages, policy, locations, keys):
  """Create a replication message from its components.

  Args:
    messages: object providing the Replication, Automatic, UserManaged,
      Replica and CustomerManagedEncryption message classes.
    policy: replication policy name. A falsy value yields None,
      'user-managed' yields a user-managed replication, and any other value
      yields an automatic replication.
    locations: iterable of replica locations; only read when policy is
      'user-managed'.
    keys: optional sequence of KMS key names. None is treated as empty. For
      'user-managed', keys[i] is attached to the i-th location while keys
      last; for 'automatic', only keys[0] is used.

  Returns:
    A Replication message, or None when no policy was given.
  """
  if not policy:
    return None

  # Callers such as SetReplication may forward keys=None; normalise it here so
  # the len()/index operations below cannot fail on None.
  keys = keys or []

  if policy == 'user-managed':
    replicas = []
    for index, location in enumerate(locations):
      if index < len(keys):
        replicas.append(
            messages.Replica(
                location=location,
                customerManagedEncryption=messages.CustomerManagedEncryption(
                    kmsKeyName=keys[index])))
      else:
        # More locations than keys: the remaining replicas carry no key.
        replicas.append(messages.Replica(location=location))
    return messages.Replication(
        userManaged=messages.UserManaged(replicas=replicas))

  if policy == 'automatic' and keys:
    return messages.Replication(
        automatic=messages.Automatic(
            customerManagedEncryption=messages.CustomerManagedEncryption(
                kmsKeyName=keys[0])))

  # Any other policy value falls back to plain automatic replication.
  return messages.Replication(automatic=messages.Automatic())


class Client(object):
  """Common base for the high-level Secret Manager clients.

  Stores the API client on `self.client` and the messages module on
  `self.messages`.
  """

  def __init__(self, client=None, messages=None, api_version=None):
    # Only build a default client when the caller did not supply one.
    if not client:
      client = GetClient(version=api_version)
    self.client = client
    # Fall back to the messages module exposed by the client itself.
    self.messages = messages if messages else self.client.MESSAGES_MODULE


class Locations(Client):
  """High-level client for locations."""

  def __init__(self, client=None, messages=None, api_version=None):
    super(Locations, self).__init__(
        client=client, messages=messages, api_version=api_version)
    self.service = self.client.projects_locations

  def Get(self, location_ref):
    """Fetch the location identified by `location_ref`."""
    request = self.messages.SecretmanagerProjectsLocationsGetRequest(
        name=location_ref.RelativeName())
    return self.service.Get(request)

  def ListWithPager(self, project_ref, limit):
    """List the locations of a project, returning a pager object.

    Args:
      project_ref: resource whose RelativeName() is used as the request name.
      limit: passed to list_pager.YieldFromList as `limit`.

    Returns:
      The result of list_pager.YieldFromList over the 'locations' field.
    """
    list_request = self.messages.SecretmanagerProjectsLocationsListRequest(
        name=project_ref.RelativeName())
    pager_kwargs = dict(
        service=self.service,
        request=list_request,
        field='locations',
        limit=limit,
        batch_size_attribute='pageSize',
    )
    return list_pager.YieldFromList(**pager_kwargs)


class Secrets(Client):
  """High-level client for secrets.

  `self.service` is the `projects_secrets` service of the client. The IAM
  methods pick `projects_locations_secrets` for located secrets on a per-call
  basis and never rebind `self.service`, so the other methods keep working on
  the same instance afterwards.
  """

  def __init__(self, client=None, messages=None, api_version=None):
    super(Secrets, self).__init__(client, messages, api_version)
    self.service = self.client.projects_secrets

  def _MakeTopics(self, topics):
    """Wrap each topic name in a Topic message; falsy input yields []."""
    return [self.messages.Topic(name=topic) for topic in topics or []]

  def _MakeRotation(self, next_rotation_time, rotation_period):
    """Build a Rotation message, or None when neither value is set."""
    if not (next_rotation_time or rotation_period):
      return None
    return self.messages.Rotation(
        nextRotationTime=next_rotation_time, rotationPeriod=rotation_period
    )

  def _MakeRegionalEncryption(self, regional_kms_key_name):
    """Build a CustomerManagedEncryption message, or None without a key."""
    if not regional_kms_key_name:
      return None
    return self.messages.CustomerManagedEncryption(
        kmsKeyName=regional_kms_key_name
    )

  def Create(
      self,
      secret_ref,
      policy,
      locations,
      labels,
      tags,
      expire_time=None,
      ttl=None,
      keys=None,
      next_rotation_time=None,
      rotation_period=None,
      topics=None,
      annotations=None,
      regional_kms_key_name=None,
      version_destroy_ttl=None,
      secret_location=None,
  ):
    """Create a secret.

    Args:
      secret_ref: secret resource; supplies the secret id and the parent.
      policy: replication policy name, see _MakeReplicationMessage.
      locations: replica locations for a 'user-managed' policy.
      labels: value for the secret's `labels` field.
      tags: value for the secret's `tags` field.
      expire_time: value for the secret's `expireTime` field.
      ttl: value for the secret's `ttl` field.
      keys: KMS key names used when building the replication message.
      next_rotation_time: value for the rotation's `nextRotationTime` field.
      rotation_period: value for the rotation's `rotationPeriod` field.
      topics: topic names, each wrapped in a Topic message.
      annotations: entries for the annotations' `additionalProperties`.
      regional_kms_key_name: KMS key name for the secret-level
        `customerManagedEncryption`; when set, no replication is sent.
      version_destroy_ttl: value for the secret's `versionDestroyTtl` field.
      secret_location: location of the secret, None if global.

    Returns:
      The response of the Create call.
    """
    replication = _MakeReplicationMessage(
        self.messages, policy, locations, keys or []
    )
    annotations_message = self.messages.Secret.AnnotationsValue(
        additionalProperties=list(annotations or [])
    )
    customer_managed_encryption = self._MakeRegionalEncryption(
        regional_kms_key_name
    )
    if customer_managed_encryption is not None:
      # For regional requests, replication should not be there.
      replication = None

    secret = self.messages.Secret(
        labels=labels,
        tags=tags,
        replication=replication,
        expireTime=expire_time,
        ttl=ttl,
        topics=self._MakeTopics(topics),
        annotations=annotations_message,
        rotation=self._MakeRotation(next_rotation_time, rotation_period),
        customerManagedEncryption=customer_managed_encryption,
        versionDestroyTtl=version_destroy_ttl,
    )
    return self.service.Create(
        self.messages.SecretmanagerProjectsSecretsCreateRequest(
            parent=GetParentRelativeNameForSecret(secret_ref, secret_location),
            secretId=secret_ref.Name(),
            secret=secret,
        )
    )

  def Delete(self, secret_ref, etag=None, secret_location=None):
    """Delete a secret."""
    return self.service.Delete(
        self.messages.SecretmanagerProjectsSecretsDeleteRequest(
            etag=etag, name=GetRelativeName(secret_ref, secret_location)
        )
    )

  def Get(self, secret_ref, secret_location=None):
    """Get the secret with the given name."""
    return self.service.Get(
        self.messages.SecretmanagerProjectsSecretsGetRequest(
            name=GetRelativeName(secret_ref, secret_location)
        )
    )

  def GetOrNone(self, secret_ref, secret_location=None):
    """Attempt to get the secret, returning None if the secret does not exist."""
    try:
      return self.Get(secret_ref=secret_ref, secret_location=secret_location)
    except apitools_exceptions.HttpNotFoundError:
      return None

  def ListWithPager(self, project_ref, limit, request_filter=None):
    """List secrets returning a pager object."""
    request = self.messages.SecretmanagerProjectsSecretsListRequest(
        parent=project_ref.RelativeName(), filter=request_filter)

    return list_pager.YieldFromList(
        service=self.service,
        request=request,
        field='secrets',
        limit=limit,
        batch_size_attribute='pageSize')

  def AddVersion(self, secret_ref, data, data_crc32c, secret_location=None):
    """Adds a new version of an existing secret."""
    request = self.messages.SecretmanagerProjectsSecretsAddVersionRequest(
        parent=GetRelativeName(secret_ref, secret_location),
        addSecretVersionRequest=self.messages.AddSecretVersionRequest(
            payload=self.messages.SecretPayload(
                data=data, dataCrc32c=data_crc32c
            )
        ),
    )
    return self.service.AddVersion(request)

  def Update(
      self,
      secret_ref,
      labels,
      update_mask,
      etag=None,
      expire_time=None,
      ttl=None,
      topics=None,
      version_aliases=None,
      annotations=None,
      next_rotation_time=None,
      rotation_period=None,
      version_destroy_ttl=None,
      secret_location=None,
      regional_kms_key_name=None,
  ):
    """Update a secret.

    Args:
      secret_ref: secret resource to patch.
      labels: value for the secret's `labels` field.
      update_mask: iterable of field paths, sent comma-joined as `updateMask`.
      etag: value for the secret's `etag` field.
      expire_time: value for the secret's `expireTime` field.
      ttl: value for the secret's `ttl` field.
      topics: topic names, each wrapped in a Topic message.
      version_aliases: entries for the version aliases'
        `additionalProperties`.
      annotations: entries for the annotations' `additionalProperties`.
      next_rotation_time: value for the rotation's `nextRotationTime` field.
      rotation_period: value for the rotation's `rotationPeriod` field.
      version_destroy_ttl: value for the secret's `versionDestroyTtl` field.
      secret_location: location of the secret, None if global.
      regional_kms_key_name: KMS key name for the secret-level
        `customerManagedEncryption`.

    Returns:
      The response of the Patch call.
    """
    version_aliases_message = self.messages.Secret.VersionAliasesValue(
        additionalProperties=list(version_aliases or [])
    )
    annotations_message = self.messages.Secret.AnnotationsValue(
        additionalProperties=list(annotations or [])
    )
    secret = self.messages.Secret(
        labels=labels,
        versionAliases=version_aliases_message,
        annotations=annotations_message,
        etag=etag,
        expireTime=expire_time,
        ttl=ttl,
        topics=self._MakeTopics(topics),
        rotation=self._MakeRotation(next_rotation_time, rotation_period),
        versionDestroyTtl=version_destroy_ttl,
        customerManagedEncryption=self._MakeRegionalEncryption(
            regional_kms_key_name
        ),
    )
    return self.service.Patch(
        self.messages.SecretmanagerProjectsSecretsPatchRequest(
            name=GetRelativeName(secret_ref, secret_location),
            secret=secret,
            updateMask=_FormatUpdateMask(update_mask),
        )
    )

  def SetReplication(self, secret_ref, policy, locations, keys):
    """Set the replication policy on an existing secret.

    Args:
      secret_ref: secret resource to patch.
      policy: replication policy name, see _MakeReplicationMessage.
      locations: replica locations for a 'user-managed' policy.
      keys: KMS key names; None is treated as no keys.

    Returns:
      The response of the Patch call.
    """
    replication = _MakeReplicationMessage(
        self.messages, policy, locations, keys or []
    )
    return self.service.Patch(
        self.messages.SecretmanagerProjectsSecretsPatchRequest(
            name=secret_ref.RelativeName(),
            secret=self.messages.Secret(replication=replication),
            updateMask=_FormatUpdateMask(['replication']),
        )
    )

  def GetIamPolicy(self, resource_ref, secret_location=None):
    """Get iam policy request.

    Args:
      resource_ref: Multitype resource (regional or global secret resource)
      secret_location: location of the secret, None if global

    Returns:
      Operation response
    """
    # Pick the service per call instead of rebinding self.service, which
    # would redirect every later request made through this instance.
    if secret_location:
      service = self.client.projects_locations_secrets
      req = self.messages.SecretmanagerProjectsLocationsSecretsGetIamPolicyRequest(
          resource=GetRelativeName(resource_ref, secret_location),
          options_requestedPolicyVersion=iam_util.MAX_LIBRARY_IAM_SUPPORTED_VERSION,
      )
    else:
      service = self.client.projects_secrets
      req = self.messages.SecretmanagerProjectsSecretsGetIamPolicyRequest(
          options_requestedPolicyVersion=iam_util.MAX_LIBRARY_IAM_SUPPORTED_VERSION,
          resource=GetRelativeName(resource_ref, secret_location),
      )
    return service.GetIamPolicy(req)

  def SetIamPolicy(
      self, secret_ref, policy, update_mask=None, secret_location=None
  ):
    """Set iam policy request.

    Args:
      secret_ref: secret resource
      policy: policy to be set
      update_mask: update mask
      secret_location: location of the secret, None if global

    Returns:
      Operation response
    """
    set_iam_policy_request = self.messages.SetIamPolicyRequest(
        policy=policy, updateMask=update_mask
    )
    # Pick the service per call instead of rebinding self.service, which
    # would redirect every later request made through this instance.
    if secret_location:
      service = self.client.projects_locations_secrets
      req = self.messages.SecretmanagerProjectsLocationsSecretsSetIamPolicyRequest(
          resource=GetRelativeName(secret_ref, secret_location),
          setIamPolicyRequest=set_iam_policy_request,
      )
    else:
      service = self.client.projects_secrets
      req = self.messages.SecretmanagerProjectsSecretsSetIamPolicyRequest(
          resource=GetRelativeName(secret_ref, secret_location),
          setIamPolicyRequest=set_iam_policy_request,
      )
    return service.SetIamPolicy(req)

  def AddIamPolicyBinding(
      self, resorce_ref, member, role, condition=None, secret_location=None
  ):
    """Add iam policy binding request.

    Reads the current policy, adds the binding to it and writes it back.

    Args:
      resorce_ref: secret resource (name kept as-is for keyword callers).
      member: member to add to the binding.
      role: role of the binding.
      condition: optional condition for the binding.
      secret_location: location of the secret, None if global.

    Returns:
      The response of the SetIamPolicy call.
    """
    policy = self.GetIamPolicy(resorce_ref, secret_location=secret_location)
    policy.version = iam_util.MAX_LIBRARY_IAM_SUPPORTED_VERSION
    iam_util.AddBindingToIamPolicyWithCondition(
        self.messages.Binding,
        self.messages.Expr,
        policy,
        member,
        role,
        condition=condition,
    )
    return self.SetIamPolicy(
        resorce_ref, policy, secret_location=secret_location
    )

  def RemoveIamPolicyBinding(
      self, resorce_ref, member, role, condition=None, secret_location=None
  ):
    """Remove iam policy binding request.

    Reads the current policy, removes the binding from it and writes it back.

    Args:
      resorce_ref: secret resource (name kept as-is for keyword callers).
      member: member of the binding to remove.
      role: role of the binding.
      condition: optional condition of the binding.
      secret_location: location of the secret, None if global.

    Returns:
      The response of the SetIamPolicy call.
    """
    policy = self.GetIamPolicy(resorce_ref, secret_location=secret_location)
    policy.version = iam_util.MAX_LIBRARY_IAM_SUPPORTED_VERSION
    iam_util.RemoveBindingFromIamPolicyWithCondition(
        policy,
        member,
        role,
        condition=condition,
    )
    return self.SetIamPolicy(
        resorce_ref, policy, secret_location=secret_location
    )


class SecretsLatest(Client):
  """High-level client for latest secrets."""

  def __init__(self, client=None, messages=None, api_versions=None):
    # `api_versions` is forwarded as the base class's `api_version` argument.
    super(SecretsLatest, self).__init__(
        client=client, messages=messages, api_version=api_versions)
    self.service = self.client.projects_secrets_latest

  def Access(self, secret_ref, secret_location=None):
    """Access the latest version of the secret named by `secret_ref`.

    Args:
      secret_ref: secret resource.
      secret_location: location of the secret, None if global.

    Returns:
      The response of the Access call.
    """
    access_request = (
        self.messages.SecretmanagerProjectsSecretsLatestAccessRequest(
            name=GetRelativeName(secret_ref, secret_location)
        )
    )
    return self.service.Access(access_request)


class Versions(Client):
  """High-level client for secret versions."""

  def __init__(self, client=None, messages=None, api_version=None):
    super(Versions, self).__init__(
        client=client, messages=messages, api_version=api_version)
    self.service = self.client.projects_secrets_versions

  def Access(self, version_ref, secret_location=None):
    """Access the secret version named by `version_ref`."""
    access_request = (
        self.messages.SecretmanagerProjectsSecretsVersionsAccessRequest(
            name=GetRelativeName(version_ref, secret_location)
        )
    )
    return self.service.Access(access_request)

  def Destroy(self, version_ref, etag=None, secret_location=None):
    """Destroy the secret version named by `version_ref`."""
    inner_request = self.messages.DestroySecretVersionRequest(etag=etag)
    outer_request = (
        self.messages.SecretmanagerProjectsSecretsVersionsDestroyRequest(
            destroySecretVersionRequest=inner_request,
            name=GetRelativeName(version_ref, secret_location),
        )
    )
    return self.service.Destroy(outer_request)

  def Disable(self, version_ref, etag=None, secret_location=None):
    """Disable the secret version named by `version_ref`."""
    inner_request = self.messages.DisableSecretVersionRequest(etag=etag)
    outer_request = (
        self.messages.SecretmanagerProjectsSecretsVersionsDisableRequest(
            disableSecretVersionRequest=inner_request,
            name=GetRelativeName(version_ref, secret_location),
        )
    )
    return self.service.Disable(outer_request)

  def Enable(self, version_ref, etag=None, secret_location=None):
    """Enable the secret version named by `version_ref`."""
    inner_request = self.messages.EnableSecretVersionRequest(etag=etag)
    outer_request = (
        self.messages.SecretmanagerProjectsSecretsVersionsEnableRequest(
            enableSecretVersionRequest=inner_request,
            name=GetRelativeName(version_ref, secret_location),
        )
    )
    return self.service.Enable(outer_request)

  def Get(self, version_ref, secret_location=None):
    """Get the secret version with the given name."""
    get_request = self.messages.SecretmanagerProjectsSecretsVersionsGetRequest(
        name=GetRelativeName(version_ref, secret_location)
    )
    return self.service.Get(get_request)

  def List(self, secret_ref, limit, secret_location=None):
    """List versions of a secret with a single List call.

    Args:
      secret_ref: secret resource whose versions are listed.
      limit: sent as the request's `pageSize`.
      secret_location: location of the secret, None if global.

    Returns:
      The response of the List call.
    """
    parent_name = GetRelativeName(secret_ref, secret_location)
    list_request = self.messages.SecretmanagerProjectsSecretsVersionsListRequest(
        parent=parent_name, pageSize=limit
    )
    return self.service.List(list_request)

  def ListWithPager(
      self, secret_ref, limit, request_filter=None, secret_location=None
  ):
    """List versions of a secret, returning a pager object.

    Args:
      secret_ref: secret resource whose versions are listed.
      limit: passed to list_pager.YieldFromList as `limit`.
      request_filter: sent as the request's `filter`.
      secret_location: location of the secret, None if global.

    Returns:
      The result of list_pager.YieldFromList over the 'versions' field.
    """
    list_request = self.messages.SecretmanagerProjectsSecretsVersionsListRequest(
        parent=GetRelativeName(secret_ref, secret_location),
        filter=request_filter,
        pageSize=0,
    )
    pager_kwargs = dict(
        service=self.service,
        request=list_request,
        field='versions',
        limit=limit,
        batch_size=0,
        batch_size_attribute='pageSize',
    )
    return list_pager.YieldFromList(**pager_kwargs)


def GetApiFromTrack(track):
  """Returns api version based on the track.

  Args:
    track: release track, compared against base.ReleaseTrack.BETA and
      base.ReleaseTrack.GA.

  Returns:
    'v1beta2' for BETA, 'v1' for GA, and None for any other track.
  """
  if track == base.ReleaseTrack.BETA:
    return 'v1beta2'
  if track == base.ReleaseTrack.GA:
    return 'v1'
  return None


def GetRelativeName(resource, location):
  """Return the resource's relative name, optionally with a location inserted.

  Args:
    resource: object exposing RelativeName().
    location: location to insert; when falsy the relative name is returned
      unchanged.

  Returns:
    The relative name, with 'locations/<location>' placed after its first two
    '/'-separated segments when a location is given.
  """
  relative_name = resource.RelativeName()
  if not location:
    return relative_name
  segments = relative_name.split('/')
  head = '/'.join(segments[:2])
  tail = '/'.join(segments[2:])
  return '{}/locations/{}/{}'.format(head, location, tail)


def GetParentRelativeNameForSecret(resource, location):
  """Return the relative name of the resource's parent.

  Args:
    resource: object exposing Parent(), whose result exposes RelativeName().
    location: location to append; when falsy nothing is appended.

  Returns:
    The parent's relative name, followed by '/locations/<location>' when a
    location is given.
  """
  parent_name = resource.Parent().RelativeName()
  if location:
    return parent_name + '/locations/{}'.format(location)
  return parent_name