HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/functions/secrets_config.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility for configuring and parsing secrets args."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import re
import textwrap

from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope.arg_parsers import ArgumentTypeError
from googlecloudsdk.command_lib.util.args import map_util
from googlecloudsdk.core import log
import six

_SECRET_PATH_PATTERN = re.compile(
    '^(/+[a-zA-Z0-9-_.]*[a-zA-Z0-9-_]+)+'
    '((/*:(/*[a-zA-Z0-9-_.]*[a-zA-Z0-9-_]+)+)'
    '|(/+[a-zA-Z0-9-_.]*[a-zA-Z0-9-_]+))$'
)

_DEFAULT_PROJECT_SECRET_REF_PATTERN = re.compile(
    '^(?P<secret>[a-zA-Z0-9-_]+):(?P<version>[1-9][0-9]*|latest)$'
)
_SECRET_VERSION_RESOURCE_REF_PATTERN = re.compile(
    '^projects/([^/]+)/secrets/([a-zA-Z0-9-_]+)/versions/([1-9][0-9]*|latest)$'
)
_SECRET_VERSION_REF_PATTERN = re.compile(
    '^projects/(?P<project>[^/]+)/secrets/(?P<secret>[a-zA-Z0-9-_]+)'
    ':(?P<version>[1-9][0-9]*|latest)$'
)

_DEFAULT_PROJECT_IDENTIFIER = '*'

_SECRET_VERSION_SECRET_RESOURCE_PATTERN = re.compile(
    '^(?P<secret_resource>projects/[^/]+/secrets/[a-zA-Z0-9-_]+)'
    '/versions/(?P<version>[1-9][0-9]*|latest)$'
)
_SECRET_RESOURCE_PATTERN = re.compile(
    '^projects/(?P<project>[^/]+)/secrets/(?P<secret>[a-zA-Z0-9-_]+)$'
)


def _CanonicalizePath(secret_path):
  """Canonicalizes secret path to the form `/mount_path:/secret_file_path`.

  Gcloud secret path is more restrictive than the backend (shortn/_bwgb3xdRxL).
  Paths are reduced to their canonical forms before the request is made.

  Args:
    secret_path: Complete path to the secret.

  Returns:
    Canonicalized secret path.
  """
  secret_path = re.sub(r'/+', '/', secret_path)
  seperator = ':' if ':' in secret_path else '/'
  mount_path, _, secret_file_path = secret_path.rpartition(seperator)

  # Strip trailing / from mount path and beginning / from file path if present.
  mount_path = re.sub(r'/$', '', mount_path)
  secret_file_path = re.sub(r'^/', '', secret_file_path)

  return '{}:/{}'.format(mount_path, secret_file_path)


def _SecretsKeyType(key):
  """Validates and canonicalizes secrets key configuration.

  Args:
    key: Secrets key configuration.

  Returns:
    Canonicalized secrets key configuration.

  Raises:
    ArgumentTypeError: Secrets key configuration is not valid.
  """
  if not key.strip():
    raise ArgumentTypeError(
        'Secret environment variable names/secret paths cannot be empty.'
    )
  canonicalized_key = key
  if _SECRET_PATH_PATTERN.search(key):
    canonicalized_key = _CanonicalizePath(key)
  else:
    if '/' in key:
      log.warning(
          "'{}' will be interpreted as a secret environment variable "
          "name as it doesn't match the pattern for a secret path "
          "'/mount_path:/secret_file_path'.".format(key)
      )
    if key.startswith('X_GOOGLE_') or key in [
        'GOOGLE_ENTRYPOINT',
        'GOOGLE_FUNCTION_TARGET',
        'GOOGLE_RUNTIME',
        'GOOGLE_RUNTIME_VERSION',
    ]:
      raise ArgumentTypeError(
          "Secret environment variable name '{}' is reserved for internal "
          'use.'.format(key)
      )
  return canonicalized_key


def _CanonicalizeValue(value):
  """Canonicalizes secret value reference to the secret version resource name.

  Output format: `projects/{project}/secrets/{secret}/versions/{version}`.
  The project in the above reference will be * if the user used a default
  project secret.

  Args:
    value: Secret value reference as a string.

  Returns:
    Canonicalized secret value reference.
  """
  dp_secret_ref_match = _DEFAULT_PROJECT_SECRET_REF_PATTERN.search(value)
  secret_version_res_ref_match = _SECRET_VERSION_RESOURCE_REF_PATTERN.search(
      value
  )
  secret_version_ref_match = _SECRET_VERSION_REF_PATTERN.search(value)

  if dp_secret_ref_match:
    return 'projects/{project}/secrets/{secret}/versions/{version}'.format(
        project=_DEFAULT_PROJECT_IDENTIFIER,
        secret=dp_secret_ref_match.group('secret'),
        version=dp_secret_ref_match.group('version'),
    )
  elif secret_version_res_ref_match:
    return value
  elif secret_version_ref_match:
    return 'projects/{project}/secrets/{secret}/versions/{version}'.format(
        project=secret_version_ref_match.group('project'),
        secret=secret_version_ref_match.group('secret'),
        version=secret_version_ref_match.group('version'),
    )
  raise ArgumentTypeError(
      "Secrets value configuration must match the pattern 'SECRET:VERSION' or "
      "'projects/{{PROJECT}}/secrets/{{SECRET}}:{{VERSION}}' or "
      "'projects/{{PROJECT}}/secrets/{{SECRET}}/versions/{{VERSION}}' "
      "where VERSION is a number or the label 'latest' [{}]".format(value)
  )


def _SecretsValueType(value):
  """Validates secrets value configuration.

  The restrictions for gcloud are strict when compared to GCF to accommodate
  future changes without making it confusing for the user.

  Args:
    value: Secrets value configuration.

  Returns:
    Secrets value configuration as a string.

  Raises:
    ArgumentTypeError: Secrets value configuration is not valid.
  """
  if '=' in value:
    raise ArgumentTypeError(
        "Secrets value configuration cannot contain '=' [{}]".format(value)
    )
  return _CanonicalizeValue(value)


def _SecretsDiffer(project1, secret1, project2, secret2):
  """Returns true if the two secrets differ.

  The secrets can be considered as different if either the secret name is
  different or the project is different with the secret name being the same. If
  one project is represented using the project number and the other is
  represented using its project id, then it may not be possible to determine if
  the two projects are the same, so the validation is relaxed.

  Args:
    project1: Project ID or number of the first secret.
    secret1: Secret name of the first secret.
    project2: Project ID or number of the second secret.
    secret2: Secret name of the second secret.

  Returns:
    True if the two secrets differ, False otherwise.
  """
  return secret1 != secret2 or (
      project1 != project2
      and project1.isdigit() == project2.isdigit()
      and project1 != _DEFAULT_PROJECT_IDENTIFIER
      and project2 != _DEFAULT_PROJECT_IDENTIFIER
  )


def _ValidateSecrets(secrets_dict):
  """Additional secrets validations that require the entire dict.

  Args:
    secrets_dict: Secrets configuration dict to validate.
  """
  mount_path_to_secret = collections.defaultdict(list)
  for key, value in six.iteritems(secrets_dict):
    if _SECRET_PATH_PATTERN.search(key):
      mount_path = key.split(':')[0]
      secret_res1 = _SECRET_VERSION_SECRET_RESOURCE_PATTERN.search(value).group(
          'secret_resource'
      )

      if mount_path in mount_path_to_secret:
        secret_res_match1 = _SECRET_RESOURCE_PATTERN.search(secret_res1)
        project1 = secret_res_match1.group('project')
        secret1 = secret_res_match1.group('secret')

        for secret_res2 in mount_path_to_secret[mount_path]:
          secret_res_match2 = _SECRET_RESOURCE_PATTERN.search(secret_res2)
          project2 = secret_res_match2.group('project')
          secret2 = secret_res_match2.group('secret')

          if _SecretsDiffer(project1, secret1, project2, secret2):
            raise ArgumentTypeError(
                'More than one secret is configured for the mount path '
                "'{mount_path}' [violating secrets: {secret1},{secret2}]."
                .format(
                    mount_path=mount_path,
                    secret1=secret1
                    if project1 == _DEFAULT_PROJECT_IDENTIFIER
                    else secret_res1,
                    secret2=secret2
                    if project2 == _DEFAULT_PROJECT_IDENTIFIER
                    else secret_res2,
                )
            )
      else:
        mount_path_to_secret[mount_path].append(secret_res1)


class ArgSecretsDict(arg_parsers.ArgDict):
  """ArgDict customized for holding secrets configuration."""

  def __init__(
      self,
      key_type=None,
      value_type=None,
      spec=None,
      min_length=0,
      max_length=None,
      allow_key_only=False,
      required_keys=None,
      operators=None,
  ):
    """Initializes the base ArgDict by forwarding the parameters."""
    super(ArgSecretsDict, self).__init__(
        key_type=key_type,
        value_type=value_type,
        spec=spec,
        min_length=min_length,
        max_length=max_length,
        allow_key_only=allow_key_only,
        required_keys=required_keys,
        operators=operators,
    )

  def __call__(self, arg_value):  # pylint:disable=missing-docstring
    secrets_dict = collections.OrderedDict(
        sorted(six.iteritems(super(ArgSecretsDict, self).__call__(arg_value)))
    )
    _ValidateSecrets(secrets_dict)
    return secrets_dict


def ConfigureFlags(parser):
  """Add flags for configuring secret environment variables and secret volumes.

  Args:
    parser: Argument parser.
  """
  kv_metavar = (
      'SECRET_ENV_VAR=SECRET_VALUE_REF,'
      '/secret_path=SECRET_VALUE_REF,'
      '/mount_path:/secret_file_path=SECRET_VALUE_REF'
  )
  k_metavar = 'SECRET_ENV_VAR,/secret_path,/mount_path:/secret_file_path'

  flag_group = parser.add_mutually_exclusive_group()
  flag_group.add_argument(
      '--set-secrets',
      metavar=kv_metavar,
      action=arg_parsers.UpdateAction,
      type=ArgSecretsDict(
          key_type=_SecretsKeyType, value_type=_SecretsValueType
      ),
      help="""
      List of secret environment variables and secret volumes to configure.
      Existing secrets configuration will be overwritten.

      You can reference a secret value referred to as `SECRET_VALUE_REF` in the
      help text in the following ways.

      * Use `${SECRET}:${VERSION}` if you are referencing a secret in the same
        project, where `${SECRET}` is the name of the secret in secret manager
        (not the full resource name) and `${VERSION}` is the version of the
        secret which is either a `positive integer` or the label `latest`.
        For example, use `SECRET_FOO:1` to reference version `1` of the secret
        `SECRET_FOO` which exists in the same project as the function.

      * Use `projects/${PROJECT}/secrets/${SECRET}/versions/${VERSION}` or
        `projects/${PROJECT}/secrets/${SECRET}:${VERSION}` to reference a secret
        version using the full resource name, where `${PROJECT}` is either the
        project number (`preferred`) or the project ID of the project which
        contains the secret, `${SECRET}` is the name of the secret in secret
        manager (not the full resource name) and `${VERSION}` is the version of
        the secret which is either a `positive integer` or the label `latest`.
        For example, use `projects/1234567890/secrets/SECRET_FOO/versions/1` or
        `projects/project_id/secrets/SECRET_FOO/versions/1` to reference version
        `1` of the secret `SECRET_FOO` that exists in the project `1234567890`
        or `project_id` respectively.
        This format is useful when the secret exists in a different project.

      To configure the secret as an environment variable, use
      `SECRET_ENV_VAR=SECRET_VALUE_REF`. To use the value of the secret, read
      the environment variable `SECRET_ENV_VAR` as you would normally do in the
      function's programming language.

      We recommend using a `numeric` version for secret environment variables
      as any updates to the secret value are not reflected until new clones
      start.

      To mount the secret within a volume use `/secret_path=SECRET_VALUE_REF` or
      `/mount_path:/secret_file_path=SECRET_VALUE_REF`. To use the value of the
      secret, read the file at `/secret_path` as you would normally do in the
      function's programming language.

      For example, `/etc/secrets/secret_foo=SECRET_FOO:latest` or
      `/etc/secrets:/secret_foo=SECRET_FOO:latest` will make the value of the
      `latest` version of the secret `SECRET_FOO` available in a file
      `secret_foo` under the directory `/etc/secrets`. `/etc/secrets` will be
      considered as the `mount path` and will `not` be available for any other
      volume.

      We recommend referencing the `latest` version when using secret volumes so
      that the secret's value changes are reflected immediately.""",
  )

  update_remove_flag_group = flag_group.add_argument_group(
      help=textwrap.dedent(
          """\
      Only `--update-secrets` and `--remove-secrets` can be used together. If
      both are specified, then `--remove-secrets` will be applied first."""
      )
  )
  update_remove_flag_group.add_argument(
      '--update-secrets',
      metavar=kv_metavar,
      action=arg_parsers.UpdateAction,
      type=ArgSecretsDict(
          key_type=_SecretsKeyType, value_type=_SecretsValueType
      ),
      help="""
      List of secret environment variables and secret volumes to update.
      Existing secrets configuration not specified in this list will be
      preserved.""",
  )
  update_remove_flag_group.add_argument(
      '--remove-secrets',
      metavar=k_metavar,
      action=arg_parsers.UpdateAction,
      type=arg_parsers.ArgList(element_type=_SecretsKeyType),
      help="""
      List of secret environment variable names and secret paths to remove.

      Existing secrets configuration of secret environment variable names and
      secret paths not specified in this list will be preserved.

      To remove a secret environment variable, use the name of the environment
      variable `SECRET_ENV_VAR`.

      To remove a file within a secret volume or the volume itself, use the
      secret path as the key (either `/secret_path` or
      `/mount_path:/secret_file_path`).""",
  )

  flag_group.add_argument(
      '--clear-secrets',
      action='store_true',
      help='Remove all secret environment variables and volumes.',
  )


def IsArgsSpecified(args):
  """Returns true if at least one of the flags for secrets is specified.

  Args:
    args: Argparse namespace.

  Returns:
    True if at least one of the flags for secrets is specified.
  """
  secrets_flags = {
      '--set-secrets',
      '--update-secrets',
      '--remove-secrets',
      '--clear-secrets',
  }
  specified_flags = set(args.GetSpecifiedArgNames())
  return bool(specified_flags.intersection(secrets_flags))


def SplitSecretsDict(secrets_dict):
  """Splits the secrets dict into sorted ordered dicts for each secret type.

  Args:
    secrets_dict: Secrets configuration dict.

  Returns:
    A tuple (secret_env_vars, secret_volumes) of sorted ordered dicts for each
    secret type.
  """
  secret_volumes = {
      k: v
      for k, v in six.iteritems(secrets_dict)
      if _SECRET_PATH_PATTERN.search(k)
  }
  secret_env_vars = {
      k: v for k, v in six.iteritems(secrets_dict) if k not in secret_volumes
  }
  return (
      collections.OrderedDict(sorted(six.iteritems(secret_env_vars))),
      collections.OrderedDict(sorted(six.iteritems(secret_volumes))),
  )


def CanonicalizeKey(key):
  """Canonicalizes secrets configuration key.

  Args:
    key: Secrets configuration key.

  Returns:
    Canonicalized secrets configuration key.
  """
  if _SECRET_PATH_PATTERN.search(key):
    return _CanonicalizePath(key)
  return key


def _SubstituteDefaultProject(
    secret_version_ref, default_project_id, default_project_number
):
  """Replaces the default project number in place of * or project ID.

  Args:
    secret_version_ref: Secret value reference.
    default_project_id: The project ID of the project to which the function is
      deployed.
    default_project_number: The project number of the project to which the
      function is deployed.

  Returns:
    Secret value reference with * or project ID replaced by the default project.
  """
  return re.sub(
      r'projects/([*]|{project_id})/'.format(project_id=default_project_id),
      'projects/{project_number}/'.format(
          project_number=default_project_number
      ),
      secret_version_ref,
  )


def ApplyFlags(old_secrets, args, default_project_id, default_project_number):
  """Applies the current flags to existing secrets configuration.

  Args:
    old_secrets: Existing combined secrets configuration dict.
    args: Argparse namespace.
    default_project_id: The project ID of the project to which the function is
      deployed.
    default_project_number: The project number of the project to which the
      function is deployed.

  Returns:
    new_secrets: A new combined secrets configuration dict generated by
      applying the flags to the existing secrets configuration.

  Raises:
    ArgumentTypeError: Generated secrets configuration is invalid.
  """
  secret_flags = map_util.GetMapFlagsFromArgs('secrets', args)
  new_secrets = map_util.ApplyMapFlags(old_secrets, **secret_flags)

  new_secrets = {
      secrets_key: _SubstituteDefaultProject(
          secrets_value, default_project_id, default_project_number
      )
      for secrets_key, secrets_value in six.iteritems(new_secrets)
  }
  new_secrets = collections.OrderedDict(sorted(six.iteritems(new_secrets)))

  # Handles the case when the newly configured secrets could conflict with
  # existing secrets.
  _ValidateSecrets(new_secrets)
  return new_secrets