HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/code/secrets.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for the Secret Manager integration in the local environment."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import base64
import re

from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.code import kubernetes
from googlecloudsdk.command_lib.run import secrets_mapping

# Messages module for the Secret Manager v1 API, resolved once at import time.
# _SecretsClient.GetSecretData uses it to build the version-access request.
SECRETS_MESSAGE_MODULE = apis.GetMessagesModule('secretmanager', 'v1')


class SecretManagerSecret(object):
  """A secret to be fetched from Secret Manager.

  Instances are value objects: two secrets compare equal when their name,
  versions and mapped secret are all equal.

  Attributes:
    name: Name of the secret. Used as the name of the k8s secret and, when
      no mapped secret is given, as the Secret Manager secret id.
    versions: Collection of secret versions to fetch. Must be hashable
      (e.g. a tuple or frozenset) for the instance itself to be hashable.
    mapped_secret: Optional Secret Manager secret that `name` maps to;
      either a full 'projects/...' secret path or a secret id in the
      current project.
  """

  def __init__(self, name, versions, mapped_secret=None):
    self.name = name
    self.versions = versions
    self.mapped_secret = mapped_secret

  def __eq__(self, other):
    try:
      return (self.name == other.name and self.versions == other.versions and
              self.mapped_secret == other.mapped_secret)
    except AttributeError:
      # `other` is not secret-like; let Python try the reflected comparison
      # (and ultimately report inequality) instead of raising.
      return NotImplemented

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__, so define it explicitly to
    # keep `!=` consistent with `==`.
    result = self.__eq__(other)
    if result is NotImplemented:
      return result
    return not result

  def __repr__(self):
    return '<Secret: (name="{}", versions={}, mapped_secret="{}")>'.format(
        self.name, self.versions, self.mapped_secret)

  def __hash__(self):
    # Hash the same fields that __eq__ compares so equal secrets hash equally.
    return hash((self.name, self.versions, self.mapped_secret))


def BuildSecrets(project_name, secret_list, namespace, client=None):
  """Create one k8s secret resource per requested Secret Manager secret.

  Args:
    project_name: Project used to resolve secrets that are not given as a
      full resource path.
    secret_list: Iterable of objects exposing `name`, `mapped_secret` and
      `versions` attributes.
    namespace: Namespace to set on each k8s secret; omitted when falsy.
    client: Optional object providing `GetSecretData`. A `_SecretsClient` is
      created when not supplied.

  Returns:
    A list of k8s secret resources, in the same order as `secret_list`.
  """
  secrets_client = _SecretsClient() if client is None else client
  return [
      _BuildSecret(secrets_client, project_name, item.name,
                   item.mapped_secret, item.versions, namespace)
      for item in secret_list
  ]


def _BuildSecret(client, project, secret_name, mapped_secret, versions,
                 namespace):
  """Fetch every requested version of a secret and wrap it as a k8s secret.

  Args:
    client: Object providing `GetSecretData(project, secret_name,
      mapped_secret, version)`.
    project: Project passed through to the client.
    secret_name: Name of the secret; also the name of the k8s secret.
    mapped_secret: Optional mapped secret passed through to the client.
    versions: Collection of versions to fetch.
    namespace: Namespace for the k8s secret; omitted when falsy.

  Returns:
    The k8s secret resource built by `_BuildK8sSecret`.

  Raises:
    ValueError: If `versions` contains the MOUNT_ALL special version.
  """
  mount_all = secrets_mapping.SpecialVersion.MOUNT_ALL
  if mount_all in versions:
    # TODO(b/187972361): Do we need to load all secret versions for the secret?
    raise ValueError('local development requires you to specify all secret '
                     'versions that you need to use.')
  fetched = {
      ver: client.GetSecretData(project, secret_name, mapped_secret, ver)
      for ver in versions
  }
  return _BuildK8sSecret(secret_name, fetched, namespace)


def _BuildK8sSecret(secret_name, secrets, namespace):
  """Assemble a k8s Secret resource from Secret Manager responses.

  Args:
    secret_name: Value for the resource's `metadata.name`.
    secrets: Mapping of version to a response object whose `payload.data`
      holds the raw secret bytes.
    namespace: Value for `metadata.namespace`; the key is left out when this
      is falsy.

  Returns:
    A dict describing a v1 Secret whose `data` maps each version to the
    base64-encoded payload as an ASCII string.
  """
  encoded = {
      version: base64.b64encode(response.payload.data).decode('ascii')
      for version, response in secrets.items()
  }
  meta = {'name': secret_name}
  if namespace:
    meta['namespace'] = namespace
  return {
      'metadata': meta,
      'data': encoded,
      'apiVersion': 'v1',
      'kind': 'Secret',
  }


def _DeleteSecrets(secret_map, namespace, context_name):
  """Delete the k8s secrets named by the keys of `secret_map`.

  Args:
    secret_map: Mapping whose keys are the secret names to delete.
    namespace: Namespace passed to `kubernetes.DeleteResources`.
    context_name: Context name passed to `kubernetes.DeleteResources`.
  """
  secret_names = list(secret_map.keys())
  kubernetes.DeleteResources('secret', secret_names, namespace, context_name)


class _SecretsClient(object):
  """Thin wrapper around the Secret Manager v1 client for reading secrets."""

  def __init__(self):
    self.secrets_client = apis.GetClientInstance('secretmanager', 'v1')

  def GetSecretData(self, project, secret_name, mapped_secret, version):
    """Access one version of a secret in Secret Manager.

    Args:
      project: Project used when the secret is not given as a full path.
      secret_name: Secret id used when `mapped_secret` is falsy.
      mapped_secret: Optional override. A value starting with 'projects/' is
        treated as a full secret path (cross-project); any other truthy
        value is treated as a secret id within `project`.
      version: Version of the secret to access.

    Returns:
      The response of the `projects_secrets_versions.Access` call.
    """
    if mapped_secret and mapped_secret.startswith('projects/'):
      # Cross-project mapping: the mapped value is already a full secret path.
      resource_name = '{}/versions/{}'.format(mapped_secret, version)
    else:
      # Either a local secret mapped to a valid k8s name, or no mapping.
      secret_id = mapped_secret if mapped_secret else secret_name
      resource_name = 'projects/{}/secrets/{}/versions/{}'.format(
          project, secret_id, version)
    request_type = (
        SECRETS_MESSAGE_MODULE.SecretmanagerProjectsSecretsVersionsAccessRequest
    )
    versions_service = self.secrets_client.projects_secrets_versions
    return versions_service.Access(request_type(name=resource_name))


def IsValidK8sName(name):
  """Check whether `name` is usable as a k8s resource name.

  k8s names must start and end with a lowercase alphanumeric, only contain
  lowercase alphanumerics, '-' and '.', and contain at most 253 characters.

  Args:
    name: The candidate name string.

  Returns:
    A truthy match object when `name` is valid, otherwise None.
  """
  # Anchor with \Z rather than $: `$` also matches just before a trailing
  # newline, which would wrongly accept names such as 'abc\n'.
  return re.match(r'[a-z0-9]([a-z0-9\-\.]{0,251}[a-z0-9])?\Z', name)