HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/cloudbuild/cloudbuild_util.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the cloudbuild API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
import re

from apitools.base.protorpclite import messages as proto_messages
from apitools.base.py import encoding as apitools_encoding
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_exceptions
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions as c_exceptions
from googlecloudsdk.core import yaml
from googlecloudsdk.core.resource import resource_property
from googlecloudsdk.core.util import files

import six

# API name and version strings handed to the `apis` lookup helpers.
_API_NAME = 'cloudbuild'
_GA_API_VERSION = 'v1'
_BETA_API_VERSION = 'v1beta1'

# Maps each release track to the Cloud Build API version used for it. Every
# track listed here resolves to the GA version; _BETA_API_VERSION is not
# referenced by this mapping.
RELEASE_TRACK_TO_API_VERSION = {
    base.ReleaseTrack.GA: _GA_API_VERSION,
    base.ReleaseTrack.BETA: _GA_API_VERSION,
    base.ReleaseTrack.ALPHA: _GA_API_VERSION,
}

# Regular expressions over a worker pool's full resource name, e.g.
# "projects/abc/locations/def/workerPools/ghi". The *_SELECTOR patterns each
# have one capture group: the pool's short name and its region respectively.
WORKERPOOL_NAME_MATCHER = r'projects/.*/locations/.*/workerPools/.*'
WORKERPOOL_NAME_SELECTOR = r'projects/.*/locations/.*/workerPools/(.*)'
WORKERPOOL_REGION_SELECTOR = r'projects/.*/locations/(.*)/workerPools/.*'

# Default for optionally-regional requests when the user does not specify.
DEFAULT_REGION = 'global'

# Divisor used by BytesToGb: one GB is taken to be 2**30 bytes.
BYTES_IN_ONE_GB = 2**30


class WorkerpoolTypes(enum.Enum):
  """Enumeration of worker pool types."""
  UNKNOWN = 0
  PRIVATE = 1
  HYBRID = 2


def GetMessagesModule(release_track=base.ReleaseTrack.GA):
  """Looks up the Cloud Build messages module for a release track.

  Args:
    release_track: A googlecloudsdk.calliope.base.ReleaseTrack value; it is
      translated to an API version through RELEASE_TRACK_TO_API_VERSION.

  Returns:
    Module containing the definitions of messages for Cloud Build.
  """
  api_version = RELEASE_TRACK_TO_API_VERSION[release_track]
  return apis.GetMessagesModule(_API_NAME, api_version)


def GetClientClass(release_track=base.ReleaseTrack.GA):
  """Looks up the Cloud Build client class for a release track.

  Args:
    release_track: A googlecloudsdk.calliope.base.ReleaseTrack value; it is
      translated to an API version through RELEASE_TRACK_TO_API_VERSION.

  Returns:
    base_api.BaseApiClient, Client class for Cloud Build.
  """
  api_version = RELEASE_TRACK_TO_API_VERSION[release_track]
  return apis.GetClientClass(_API_NAME, api_version)


def GetClientInstance(
    release_track=base.ReleaseTrack.GA,
    use_http=True,
    skip_activation_prompt=False,
):
  """Builds a Cloud Build client for a release track.

  Args:
    release_track: A googlecloudsdk.calliope.base.ReleaseTrack value; it is
      translated to an API version through RELEASE_TRACK_TO_API_VERSION.
    use_http: bool, True to create an http object for this client. It is
      forwarded to the apis helper inverted, as `no_http`.
    skip_activation_prompt: bool, True to skip prompting for service activation.
      Should be used only if service activation was checked earlier in the
      command.

  Returns:
    base_api.BaseApiClient, An instance of the Cloud Build client.
  """
  api_version = RELEASE_TRACK_TO_API_VERSION[release_track]
  return apis.GetClientInstance(
      _API_NAME,
      api_version,
      no_http=not use_http,
      skip_activation_prompt=skip_activation_prompt,
  )


def EncodeSubstitutions(substitutions, messages):
  """Encodes build substitutions as a Build.SubstitutionsValue message.

  Args:
    substitutions: dict of substitution key to value, or a falsy value.
    messages: A Cloud Build messages module.

  Returns:
    The encoded message, or None when substitutions is empty or None.
  """
  if substitutions:
    # sort_items=True keeps the property order deterministic (for tests).
    return apitools_encoding.DictToAdditionalPropertyMessage(
        substitutions, messages.Build.SubstitutionsValue, sort_items=True)
  return None


def EncodeTriggerSubstitutions(substitutions, value_type):
  """Encodes trigger substitutions as an instance of value_type.

  Args:
    substitutions: dict of substitution key to value, or a falsy value.
    value_type: The message class to instantiate; it must expose a nested
      AdditionalProperty class and accept `additionalProperties`.

  Returns:
    A value_type instance whose properties are sorted by key, or None when
    substitutions is empty or None.
  """
  if not substitutions:
    return None
  ordered_items = sorted(six.iteritems(substitutions))  # Sort for tests
  return value_type(additionalProperties=[
      value_type.AdditionalProperty(key=name, value=val)
      for name, val in ordered_items
  ])


def EncodeUpdatedTriggerSubstitutions(old_substitutions, substitutions,
                                      messages):
  """Merges new substitutions into a trigger's existing ones.

  New values win over existing values that share the same key.

  Args:
    old_substitutions: The existing substitutions to be updated.
    substitutions: The substitutions to be added to the existing substitutions.
    messages: A Cloud Build messages module.

  Returns:
    The updated trigger substitutions. When there is nothing to add,
    old_substitutions is returned unchanged.
  """
  if not substitutions:
    return old_substitutions

  # Start from the existing key/value pairs, then overlay the new ones.
  merged = {}
  if old_substitutions:
    merged.update(
        (prop.key, prop.value)
        for prop in old_substitutions.additionalProperties)
  merged.update(six.iteritems(substitutions))

  value_cls = messages.BuildTrigger.SubstitutionsValue
  return value_cls(additionalProperties=[
      value_cls.AdditionalProperty(key=name, value=val)
      for name, val in sorted(merged.items())  # Sort for tests.
  ])


def RemoveTriggerSubstitutions(
    old_substitutions, substitutions_to_be_removed, messages
):
  """Drops the named keys from a trigger's existing substitutions.

  Args:
    old_substitutions: The existing substitutions.
    substitutions_to_be_removed: The substitutions to be removed if exist.
    messages: A Cloud Build messages module.

  Returns:
    The updated trigger substitutions, or None when nothing was requested for
    removal. If no substitution survives, the result holds one default
    (unset) AdditionalProperty instead of an empty list.
  """
  if not substitutions_to_be_removed:
    return None

  value_cls = messages.BuildTrigger.SubstitutionsValue
  existing = old_substitutions.additionalProperties if old_substitutions else []
  kept = [
      value_cls.AdditionalProperty(key=prop.key, value=prop.value)
      for prop in existing
      if prop.key not in substitutions_to_be_removed
  ]
  if not kept:
    # Same placeholder shape that EncodeEmptyTriggerSubstitutions produces.
    kept.append(value_cls.AdditionalProperty())
  return value_cls(additionalProperties=kept)


def EncodeEmptyTriggerSubstitutions(messages):
  """Builds a SubstitutionsValue holding one default AdditionalProperty.

  Args:
    messages: A Cloud Build messages module.

  Returns:
    A BuildTrigger.SubstitutionsValue whose only property has no key or value
    set.
  """
  value_cls = messages.BuildTrigger.SubstitutionsValue
  return value_cls(additionalProperties=[value_cls.AdditionalProperty()])


def SnakeToCamelString(snake):
  """Change a snake_case string into a camelCase string.

  Leading underscores are kept as-is. Every '_'-separated part after the first
  is passed through str.capitalize(), which also lower-cases the rest of that
  part (so 'foo_barBaz' becomes 'fooBarbaz').

  Args:
    snake: str, the string to be transformed.

  Returns:
    str, the transformed string.
  """
  body = snake.lstrip('_')
  if not body:
    # Empty, or made of underscores only: there is nothing to convert.
    return snake

  # Legit field names never start with '_', but keep any such prefix intact
  # so the function is total.
  prefix = '_' * (len(snake) - len(body))
  pieces = body.split('_')
  tail = ''.join(piece.capitalize() for piece in pieces[1:])
  return prefix + pieces[0] + tail


def SnakeToCamel(msg, skip=None):
  """Recursively rename dict keys from snake_case to camelCase.

  Only dict keys are rewritten; non-container values (including strings) are
  returned untouched. If a key is in skip, the key itself is still converted
  but its value is left alone.

  Args:
    msg: dict, list, or other. If 'other', the function returns immediately.
    skip: contains dict keys whose values should not have camel case applied.
      Membership is tested against the original (snake_case) key.

  Returns:
    Same type as msg, with dict keys converted at every nesting level except
    beneath keys contained in skip.
  """
  skipped_keys = [] if skip is None else skip

  if isinstance(msg, list):
    return [SnakeToCamel(item, skipped_keys) for item in msg]
  if not isinstance(msg, dict):
    return msg

  converted = {}
  for name, value in six.iteritems(msg):
    if name not in skipped_keys:
      value = SnakeToCamel(value, skipped_keys)
    converted[SnakeToCamelString(name)] = value
  return converted


def MessageToFieldPaths(msg):
  """Produce field paths from a message object.

  The result is used to create a FieldMask proto message that contains all field
  paths presented in the object.
  https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto

  Unassigned fields and empty repeated fields are left out. Nested messages
  contribute dotted paths ("parent.child").

  Args:
    msg: A user defined message object that extends the messages.Message class.
    https://github.com/google/apitools/blob/master/apitools/base/protorpclite/messages.py

  Returns:
    The list of field paths.
  """
  # These names are spelled out rather than derived with ConvertToSnakeCase.
  # NOTE(review): the original comment suggests the generic conversion yields
  # e.g. "private_poolv1_config" -- confirm before removing an entry.
  snake_overrides = {
      'privatePoolV1Config': 'private_pool_v1_config',
      'hybridPoolConfig': 'hybrid_pool_config',
  }

  paths = []
  for field in msg.all_fields():
    value = msg.get_assigned_value(field.name)
    # Repeated fields are initialized as an empty list; treat that as unset.
    if (field.repeated and not value) or value is None:
      continue

    if field.name in snake_overrides:
      snake_name = snake_overrides[field.name]
    else:
      snake_name = resource_property.ConvertToSnakeCase(field.name)

    if hasattr(value, 'all_fields'):
      # The value is itself a message: recurse and prefix each sub-path.
      paths.extend(
          '{}.{}'.format(snake_name, sub_path)
          for sub_path in MessageToFieldPaths(value))
    else:
      paths.append(snake_name)
  return paths


def _UnpackCheckUnused(obj, msg_type):
  """Stuff a dict into a proto message, and fail if there are unused values.

  The dict is converted with apitools' DictToMessage and the resulting message
  tree is then walked; any unrecognized field found on a (nested) message
  aborts the walk with a ValueError whose text is the dotted/indexed path to
  the offending field, e.g. ".steps[0].foo: unused".

  Args:
    obj: dict(), The structured data to be reflected into the message type.
    msg_type: type, The proto message type.

  Raises:
    ValueError: If there is an unused value in obj.

  Returns:
    Proto message, The message that was created from obj.
  """
  msg = apitools_encoding.DictToMessage(obj, msg_type)

  def _CheckForUnusedFields(obj):
    """Check for any unused fields in nested messages or lists."""
    if isinstance(obj, proto_messages.Message):
      unused_fields = obj.all_unrecognized_fields()
      if unused_fields:
        if len(unused_fields) > 1:
          # Because this message shows up in a dotted path, use braces.
          # eg .foo.bar.{x,y,z}
          unused_msg = '{%s}' % ','.join(sorted(unused_fields))
        else:
          # For single items, omit the braces.
          # eg .foo.bar.x
          unused_msg = unused_fields[0]
        raise ValueError('.%s: unused' % unused_msg)
      # No unrecognized fields at this level: descend into every declared
      # field. Errors from deeper levels are re-raised with this field's name
      # prepended, which builds the path from the inside out.
      for used_field in obj.all_fields():
        try:
          field = getattr(obj, used_field.name)
          _CheckForUnusedFields(field)
        except ValueError as e:
          raise ValueError('.%s%s' % (used_field.name, e))
    # Lists (e.g. repeated field values) are checked element by element, with
    # the element index prepended to any error raised below.
    if isinstance(obj, list):
      for i, item in enumerate(obj):
        try:
          _CheckForUnusedFields(item)
        except ValueError as e:
          raise ValueError('[%d]%s' % (i, e))

  _CheckForUnusedFields(msg)

  return msg


def LoadMessageFromStream(stream,
                          msg_type,
                          msg_friendly_name,
                          skip_camel_case=None,
                          path=None):
  """Decode one proto message from a stream of JSON or YAML text.

  Args:
    stream: file-like object containing the JSON or YAML data to be decoded.
    msg_type: The protobuf message type to create.
    msg_friendly_name: A readable name for the message type, for use in error
      messages.
    skip_camel_case: Contains proto field names or map keys whose values should
      not have camel case applied.
    path: str or None. Optional path to be used in error messages.

  Raises:
    ParserError: If there was a problem parsing the stream as a dict.
    ParseProtoException: If there was a problem interpreting the stream as the
    given message type.

  Returns:
    Proto message, The message that got decoded.
  """
  # First parse the text into plain Python data.
  try:
    parsed = yaml.load(stream, file_hint=path)
  except yaml.Error as err:
    raise cloudbuild_exceptions.ParserError(path, err.inner_error)

  # Only a top-level mapping can be converted into a message.
  if isinstance(parsed, dict):
    keys_to_skip = [] if skip_camel_case is None else skip_camel_case
    return _YamlToMessage(parsed, msg_type, msg_friendly_name, keys_to_skip,
                          path)
  raise cloudbuild_exceptions.ParserError(path,
                                          'Could not parse as a dictionary.')


def LoadMessagesFromStream(stream,
                           msg_type,
                           msg_friendly_name,
                           skip_camel_case=None,
                           path=None):
  """Load multiple proto messages from a stream of JSON or YAML text.

  Each YAML document in the stream is converted into one message.

  Args:
    stream: file-like object containing the JSON or YAML data to be decoded.
    msg_type: The protobuf message type to create.
    msg_friendly_name: A readable name for the message type, for use in error
      messages.
    skip_camel_case: Contains proto field names or map keys whose values should
      not have camel case applied.
    path: str or None. Optional path to be used in error messages.

  Raises:
    ParserError: If there was a problem parsing the stream.
    ParseProtoException: If there was a problem interpreting the stream as the
    given message type.

  Returns:
    Proto message list of the messages that got decoded.
  """
  if skip_camel_case is None:
    skip_camel_case = []
  # Parse every document up front. load_all may yield documents lazily, in
  # which case parse errors only surface while iterating; exhausting the
  # iterator inside the try block ensures they are reported as ParserError.
  try:
    documents = list(yaml.load_all(stream, file_hint=path))
  except yaml.Error as e:
    raise cloudbuild_exceptions.ParserError(path, e.inner_error)

  return [
      _YamlToMessage(item, msg_type, msg_friendly_name, skip_camel_case, path)
      for item in documents
  ]


def _YamlToMessage(structured_data,
                   msg_type,
                   msg_friendly_name,
                   skip_camel_case=None,
                   path=None):
  """Convert already-decoded YAML/JSON data into a proto message.

  Args:
    structured_data: Dict containing the decoded YAML data.
    msg_type: The protobuf message type to create.
    msg_friendly_name: A readable name for the message type, for use in error
      messages.
    skip_camel_case: Contains proto field names or map keys whose values should
      not have camel case applied.
    path: str or None. Optional path to be used in error messages.

  Raises:
    ParseProtoException: If there was a problem interpreting the file as the
    given message type.

  Returns:
    Proto message, The message that got decoded.
  """
  # Message fields are camelCase, so rename the snake_case keys first.
  camel_data = SnakeToCamel(structured_data, skip_camel_case)

  try:
    return _UnpackCheckUnused(camel_data, msg_type)
  except Exception as e:
    # Deliberately broad: valid YAML is not necessarily a valid message, and
    # every failure of the dict-to-message conversion must be reported the
    # same way.
    error_text = '%s' % e
    raise cloudbuild_exceptions.ParseProtoException(path, msg_friendly_name,
                                                    error_text)


def LoadMessageFromPath(path,
                        msg_type,
                        msg_friendly_name,
                        skip_camel_case=None):
  """Decode one proto message from a JSON or YAML file.

  Args:
    path: The path to a file containing the JSON or YAML data to be decoded.
    msg_type: The protobuf message type to create.
    msg_friendly_name: A readable name for the message type, for use in error
      messages.
    skip_camel_case: Contains proto field names or map keys whose values should
      not have camel case applied.

  Raises:
    files.MissingFileError: If the file does not exist.
    ParserError: If there was a problem parsing the file as a dict.
    ParseProtoException: If there was a problem interpreting the file as the
    given message type.

  Returns:
    Proto message, The message that got decoded.
  """
  # files.FileReader returns user-friendly error messages.
  with files.FileReader(path) as reader:
    message = LoadMessageFromStream(reader, msg_type, msg_friendly_name,
                                    skip_camel_case, path)
  return message


def LoadMessagesFromPath(path,
                         msg_type,
                         msg_friendly_name,
                         skip_camel_case=None):
  """Decode multiple proto messages from a JSON or YAML file.

  Args:
    path: The path to a file containing the JSON or YAML data to be decoded.
    msg_type: The protobuf message type to create.
    msg_friendly_name: A readable name for the message type, for use in error
      messages.
    skip_camel_case: Contains proto field names or map keys whose values should
      not have camel case applied.

  Raises:
    files.MissingFileError: If the file does not exist.
    ParseProtoException: If there was a problem interpreting the file as the
    given message type.

  Returns:
    Proto message list of the messages that got decoded.
  """
  # files.FileReader returns user-friendly error messages.
  with files.FileReader(path) as reader:
    decoded = LoadMessagesFromStream(reader, msg_type, msg_friendly_name,
                                     skip_camel_case, path)
  return decoded


def IsWorkerPool(resource_name):
  """Tell whether a string looks like a worker pool's full resource name.

  The check anchors WORKERPOOL_NAME_MATCHER at the start of the string.

  Args:
    resource_name: str, The string to test.

  Returns:
    bool, True if the string is a worker pool's full resource name.
  """
  found = re.match(WORKERPOOL_NAME_MATCHER, resource_name)
  return found is not None


def WorkerPoolShortName(resource_name):
  """Extract the short name from a worker pool's full resource name.

  For example, "projects/abc/locations/def/workerPools/ghi" returns "ghi".

  Args:
    resource_name: A worker pool's full resource name.

  Raises:
    ValueError: If the full resource name was not well-formatted.

  Returns:
    The worker pool's short name.
  """
  found = re.search(WORKERPOOL_NAME_SELECTOR, resource_name)
  if not found:
    raise ValueError('The worker pool resource name must match "%s"' %
                     (WORKERPOOL_NAME_MATCHER,))
  # Group 1 is whatever follows "workerPools/".
  return found.group(1)


def WorkerPoolRegion(resource_name):
  """Extract the region from a worker pool's full resource name.

  For example, "projects/abc/locations/def/workerPools/ghi" returns "def".

  Args:
    resource_name: str, A worker pool's full resource name.

  Raises:
    ValueError: If the full resource name was not well-formatted.

  Returns:
    str, The worker pool's region string.
  """
  found = re.search(WORKERPOOL_REGION_SELECTOR, resource_name)
  if not found:
    raise ValueError('The worker pool resource name must match "%s"' %
                     (WORKERPOOL_NAME_MATCHER,))
  # Group 1 is the segment between "locations/" and "/workerPools".
  return found.group(1)


def GitHubEnterpriseConfigFromArgs(args, update=False):
  """Build a GitHubEnterpriseConfig message from the command line args.

  The app credentials come either from a GCS object (when --gcs-bucket is
  given) or from the individual secret-name flags.

  Args:
    args: An argparse namespace. All the arguments that were provided to this
      command invocation.
    update: bool, if the args are for an update. The peered network is only
      read from args when this is False.

  Returns:
    A populated GitHubEnterpriseConfig message.
  """
  messages = GetMessagesModule()

  config = messages.GitHubEnterpriseConfig()
  config.hostUrl = args.host_uri
  config.appId = args.app_id
  if args.webhook_key is not None:
    config.webhookKey = args.webhook_key
  if not (update or args.peered_network is None):
    config.peeredNetwork = args.peered_network

  if args.gcs_bucket is None:
    # No bucket given: reference the individual secrets instead.
    secrets = messages.GitHubEnterpriseSecrets()
    secrets.privateKeyName = args.private_key_name
    secrets.webhookSecretName = args.webhook_secret_name
    secrets.oauthSecretName = args.oauth_secret_name
    secrets.oauthClientIdName = args.oauth_client_id_name
    config.secrets = secrets
    return config

  app_config = messages.GCSLocation()
  app_config.bucket = args.gcs_bucket
  app_config.object = args.gcs_object
  if args.generation is not None:
    app_config.generation = args.generation
  config.appConfigJson = app_config
  return config


def BitbucketServerConfigFromArgs(args, update=False):
  """Build a BitbucketServerConfig message from the command line args.

  Args:
    args: an argparse namespace. All the arguments that were provided to this
      command invocation.
    update: bool, if the args are for an update. Peered-network flags are only
      read from args when this is False.

  Raises:
    RequiredArgumentException: If --peered-network-ip-range is given without
      --peered-network (create only).

  Returns:
    A populated BitbucketServerConfig message.
  """
  messages = GetMessagesModule()

  config = messages.BitbucketServerConfig()
  config.hostUri = args.host_uri
  config.username = args.user_name
  config.apiKey = args.api_key

  secrets = messages.BitbucketServerSecrets()
  secrets.adminAccessTokenVersionName = args.admin_access_token_secret_version
  secrets.readAccessTokenVersionName = args.read_access_token_secret_version
  secrets.webhookSecretVersionName = args.webhook_secret_secret_version
  # NOTE(review): `secrets` was just constructed, so this condition holds on
  # every call and the secrets are always attached -- confirm whether an
  # emptiness check was intended here.
  if update or secrets is not None:
    config.secrets = secrets

  if not update:
    if args.peered_network is not None:
      config.peeredNetwork = args.peered_network
      config.peeredNetworkIpRange = args.peered_network_ip_range
    elif args.peered_network_ip_range is not None:
      # An IP range only makes sense together with a network.
      raise c_exceptions.RequiredArgumentException(
          'peered-network-ip-range',
          (
              '--peered-network is required when specifying'
              ' --peered-network-ip-range.'
          ),
      )

  if args.IsSpecified('ssl_ca_file'):
    config.sslCa = args.ssl_ca_file
  return config


def GitLabConfigFromArgs(args):
  """Build a GitLabConfig message from the command line args.

  The `secrets` and `enterpriseConfig` sub-messages are attached only when
  _IsEmptyMessage reports that they carry at least one value.

  Args:
    args: an argparse namespace. All the arguments that were provided to this
      command invocation.

  Returns:
    A populated GitLabConfig message.
  """
  messages = GetMessagesModule()

  gitlab_config = messages.GitLabConfig()
  gitlab_config.username = args.user_name

  secret_versions = messages.GitLabSecrets()
  secret_versions.apiAccessTokenVersion = args.api_access_token_secret_version
  secret_versions.readAccessTokenVersion = args.read_access_token_secret_version
  secret_versions.webhookSecretVersion = args.webhook_secret_secret_version
  secret_versions.apiKeyVersion = args.api_key_secret_version
  if not _IsEmptyMessage(secret_versions):
    gitlab_config.secrets = secret_versions

  directory_config = messages.ServiceDirectoryConfig()
  directory_config.service = args.service_directory_service

  enterprise = messages.GitLabEnterpriseConfig()
  enterprise.hostUri = args.host_uri
  enterprise.serviceDirectoryConfig = directory_config
  if args.IsSpecified('ssl_ca_file'):
    enterprise.sslCa = args.ssl_ca_file
  if not _IsEmptyMessage(enterprise):
    gitlab_config.enterpriseConfig = enterprise

  return gitlab_config


def _IsEmptyMessage(message):
  """Report whether a message is None or has no truthy top-level value.

  Args:
    message: A proto message, or None.

  Returns:
    bool, True if message is None or every value in its dict form (as
    produced by apitools' MessageToDict) is falsy.
  """
  if message is not None:
    as_dict = apitools_encoding.MessageToDict(message)
    for value in as_dict.values():
      if value:
        return False
  return True


def WorkerPoolIsSpecified(build_config):
  """Report whether build_config.options.pool.name is set.

  Args:
    build_config: A build config object, or None.

  Returns:
    bool, True only if build_config, its options, the options' pool and the
    pool's name are all not None.
  """
  if build_config is None or build_config.options is None:
    return False
  pool = build_config.options.pool
  return pool is not None and pool.name is not None


def WorkerPoolConfigIsSpecified(build_config):
  """Report whether build_config.options.pool.workerConfig is set.

  Args:
    build_config: A build config object, or None.

  Returns:
    bool, True only if build_config, its options, the options' pool and the
    pool's workerConfig are all not None.
  """
  if build_config is None or build_config.options is None:
    return False
  pool = build_config.options.pool
  return pool is not None and pool.workerConfig is not None


def BytesToGb(size):
  """Converts bytes to GB, rounding down.

  Args:
    size: a size in bytes. Does not require size to be a multiple of 1 GB,
      unlike utils.BytesToGb from googlecloudsdk.api_lib.compute.

  Returns:
    The size in whole GB (floor division by BYTES_IN_ONE_GB), or None if size
    is falsy (None or 0).
  """
  return size // BYTES_IN_ONE_GB if size else None