HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/third_party/oauthlib/oauth2/rfc6749/endpoints/metadata.py
# -*- coding: utf-8 -*-
"""oauthlib.oauth2.rfc6749.endpoint.metadata ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

An implementation of the `OAuth 2.0 Authorization Server Metadata`.

.. _`OAuth 2.0 Authorization Server Metadata`:
https://tools.ietf.org/html/rfc8414
"""
from __future__ import absolute_import, unicode_literals

import copy
import json
import logging

from ....common import unicode_type
from .base import BaseEndpoint, catch_errors_and_unavailability
from .authorization import AuthorizationEndpoint
from .introspect import IntrospectEndpoint
from .token import TokenEndpoint
from .revocation import RevocationEndpoint
from .. import grant_types

log = logging.getLogger(__name__)


class MetadataEndpoint(BaseEndpoint):
  """OAuth2.0 Authorization Server Metadata endpoint.

   This specification generalizes the metadata format defined by
   `OpenID Connect Discovery 1.0` in a way that is compatible
   with OpenID Connect Discovery while being applicable to a wider set
   of OAuth 2.0 use cases.  This is intentionally parallel to the way
   that OAuth 2.0 Dynamic Client Registration Protocol [`RFC7591`_]
   generalized the dynamic client registration mechanisms defined by
   OpenID Connect Dynamic Client Registration 1.0
   in a way that is compatible with it.

   .. _`OpenID Connect Discovery 1.0`:
   https://openid.net/specs/openid-connect-discovery-1_0.html
   .. _`RFC7591`: https://tools.ietf.org/html/rfc7591
   """

  def __init__(self, endpoints, claims={}, raise_errors=True):
    assert isinstance(claims, dict)
    for endpoint in endpoints:
      assert isinstance(endpoint, BaseEndpoint)

    BaseEndpoint.__init__(self)
    self.raise_errors = raise_errors
    self.endpoints = endpoints
    self.initial_claims = claims
    self.claims = self.validate_metadata_server()

  @catch_errors_and_unavailability
  def create_metadata_response(self,
                               uri,
                               http_method="GET",
                               body=None,
                               headers=None):
    """Create metadata response"""
    headers = {"Content-Type": "application/json"}
    return headers, json.dumps(self.claims), 200

  def validate_metadata(self,
                        array,
                        key,
                        is_required=False,
                        is_list=False,
                        is_url=False,
                        is_issuer=False):
    if not self.raise_errors:
      return

    if key not in array:
      if is_required:
        raise ValueError("key {} is a mandatory metadata.".format(key))

    elif is_issuer:
      if not array[key].startswith("https"):
        raise ValueError("key {}: {} must be an HTTPS URL".format(
            key, array[key]))
      if "?" in array[key] or "&" in array[key] or "#" in array[key]:
        raise ValueError(
            "key {}: {} must not contain query or fragment components".format(
                key, array[key]))

    elif is_url:
      if not array[key].startswith("http"):
        raise ValueError("key {}: {} must be an URL".format(key, array[key]))

    elif is_list:
      if not isinstance(array[key], list):
        raise ValueError("key {}: {} must be an Array".format(key, array[key]))
      for elem in array[key]:
        if not isinstance(elem, unicode_type):
          raise ValueError(
              "array {}: {} must contains only string (not {})".format(
                  key, array[key], elem))

  def validate_metadata_token(self, claims, endpoint):
    """
        If the token endpoint is used in the grant type, the value of this
        parameter MUST be the same as the value of the "grant_type"
        parameter passed to the token endpoint defined in the grant type
        definition.
        """
    self._grant_types.extend(endpoint._grant_types.keys())
    claims.setdefault("token_endpoint_auth_methods_supported",
                      ["client_secret_post", "client_secret_basic"])

    self.validate_metadata(
        claims, "token_endpoint_auth_methods_supported", is_list=True)
    self.validate_metadata(
        claims,
        "token_endpoint_auth_signing_alg_values_supported",
        is_list=True)
    self.validate_metadata(
        claims, "token_endpoint", is_required=True, is_url=True)

  def validate_metadata_authorization(self, claims, endpoint):
    claims.setdefault(
        "response_types_supported",
        list(filter(lambda x: x != "none", endpoint._response_types.keys())))
    claims.setdefault("response_modes_supported", ["query", "fragment"])

    # The OAuth2.0 Implicit flow is defined as a "grant type" but it is not
    # using the "token" endpoint, as such, we have to add it explicitly to
    # the list of "grant_types_supported" when enabled.
    if "token" in claims["response_types_supported"]:
      self._grant_types.append("implicit")

    self.validate_metadata(
        claims, "response_types_supported", is_required=True, is_list=True)
    self.validate_metadata(claims, "response_modes_supported", is_list=True)
    if "code" in claims["response_types_supported"]:
      code_grant = endpoint._response_types["code"]
      if not isinstance(code_grant,
                        grant_types.AuthorizationCodeGrant) and hasattr(
                            code_grant, "default_grant"):
        code_grant = code_grant.default_grant

      claims.setdefault("code_challenge_methods_supported",
                        list(code_grant._code_challenge_methods.keys()))
      self.validate_metadata(
          claims, "code_challenge_methods_supported", is_list=True)
    self.validate_metadata(
        claims, "authorization_endpoint", is_required=True, is_url=True)

  def validate_metadata_revocation(self, claims, endpoint):
    claims.setdefault("revocation_endpoint_auth_methods_supported",
                      ["client_secret_post", "client_secret_basic"])

    self.validate_metadata(
        claims, "revocation_endpoint_auth_methods_supported", is_list=True)
    self.validate_metadata(
        claims,
        "revocation_endpoint_auth_signing_alg_values_supported",
        is_list=True)
    self.validate_metadata(
        claims, "revocation_endpoint", is_required=True, is_url=True)

  def validate_metadata_introspection(self, claims, endpoint):
    claims.setdefault("introspection_endpoint_auth_methods_supported",
                      ["client_secret_post", "client_secret_basic"])

    self.validate_metadata(
        claims, "introspection_endpoint_auth_methods_supported", is_list=True)
    self.validate_metadata(
        claims,
        "introspection_endpoint_auth_signing_alg_values_supported",
        is_list=True)
    self.validate_metadata(
        claims, "introspection_endpoint", is_required=True, is_url=True)

  def validate_metadata_server(self):
    """
        Authorization servers can have metadata describing their
        configuration.  The following authorization server metadata values
        are used by this specification. More details can be found in
        `RFC8414 section 2`_ :

       issuer
          REQUIRED

       authorization_endpoint
          URL of the authorization server's authorization endpoint
          [`RFC6749#Authorization`_].  This is REQUIRED unless no grant types
          are supported
          that use the authorization endpoint.

       token_endpoint
          URL of the authorization server's token endpoint [`RFC6749#Token`_].
          This
          is REQUIRED unless only the implicit grant type is supported.

       scopes_supported
          RECOMMENDED.

       response_types_supported
          REQUIRED.

       * Other OPTIONAL fields:
       jwks_uri
       registration_endpoint
       response_modes_supported

       grant_types_supported
          OPTIONAL.  JSON array containing a list of the OAuth 2.0 grant
          type values that this authorization server supports.  The array
          values used are the same as those used with the "grant_types"
          parameter defined by "OAuth 2.0 Dynamic Client Registration
          Protocol" [`RFC7591`_].  If omitted, the default value is
          "["authorization_code", "implicit"]".

       token_endpoint_auth_methods_supported

       token_endpoint_auth_signing_alg_values_supported

       service_documentation

       ui_locales_supported

       op_policy_uri

       op_tos_uri

       revocation_endpoint

       revocation_endpoint_auth_methods_supported

       revocation_endpoint_auth_signing_alg_values_supported

       introspection_endpoint

       introspection_endpoint_auth_methods_supported

       introspection_endpoint_auth_signing_alg_values_supported

       code_challenge_methods_supported

       Additional authorization server metadata parameters MAY also be used.
       Some are defined by other specifications, such as OpenID Connect
       Discovery 1.0 [`OpenID.Discovery`_].

        .. _`RFC8414 section 2`: https://tools.ietf.org/html/rfc8414#section-2
        .. _`RFC6749#Authorization`:
        https://tools.ietf.org/html/rfc6749#section-3.1
        .. _`RFC6749#Token`: https://tools.ietf.org/html/rfc6749#section-3.2
        .. _`RFC7591`: https://tools.ietf.org/html/rfc7591
        .. _`OpenID.Discovery`:
        https://openid.net/specs/openid-connect-discovery-1_0.html
        """
    claims = copy.deepcopy(self.initial_claims)
    self.validate_metadata(claims, "issuer", is_required=True, is_issuer=True)
    self.validate_metadata(claims, "jwks_uri", is_url=True)
    self.validate_metadata(claims, "scopes_supported", is_list=True)
    self.validate_metadata(claims, "service_documentation", is_url=True)
    self.validate_metadata(claims, "ui_locales_supported", is_list=True)
    self.validate_metadata(claims, "op_policy_uri", is_url=True)
    self.validate_metadata(claims, "op_tos_uri", is_url=True)

    self._grant_types = []
    for endpoint in self.endpoints:
      if isinstance(endpoint, TokenEndpoint):
        self.validate_metadata_token(claims, endpoint)
      if isinstance(endpoint, AuthorizationEndpoint):
        self.validate_metadata_authorization(claims, endpoint)
      if isinstance(endpoint, RevocationEndpoint):
        self.validate_metadata_revocation(claims, endpoint)
      if isinstance(endpoint, IntrospectEndpoint):
        self.validate_metadata_introspection(claims, endpoint)

    # "grant_types_supported" is a combination of all OAuth2 grant types
    # allowed in the current provider implementation.
    claims.setdefault("grant_types_supported", self._grant_types)
    self.validate_metadata(claims, "grant_types_supported", is_list=True)
    return claims