HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/core/credentials/introspect.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides utilities for token introspection."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import functools
import json
import logging

from google.auth import exceptions as google_auth_exceptions
from google.auth import external_account
from google.oauth2 import utils as oauth2_utils
from googlecloudsdk.core import config
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import properties
from six.moves import http_client
from six.moves import urllib


# Token type URN used as the default `token_type_hint` by
# IntrospectionClient.introspect().
_ACCESS_TOKEN_TYPE = 'urn:ietf:params:oauth:token-type:access_token'
# Base headers for the introspection POST; the request body is URL-encoded.
# Callers copy this dict before mutating it.
_URLENCODED_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
# Default introspection endpoint for external account credentials, used when
# neither the gcloud property nor the credential config supplies an override.
_EXTERNAL_ACCT_TOKEN_INTROSPECT_ENDPOINT = (
    'https://sts.googleapis.com/v1/introspect'
)


class Error(exceptions.Error):
  """Base class for the exceptions defined in this module.

  Catch this to handle both InactiveCredentialsError and
  TokenIntrospectionError.
  """


class InactiveCredentialsError(Error):
  """Raised when the provided credentials are invalid or expired.

  IntrospectionClient.introspect() raises this when the introspection
  response does not carry a truthy `active` field; the message is the raw
  response body.
  """


class TokenIntrospectionError(Error):
  """Raised when an error is encountered while calling token introspection.

  IntrospectionClient.introspect() raises this when the endpoint answers
  with a non-200 HTTP status; the message is the raw response body.
  """


class IntrospectionClient(oauth2_utils.OAuthClientAuthHandler):
  """Implements the OAuth 2.0 token introspection spec.

  This is based on https://tools.ietf.org/html/rfc7662.
  The implementation supports 3 types of client authentication when calling
  the endpoints: no authentication, basic header authentication and POST body
  authentication.
  """

  def __init__(self, token_introspect_endpoint, client_authentication=None):
    """Initializes an OAuth introspection client instance.

    Args:
      token_introspect_endpoint (str): The token introspection endpoint.
      client_authentication (Optional[oauth2_utils.ClientAuthentication]): The
        optional OAuth client authentication credentials if available.
    """
    super(IntrospectionClient, self).__init__(client_authentication)
    self._token_introspect_endpoint = token_introspect_endpoint

  def introspect(self, request, token, token_type_hint=_ACCESS_TOKEN_TYPE):
    """Returns the meta-information associated with an OAuth token.

    Sends a form-encoded POST containing the token and its type hint to the
    configured introspection endpoint and parses the JSON response.

    Args:
      request (google.auth.transport.Request): A callable that makes HTTP
        requests.
      token (str): The OAuth token whose meta-information are to be returned.
      token_type_hint (Optional[str]): The optional token type. The default is
        access_token.

    Returns:
      Mapping: The active token meta-information returned by the introspection
        endpoint.

    Raises:
      InactiveCredentialsError: If the credentials are invalid or expired,
        i.e. the response does not carry a truthy `active` field.
      TokenIntrospectionError: If an error is encountered while calling the
        token introspection endpoint: a non-200 status, a body that is not
        valid JSON, or a JSON body that is not an object.
    """
    # Copy so that client authentication never mutates the shared constant.
    headers = _URLENCODED_HEADERS.copy()
    request_body = {
        'token': token,
        'token_type_hint': token_type_hint,
    }
    # Apply OAuth client authentication; this may add to the headers or to
    # the request body depending on the configured authentication type.
    self.apply_client_authentication_options(headers, request_body)

    # Execute request.
    response = request(
        url=self._token_introspect_endpoint,
        method='POST',
        headers=headers,
        body=urllib.parse.urlencode(request_body).encode('utf-8'),
    )

    # The transport may hand back bytes or text; normalize to text.
    response_body = (
        response.data.decode('utf-8')
        if hasattr(response.data, 'decode')
        else response.data
    )

    # If non-200 response received, translate to TokenIntrospectionError.
    if response.status != http_client.OK:
      raise TokenIntrospectionError(response_body)

    # A 200 with an unparsable body is still an endpoint failure. Surface it
    # as the documented module error instead of leaking a raw ValueError
    # (json.JSONDecodeError subclasses ValueError) or TypeError.
    try:
      response_data = json.loads(response_body)
    except (TypeError, ValueError):
      raise TokenIntrospectionError(response_body)

    # Valid JSON that is not an object has no `active` field to inspect and
    # would otherwise fail with AttributeError on `.get`.
    if not isinstance(response_data, dict):
      raise TokenIntrospectionError(response_body)

    if not response_data.get('active'):
      raise InactiveCredentialsError(response_body)
    return response_data


def GetExternalAccountId(creds):
  """Looks up the account identifier behind external account credentials.

  The credentials' access token is introspected using basic client
  authentication, and the `username` field of the introspection response is
  returned as the account ID. This only works with external account
  credentials that have not been impersonated.

  Args:
    creds (google.auth.external_account.Credentials): The external account
      credentials whose account ID is to be determined.

  Returns:
    Optional(str): The account ID string if determinable.

  Raises:
    InactiveCredentialsError: If the credentials are invalid or expired.
    TokenIntrospectionError: If an error is encountered while calling the
      token introspection endpoint.
  """
  # Imported lazily inside the function rather than at module top level.
  # pylint: disable=g-import-not-at-top
  from googlecloudsdk.core import requests as core_requests
  # pylint: enable=g-import-not-at-top

  # Introspection is always called with basic client authentication.
  basic_client_auth = oauth2_utils.ClientAuthentication(
      oauth2_utils.ClientAuthType.basic,
      config.CLOUDSDK_CLIENT_ID,
      config.CLOUDSDK_CLIENT_NOTSOSECRET,
  )

  # Endpoint precedence: the gcloud property override first, then the URL
  # from the credential config, and finally the built-in default.
  endpoint_from_property = (
      properties.VALUES.auth.token_introspection_endpoint.Get()
  )
  endpoint_from_creds = creds.token_info_url
  introspect_endpoint = (
      endpoint_from_property
      or endpoint_from_creds
      or _EXTERNAL_ACCT_TOKEN_INTROSPECT_ENDPOINT
  )

  introspection_client = IntrospectionClient(
      token_introspect_endpoint=introspect_endpoint,
      client_authentication=basic_client_auth,
  )

  # Start from a plain request; for X.509 credentials that require mTLS the
  # client certificate and key are injected below.
  http_request = core_requests.GoogleAuthRequest()

  # Not every external_account.Credentials subclass supports mTLS and there
  # is no public interface to ask, so probe for the private hook instead.
  needs_mtls = (
      isinstance(creds, external_account.Credentials)
      and callable(getattr(creds, '_mtls_required', None))
      and creds._mtls_required()  # pylint: disable=protected-access
  )
  if needs_mtls:
    try:
      cert_path, key_path = creds._get_mtls_cert_and_key_paths()  # pylint: disable=protected-access
    except (
        AttributeError,
        ValueError,
        google_auth_exceptions.GoogleAuthError,
        IOError,
        OSError,
    ) as e:
      # mTLS is required but the certificate and key paths are unavailable:
      # record why and carry on with basic auth only.
      logging.debug('Could not get mTLS certificate and key paths: %s', e)
    else:
      http_request = functools.partial(
          http_request, cert=(cert_path, key_path)
      )

  # Make sure there is a usable access token before introspecting it.
  if not creds.valid:
    creds.refresh(http_request)

  token_metadata = introspection_client.introspect(http_request, creds.token)
  # User friendly identifier is stored in username.
  return token_metadata.get('username')