HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/core/credentials/requests.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module to get a credentialed http object for making API calls."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from google.auth import external_account as google_auth_external_account
from google.auth.transport import requests as google_auth_requests
from googlecloudsdk.calliope import base
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import requests
from googlecloudsdk.core import transport as core_transport
from googlecloudsdk.core.credentials import store
from googlecloudsdk.core.credentials import transport

REFRESH_STATUS_CODES = [401]
MAX_REFRESH_ATTEMPTS = 1


class Error(exceptions.Error):
  """Exceptions for this module."""


def GetSession(timeout='unset',
               ca_certs=None,
               enable_resource_quota=True,
               allow_account_impersonation=True,
               session=None,
               streaming_response_body=False,
               redact_request_body_reason=None):
  """Get requests.Session object for working with the Google API.

  Args:
    timeout: double, The timeout in seconds to pass to httplib2.  This is the
        socket level timeout.  If timeout is None, timeout is infinite.  If
        default argument 'unset' is given, a sensible default is selected.
    ca_certs: str, absolute filename of a ca_certs file that overrides the
        default
    enable_resource_quota: bool, By default, we are going to tell APIs to use
        the quota of the project being operated on. For some APIs we want to use
        gcloud's quota, so you can explicitly disable that behavior by passing
        False here.
    allow_account_impersonation: bool, True to allow use of impersonated service
        account credentials for calls made with this client. If False, the
        active user credentials will always be used.
    session: requests.Session instance. Otherwise, a new requests.Session will
        be initialized.
    streaming_response_body: bool, True indicates that the response body will
        be a streaming body.
    redact_request_body_reason: str, the reason why the request body must be
        redacted if --log-http is used. If None, the body is not redacted.

  Returns:
    1. A regular requests.Session object if no credentials are available;
    2. Or an authorized requests.Session object authorized by google-auth
       credentials.

  Raises:
    creds_exceptions.Error: If an error loading the credentials occurs.
  """
  session = requests.GetSession(
      timeout=timeout,
      ca_certs=ca_certs,
      session=session,
      streaming_response_body=streaming_response_body,
      redact_request_body_reason=redact_request_body_reason,
  )
  request_wrapper = RequestWrapper()
  use_google_auth = base.UseGoogleAuth()
  credentials = store.LoadIfEnabled(
      allow_account_impersonation, use_google_auth
  )
  session = request_wrapper.WrapQuota(
      session,
      enable_resource_quota,
      allow_account_impersonation,
      True,
      credentials=credentials,
  )
  session = request_wrapper.WrapCredentials(
      session, allow_account_impersonation, credentials=credentials
  )

  return session


class RequestWrapper(
    transport.CredentialWrappingMixin,
    transport.QuotaHandlerMixin,
    requests.RequestWrapper,
):
  """Class for wrapping requests.Session requests."""

  def AuthorizeClient(self, http_client, creds):
    """Returns an http_client authorized with the given credentials."""
    orig_request = http_client.request
    credential_refresh_state = {'attempt': 0}
    auth_request = google_auth_requests.Request(http_client)

    def WrappedRequest(method, url, data=None, headers=None, **kwargs):
      wrapped_request = http_client.request
      http_client.request = orig_request

      creds.before_request(auth_request, method, url, headers)

      http_client.request = wrapped_request
      response = orig_request(
          method, url, data=data, headers=headers or {}, **kwargs)

      if (response.status_code in REFRESH_STATUS_CODES and
          not (isinstance(creds, google_auth_external_account.Credentials) and
               creds.valid) and
          credential_refresh_state['attempt'] < MAX_REFRESH_ATTEMPTS):
        credential_refresh_state['attempt'] += 1
        creds.refresh(requests.GoogleAuthRequest())
        response = orig_request(
            method, url, data=data, headers=headers or {}, **kwargs)

      return response

    http_client.request = WrappedRequest
    return http_client

  def WrapQuota(
      self,
      http_client,
      enable_resource_quota,
      allow_account_impersonation,
      use_google_auth,
      credentials=None,
  ):
    """Returns an http_client with quota project handling."""
    quota_project = self.QuotaProject(
        enable_resource_quota,
        allow_account_impersonation,
        use_google_auth,
        credentials=credentials,
    )
    if not quota_project:
      return http_client
    orig_request = http_client.request
    wrapped_request = self.QuotaWrappedRequest(http_client, quota_project)

    def RequestWithRetry(*args, **kwargs):
      """Retries the request after removing the quota project header.

      Try the request with the X-Goog-User-Project header. If the account does
      not have the permission to expense the quota of the user project in the
      header, remove the header and retry.

      Args:
        *args: *args to send to requests.Session.request method.
        **kwargs: **kwargs to send to requests.Session.request method.

      Returns:
        Response from requests.Session.request.
      """
      response = wrapped_request(*args, **kwargs)
      if response.status_code != 403:
        return response
      old_encoding = response.encoding
      response.encoding = response.encoding or core_transport.ENCODING
      try:
        err_details = response.json()['error']['details']
      except (KeyError, ValueError):
        return response
      finally:
        response.encoding = old_encoding
      for err_detail in err_details:
        if (err_detail.get('@type')
            == 'type.googleapis.com/google.rpc.ErrorInfo' and
            err_detail.get('reason') == transport.USER_PROJECT_ERROR_REASON and
            err_detail.get('domain') == transport.USER_PROJECT_ERROR_DOMAIN):
          return orig_request(*args, **kwargs)
      return response

    if base.UserProjectQuotaWithFallbackEnabled():
      http_client.request = RequestWithRetry
    else:
      http_client.request = wrapped_request
    return http_client