HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/core/transport.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for common transport utilities, such as request wrapping."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import os
import platform
import re
import time
import uuid

from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import config
from googlecloudsdk.core import log
from googlecloudsdk.core import metrics
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import platforms
import six
from six.moves import urllib
from six.moves import zip  # pylint: disable=redefined-builtin

ENCODING = None if six.PY2 else 'utf-8'
INVOCATION_ID = uuid.uuid4().hex

TOKEN_URIS = [
    'https://accounts.google.com/o/oauth2/token',
    'https://www.googleapis.com/oauth2/v3/token',
    'https://www.googleapis.com/oauth2/v4/token',
    'https://oauth2.googleapis.com/token',
    'https://oauth2.googleapis.com/oauth2/v4/token'
]


class Request(six.with_metaclass(abc.ABCMeta, object)):
  """Encapsulates parameters for making a general HTTP request.

  Attributes:
    uri: URI of the HTTP resource.
    method: HTTP method to perform, such as GET, POST, DELETE, etc.
    headers: Additional headers to include in the request.
    body: Body of the request.
  """

  def __init__(self, uri, method, headers, body):
    """Instantiates a Request object.

    Args:
      uri: URI of the HTTP resource.
      method: HTTP method to perform, such as GET, POST, DELETE, etc.
      headers: Additional headers to include in the request.
      body: Body of the request.

    Returns:
      Request
    """
    self.uri = uri
    self.method = method
    self.headers = headers
    self.body = body

  @classmethod
  @abc.abstractmethod
  def FromRequestArgs(cls, *args, **kwargs):
    """Returns a Request object.

    Args:
      *args: args to be passed into http.request
      **kwargs: dictionary of kwargs to be passed into http.request

    Returns:
      Request
    """

  @abc.abstractmethod
  def ToRequestArgs(self):
    """Returns the args and kwargs to be used when calling http.request."""


class Response(six.with_metaclass(abc.ABCMeta, object)):
  """Encapsulates responses from making a general HTTP request.

  Attributes:
    status_code:
    headers: Headers of the response.
    body: Body of the response.
  """

  def __init__(self, status_code, headers, body):
    """Instantiates a Response object.

    Args:
      status_code:
      headers: Headers of the response.
      body: Body of the response.

    Returns:
      Response
    """
    self.status_code = status_code
    self.headers = headers
    self.body = body

  @classmethod
  @abc.abstractmethod
  def FromResponse(cls, response):
    """Returns a Response object.

    Args:
      response: raw response from calling http.request.

    Returns:
      Response
    """


class RequestWrapper(six.with_metaclass(abc.ABCMeta, object)):
  """Class for wrapping http requests.

  The general process is that you can define a series of handlers that get
  executed before and after the original http request you are mapping. All the
  request handlers are executed in the order provided. Request handlers must
  return a result that is used when invoking the corresponding response handler.
  Request handlers don't actually execute the request but rather just modify the
  request arguments. After all request handlers are executed, the original http
  request is executed. Finally, all response handlers are executed in order,
  getting passed both the http response as well as the result from their
  corresponding request handler.

  Attributes:
    request_class: Class used to represent a generic HTTP request.
    response_class: Class used to represent a generic HTTP request.
  """
  request_class = Request
  response_class = Response

  @abc.abstractmethod
  def DecodeResponse(self, response, response_encoding):
    """Decodes the response body according to response_encoding."""

  def WrapWithDefaults(self,
                       http_client,
                       response_encoding=None,
                       streaming_response_body=False,
                       redact_request_body_reason=None):
    """Wraps request with user-agent, and trace reporting.

    Args:
      http_client: The original http client to be wrapped.
      response_encoding: str, the encoding to use to decode the response.
      streaming_response_body: bool, True indicates that the response body will
          be a streaming body.
      redact_request_body_reason: str, the reason why the request body must be
          redacted if --log-http is used. If None, the body is not redacted.

    Returns:
      http, The same http object but with the request method wrapped.
    """
    gcloud_ua = MakeUserAgentString(
        properties.VALUES.metrics.command_name.Get())
    handlers = [
        Handler(RecordStartTime(), ReportDuration()),
        # TODO(b/160008076): The CLOUDSDK_USER_AGENT prefix may already be
        # provided by the upper stack before the plan of b/160008076 is
        # finished. The transport will not duplicate this string if this is the
        # case. Update MaybePrependToHeader() to PrependToHeader() in the step 4
        # of b/160008076.
        # More details in the 'Plan of record' section of b/160008076.
        Handler(MaybePrependToHeader('user-agent', config.CLOUDSDK_USER_AGENT)),
        Handler(AppendToHeader('user-agent', gcloud_ua))
    ]

    trace_value = properties.VALUES.core.trace_token.Get()
    if trace_value:
      handlers.append(Handler(SetHeader('Cookie', trace_value)))

    request_reason = properties.VALUES.core.request_reason.Get()
    if request_reason:
      handlers.append(
          Handler(SetHeader('X-Goog-Request-Reason', request_reason))
      )

    request_org_restriction_headers = (
        properties.VALUES.resource_policy.org_restriction_header.Get()
    )
    if request_org_restriction_headers:
      handlers.append(
          Handler(
              SetHeader(
                  'X-Goog-Allowed-Resources', request_org_restriction_headers
              )
          )
      )

    # Do this one last so that it sees the effects of the other modifiers.
    if properties.VALUES.core.log_http.GetBool():
      redact_token = properties.VALUES.core.log_http_redact_token.GetBool()
      show_request_body = (
          properties.VALUES.core.log_http_show_request_body.GetBool()
      )
      handlers.append(
          Handler(
              LogRequest(
                  redact_token,
                  redact_request_body_reason if not show_request_body else None,
              ),
              LogResponse(streaming_response_body),
          )
      )
    if (
        'CLOUDSDK_CORE_DRY_RUN' in os.environ  # pylint: disable=undefined-variable
        and os.environ['CLOUDSDK_CORE_DRY_RUN'] == '1'
    ):
      handlers.append(
          Handler(
              LogRequestDryRun(),
              lambda *args: None,
          )
      )

    self.WrapRequest(http_client, handlers, response_encoding=response_encoding)
    return http_client

  def WrapRequest(
      self,
      http_client,
      handlers,
      exc_handler=None,
      exc_type=Exception,
      response_encoding=None,
  ):
    """Wraps an http client with request modifiers.

    Args:
      http_client: The original http client to be wrapped.
      handlers: [Handler], The handlers to execute before and after the original
        request.
      exc_handler: f(e), A function that takes an exception and handles it. It
        should also throw an exception if you don't want it to be swallowed.
      exc_type: The type of exception that should be caught and given to the
        handler. It could be a tuple to catch more than one exception type.
      response_encoding: str, the encoding to use to decode the response.
    """
    orig_request = http_client.request

    def WrappedRequest(*args, **kwargs):
      """Replacement http_client.request() method."""
      handler_request = self.request_class.FromRequestArgs(*args, **kwargs)

      # Encode request headers
      headers = {h: v for h, v in six.iteritems(handler_request.headers)}
      handler_request.headers = {}
      for h, v in six.iteritems(headers):
        h, v = _EncodeHeader(h, v)
        handler_request.headers[h] = v

      modifier_data = []
      for handler in handlers:
        modifier_result = handler.request(handler_request)
        modifier_data.append(modifier_result)

      try:
        modified_args, modified_kwargs = handler_request.ToRequestArgs()
        response = orig_request(*modified_args, **modified_kwargs)
      except exc_type as e:  # pylint: disable=broad-except
        response = None
        if exc_handler:
          exc_handler(e)
          return
        else:
          raise

      if response_encoding is not None:
        response = self.DecodeResponse(response, response_encoding)

      handler_response = self.response_class.FromResponse(response)
      for handler, data in zip(handlers, modifier_data):
        if handler.response:
          handler.response(handler_response, data)

      return response

    http_client.request = WrappedRequest


class Handler(object):
  """A holder object for a pair of request and response handlers.

  Request handlers are invoked before the original http request, response
  handlers are invoked after.
  """

  def __init__(self, request, response=None):
    """Creates a new Handler.

    Args:
      request: f(request) -> data, A function that gets called before the
        original http request gets called. It is passed a Request object that
        encapsulates the parameters of an http request. It returns data to be
        passed to its corresponding response hander.
      response: f(response, data), A function that gets called after the
        original http request. It is passed a Response object that encapsulates
        the response of an http request as well as whatever the request handler
        returned as data.
    """
    self.request = request
    self.response = response


def _EncodeHeader(header, value):
  if isinstance(header, six.text_type):
    header = header.encode('utf-8')
  if isinstance(value, six.text_type):
    value = value.encode('utf-8')
  return header, value


def MaybePrependToHeader(header, value):
  """Prepends the given value if the existing header does not start with it.

  Args:
    header: str, The name of the header to prepend to.
    value: str, The value to prepend to the existing header value.

  Returns:
    A function that can be used in a Handler.request.
  """
  header, value = _EncodeHeader(header, value)

  def _MaybePrependToHeader(request):
    """Maybe prepends a value to a header on a request."""
    headers = request.headers
    current_value = b''
    for hdr, v in six.iteritems(headers):
      if hdr.lower() == header.lower():
        current_value = v
        del headers[hdr]
        break

    if not current_value.startswith(value):
      current_value = (value + b' ' + current_value).strip()
    headers[header] = current_value

  return _MaybePrependToHeader


def AppendToHeader(header, value):
  """Appends the given value to the existing value in the http request.

  Args:
    header: str, The name of the header to append to.
    value: str, The value to append to the existing header value.

  Returns:
    A function that can be used in a Handler.request.
  """
  header, value = _EncodeHeader(header, value)

  def _AppendToHeader(request):
    """Appends a value to a header on a request."""
    headers = request.headers
    current_value = b''
    for hdr, v in six.iteritems(headers):
      if hdr.lower() == header.lower():
        current_value = v
        del headers[hdr]
        break

    headers[header] = ((current_value + b' ' +
                        value).strip() if current_value else value)

  return _AppendToHeader


def SetHeader(header, value):
  """Sets the given header value in the http request.

  Args:
    header: str, The name of the header to set to.
    value: str, The new value of the header.

  Returns:
    A function that can be used in a Handler.request.
  """
  header, value = _EncodeHeader(header, value)

  def _SetHeader(request):
    """Sets a header on a request."""
    headers = request.headers
    for hdr in six.iterkeys(headers):
      if hdr.lower() == header.lower():
        del headers[hdr]
        break

    headers[header] = value

  return _SetHeader


def AddQueryParam(param, value):
  """Adds the given query parameter to the http request.

  Args:
    param: str, The name of the parameter.
    value: str, The value of the parameter.

  Returns:
    A function that can be used in a Handler.request.
  """

  def _AddQueryParam(request):
    """Sets a query parameter on a request."""
    url_parts = urllib.parse.urlsplit(request.uri)
    query_params = urllib.parse.parse_qs(url_parts.query)
    query_params[param] = value
    # Need to do this to convert a SplitResult into a list so it can be
    # modified.
    url_parts = list(url_parts)
    # pylint:disable=redundant-keyword-arg, this is valid syntax for this lib
    url_parts[3] = urllib.parse.urlencode(query_params, doseq=True)

    # pylint:disable=too-many-function-args, This is just bogus.
    new_url = urllib.parse.urlunsplit(url_parts)
    request.uri = new_url

  return _AddQueryParam


def LogRequest(redact_token=True, redact_request_body_reason=None):
  """Logs the contents of the http request.

  Args:
    redact_token: bool, True to redact auth tokens.
    redact_request_body_reason: str, the reason why the request body must be
        redacted if --log-http is used. If None, the body is not redacted.

  Returns:
    A function that can be used in a Handler.request.
  """

  def _LogRequest(request):
    """Logs a request."""
    uri = request.uri
    method = request.method
    headers = request.headers
    body = request.body or ''

    # If set, these prevent the printing of the http body and replace it with
    # the reason the body is not being printed.
    redact_req_body_reason = None
    redact_resp_body_reason = None

    if redact_token and IsTokenUri(uri):
      redact_req_body_reason = (
          'Contains oauth token. Set log_http_redact_token property to false '
          'to print the body of this request.')
      redact_resp_body_reason = (
          'Contains oauth token. Set log_http_redact_token property to false '
          'to print the body of this response.')
    elif redact_request_body_reason is not None:
      redact_req_body_reason = redact_request_body_reason

    log.status.Print('=======================')
    log.status.Print('==== request start ====')
    log.status.Print('uri: {uri}'.format(uri=uri))
    log.status.Print('method: {method}'.format(method=method))
    log.status.Print('== headers start ==')
    for h, v in sorted(six.iteritems(headers)):
      if redact_token and h.lower() in (b'authorization',
                                        b'x-goog-iam-authorization-token'):
        v = '--- Token Redacted ---'
      log.status.Print('{0}: {1}'.format(h, v))
    log.status.Print('== headers end ==')
    log.status.Print('== body start ==')
    if redact_req_body_reason is None:
      log.status.Print(body)
    else:
      log.status.Print('Body redacted: {}'.format(redact_req_body_reason))
    log.status.Print('== body end ==')
    log.status.Print('==== request end ====')

    return {
        'start_time': time.time(),
        'redact_resp_body_reason': redact_resp_body_reason,
    }

  return _LogRequest


def LogRequestDryRun():
  """Dry run the http request.

  Returns:
    A function that can be used in a Handler.request.
  """

  def _LogRequest(request):
    """Blocks a dry-run request."""
    raise exceptions.DryRunError(request)

  return _LogRequest


def LogResponse(streaming_response_body=False):
  """Logs the contents of the http response.

  Args:
    streaming_response_body: bool, True indicates that the response body will be
      a streaming body.

  Returns:
    A function that can be used in a Handler.response.
  """

  def _LogResponse(response, data):
    """Logs a response."""
    redact_resp_body_reason = data['redact_resp_body_reason']
    time_taken = time.time() - data['start_time']
    log.status.Print('---- response start ----')
    log.status.Print('status: {0}'.format(response.status_code))
    log.status.Print('-- headers start --')
    for h, v in sorted(six.iteritems(response.headers)):
      log.status.Print('{0}: {1}'.format(h, v))
    log.status.Print('-- headers end --')
    log.status.Print('-- body start --')
    if streaming_response_body:
      log.status.Print('<streaming body>')
    elif redact_resp_body_reason is None:
      log.status.Print(response.body)
    else:
      log.status.Print('Body redacted: {}'.format(redact_resp_body_reason))
    log.status.Print('-- body end --')
    log.status.Print(
        'total round trip time (request+response): {0:.3f} secs'.format(
            time_taken))
    log.status.Print('---- response end ----')
    log.status.Print('----------------------')

  return _LogResponse


def RecordStartTime():
  """Records the time at which the request was started.

  Returns:
    A function that can be used in a Handler.request.
  """

  def _RecordStartTime(request):
    """Records the start time of a request."""
    del request  # Unused.
    return {'start_time': time.time()}

  return _RecordStartTime


def ReportDuration():
  """Reports the duration of response to the metrics module.

  Returns:
    A function that can be used in a Handler.response.
  """

  def _ReportDuration(response, data):
    """Records the duration of a request."""
    del response  # Unused.
    duration = time.time() - data['start_time']
    metrics.RPCDuration(duration)

  return _ReportDuration


def GetAndCacheArchitecture(user_platform):
  """Get and cache architecture of client machine.

  For M1 Macs running x86_64 Python using Rosetta, user_platform.architecture
  (from platform.machine()) returns x86_64. We can use
  IsActuallyM1ArmArchitecture() to determine the underlying hardware; however,
  it requires a system call that might take ~5ms.
  To mitigate this, we will persist this value as an internal property with
  INSTALLATION scope.

  Args:
    user_platform: platforms.Platform.Current()

  Returns:
    client machine architecture
  """

  active_config_store = config.GetConfigStore()
  try:
    cached_arch = active_config_store.Get('client_arch')
  except Exception:  # pylint: disable=broad-except
    cached_arch = None
  if active_config_store and cached_arch:
    return cached_arch

  # Determine if this is an M1 Mac Python using x86_64 emulation.
  if (user_platform.operating_system == platforms.OperatingSystem.MACOSX and
      user_platform.architecture == platforms.Architecture.x86_64 and
      platforms.Platform.IsActuallyM1ArmArchitecture()):
    arch = '{}_{}'.format(
        platforms.Architecture.x86_64, platforms.Architecture.arm)
  else:
    arch = str(user_platform.architecture)

  if active_config_store:
    active_config_store.Set('client_arch', arch)
  return arch


def MakeUserAgentString(cmd_path=None):
  """Return a user-agent string for this request.

  Contains 'gcloud' in addition to several other product IDs used for tracing in
  metrics reporting.

  Args:
    cmd_path: str representing the current command for tracing.

  Returns:
    str, User Agent string.
  """
  user_platform = platforms.Platform.Current()
  architecture = GetAndCacheArchitecture(user_platform)

  return (
      'gcloud/{version}'
      ' command/{cmd}'
      ' invocation-id/{inv_id}'
      ' environment/{environment}'
      ' environment-version/{env_version}'
      ' client-os/{os}'
      ' client-os-ver/{os_version}'
      ' client-pltf-arch/{architecture}'
      ' interactive/{is_interactive}'
      ' from-script/{from_script}'
      ' python/{py_version}'
      ' term/{term}'
      ' {gcloud_mcp_metrics}'
      ' {ua_fragment}'
  ).format(
      version=config.CLOUD_SDK_VERSION.replace(' ', '_'),
      cmd=(cmd_path or properties.VALUES.metrics.command_name.Get()),
      inv_id=INVOCATION_ID,
      environment=properties.GetMetricsEnvironment(),
      env_version=properties.VALUES.metrics.environment_version.Get(),
      os=user_platform.operating_system,
      os_version=user_platform.operating_system.clean_version
      if user_platform.operating_system
      else None,
      architecture=architecture,
      is_interactive=console_io.IsInteractive(error=True, heuristic=True),
      py_version=platform.python_version(),
      ua_fragment=user_platform.UserAgentFragment(),
      from_script=console_io.IsRunFromShellScript(),
      term=console_attr.GetConsoleAttr().GetTermIdentifier(),
      gcloud_mcp_metrics=GetValidMCPMetricsString(),
  )


def GetDefaultTimeout():
  return properties.VALUES.core.http_timeout.GetInt() or 300


def IsTokenUri(uri):
  """Determine if the given URI is for requesting an access token."""
  if uri in TOKEN_URIS:
    return True

  metadata_regexp = ('(metadata.google.internal|169.254.169.254)/'
                     'computeMetadata/.*?/instance/service-accounts/.*?/token')

  impersonate_service_account = ('iamcredentials.googleapis.com/v.*?/projects/'
                                 '-/serviceAccounts/.*?:generateAccessToken')

  if re.search(metadata_regexp, uri) is not None:
    return True

  if re.search(impersonate_service_account, uri) is not None:
    return True

  return False


def GetValidMCPMetricsString():
  """Returns the valid MCP metrics string if it exists, otherwise returns empty string."""
  if (
      'GCLOUD_MCP_METRICS' in os.environ
  ):
    metrics_string = os.environ['GCLOUD_MCP_METRICS']
    if ValidateMCPMetricsFormat(metrics_string):
      return metrics_string
  return ''


def ValidateMCPMetricsFormat(metrics_string):
  """Checks if a string follows the MCP metrics format.

  Args:
    metrics_string: The string to validate.

  Returns:
    True if the string matches the format, False otherwise.
  """
  pattern = (
      r'goog-mcp/agent/[^/]+/agent-version/[^/]+/mcp-server/[^/]+/'
      r'mcp-version/[^/]+/mcp-tool/[^/]+'
  )
  return re.fullmatch(pattern, metrics_string) is not None