HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/util/apis_internal.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for obtaining API clients and messages.

This should only be called by api_lib.util.apis, core.resources, gcloud meta
commands, and module tests.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.util import apis_util
from googlecloudsdk.api_lib.util import resource as resource_util
from googlecloudsdk.core import properties
from googlecloudsdk.core import transport
from googlecloudsdk.generated_clients.apis import apis_map
import six
from six.moves.urllib.parse import urljoin
from six.moves.urllib.parse import urlparse


def _GetApiNameAndAlias(api_name):
  # pylint:disable=protected-access
  return (apis_util._API_NAME_ALIASES.get(api_name, api_name), api_name)


def _GetDefaultVersion(api_name):
  api_name, _ = _GetApiNameAndAlias(api_name)
  api_vers = apis_map.MAP.get(api_name, {})
  for ver, api_def in six.iteritems(api_vers):
    if api_def.default_version:
      return ver
  return None


def _GetApiNames():
  """Returns list of avaiblable apis, ignoring the version."""
  return sorted(apis_map.MAP.keys())


def _GetVersions(api_name):
  """Return available versions for given api.

  Args:
    api_name: str, The API name (or the command surface name, if different).

  Raises:
    apis_util.UnknownAPIError: If api_name does not exist in the APIs map.

  Returns:
    list, of version names.
  """
  api_name, _ = _GetApiNameAndAlias(api_name)
  version_map = apis_map.MAP.get(api_name, None)
  if version_map is None:
    raise apis_util.UnknownAPIError(api_name)
  return list(version_map.keys())


def GetApiDef(api_name, api_version):
  """Returns the APIDef for the specified API and version.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.

  Raises:
    apis_util.UnknownAPIError: If api_name does not exist in the APIs map.
    apis_util.UnknownVersionError: If api_version does not exist for given
      api_name in the APIs map.

  Returns:
    APIDef, The APIDef for the specified API and version.
  """
  api_name, api_name_alias = _GetApiNameAndAlias(api_name)
  if api_name not in apis_map.MAP:
    raise apis_util.UnknownAPIError(api_name)

  version_overrides = properties.VALUES.api_client_overrides.AllValues()

  # First attempt to get api specific override, then full surface override.
  version_override = version_overrides.get('{}/{}'.format(
      api_name, api_version))
  if not version_override:
    version_override = version_overrides.get(api_name_alias, None)

  api_version = version_override or api_version

  api_versions = apis_map.MAP[api_name]
  if api_version is None or api_version not in api_versions:
    raise apis_util.UnknownVersionError(api_name, api_version)
  else:
    api_def = api_versions[api_version]

  return api_def


def _GetClientClass(api_name, api_version):
  """Returns the client class for the API specified in the args.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.

  Returns:
    base_api.BaseApiClient, Client class for the specified API.
  """
  api_def = GetApiDef(api_name, api_version)
  return _GetClientClassFromDef(api_def)


def _GetClientClassFromDef(api_def):
  """Returns the apitools client class for the API definition specified in args.

  Args:
    api_def: apis_map.APIDef, The definition of the API.

  Returns:
    base_api.BaseApiClient, Client class for the specified API.
  """
  client_full_classpath = api_def.apitools.client_full_classpath
  module_path, client_class_name = client_full_classpath.rsplit('.', 1)
  module_obj = __import__(module_path, fromlist=[client_class_name])
  return getattr(module_obj, client_class_name)


def _GetClientInstance(api_name,
                       api_version,
                       no_http=False,
                       http_client=None,
                       check_response_func=None,
                       http_timeout_sec=None):
  """Returns an instance of the API client specified in the args.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.
    no_http: bool, True to not create an http object for this client.
    http_client: bring your own http client to use. Incompatible with
      no_http=True.
    check_response_func: error handling callback to give to apitools.
    http_timeout_sec: int, seconds of http timeout to set, defaults if None.

  Returns:
    base_api.BaseApiClient, An instance of the specified API client.
  """

  # pylint: disable=g-import-not-at-top
  if no_http:
    assert http_client is None
  elif http_client is None:
    # Normal gcloud authentication
    # Import http only when needed, as it depends on credential infrastructure
    # which is not needed in all cases.
    from googlecloudsdk.core.credentials import transports
    http_client = transports.GetApitoolsTransport(
        response_encoding=transport.ENCODING,
        timeout=http_timeout_sec if http_timeout_sec else 'unset')

  client_class = _GetClientClass(api_name, api_version)
  client_instance = client_class(
      url=_GetEffectiveApiEndpoint(api_name, api_version, client_class),
      get_credentials=False,
      http=http_client)
  if check_response_func is not None:
    client_instance.check_response_func = check_response_func
  api_key = properties.VALUES.core.api_key.Get()
  if api_key:
    client_instance.AddGlobalParam('key', api_key)
    header = 'X-Google-Project-Override'
    client_instance.additional_http_headers[header] = 'apikey'
  return client_instance


def _GetGapicClientClass(api_name,
                         api_version,
                         transport_choice=apis_util.GapicTransport.GRPC):
  """Returns the GAPIC client class for the API def specified by the args.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.
    transport_choice: apis_util.GapicTransport, The transport to be used by the
      client.
  """
  api_def = GetApiDef(api_name, api_version)
  if transport_choice == apis_util.GapicTransport.GRPC_ASYNCIO:
    client_full_classpath = api_def.gapic.async_client_full_classpath
  elif transport_choice == apis_util.GapicTransport.REST:
    client_full_classpath = api_def.gapic.rest_client_full_classpath
  else:
    client_full_classpath = api_def.gapic.client_full_classpath

  module_path, client_class_name = client_full_classpath.rsplit('.', 1)
  module_obj = __import__(module_path, fromlist=[client_class_name])
  return getattr(module_obj, client_class_name)


def _GetGapicClientInstance(
    api_name,
    api_version,
    credentials,
    address_override_func=None,
    transport_choice=apis_util.GapicTransport.GRPC,
    attempt_direct_path=False,
    redact_request_body_reason=None,
):
  """Returns an instance of the GAPIC API client specified in the args.

  For apitools API clients, the API endpoint override is something like
  http://compute.googleapis.com/v1/. For GAPIC API clients, the DEFAULT_ENDPOINT
  is something like compute.googleapis.com. To use the same endpoint override
  property for both, we use the netloc of the API endpoint override.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.
    credentials: google.auth.credentials.Credentials, the credentials to use.
    address_override_func: function, function to call to override the client
      host. It takes a single argument which is the original host.
    transport_choice: apis_util.GapicTransport, The transport to be used by the
      client.
    attempt_direct_path: bool, True if we want to attempt direct path gRPC where
      possible.
    redact_request_body_reason: str, the reason why the request body must be
      redacted if --log-http is used. If None, the body is not redacted.

  Returns:
    An instance of the specified GAPIC API client.
  """
  def AddressOverride(address):
    try:
      endpoint_override = properties.VALUES.api_endpoint_overrides.Property(
          api_name).Get()
    except properties.NoSuchPropertyError:
      endpoint_override = None

    if endpoint_override:
      address = urlparse(endpoint_override).netloc

    if address_override_func:
      address = address_override_func(address)

    if endpoint_override is not None:
      return address

    return UniversifyAddress(address)

  client_class = _GetGapicClientClass(
      api_name, api_version, transport_choice=transport_choice)

  return client_class(
      credentials,
      address_override_func=AddressOverride,
      mtls_enabled=_MtlsEnabled(api_name, api_version),
      attempt_direct_path=attempt_direct_path,
      redact_request_body_reason=redact_request_body_reason,
  )


def UniversifyAddress(address):
  """Update a URL based on the current universe domain."""
  universe_domain_property = properties.VALUES.core.universe_domain
  universe_domain = universe_domain_property.Get()
  if (address is not None and
      universe_domain_property.default != universe_domain):
    address = address.replace(universe_domain_property.default,
                              universe_domain, 1)
  return address


def _GetMtlsEndpoint(api_name, api_version, client_class=None):
  """Returns mtls endpoint."""
  api_def = GetApiDef(api_name, api_version)
  if api_def.apitools:
    client_class = client_class or _GetClientClass(api_name, api_version)
  else:
    client_class = client_class or _GetGapicClientClass(api_name, api_version)
  return api_def.mtls_endpoint_override or client_class.MTLS_BASE_URL


def _MtlsEnabled(api_name, api_version):
  """Checks if the API of the given version should use mTLS.

  If context_aware/always_use_mtls_endpoint is True, then mTLS will always be
  used.

  If context_aware/use_client_certificate is True, then mTLS will be used only
  if the API version is in the mTLS allowlist.

  gcloud maintains a client-side allowlist for the mTLS feature
  (go/gcloud-rollout-mtls).

  Args:
    api_name: str, The API name.
    api_version: str, The version of the API.

  Returns:
    True if the given service and version is in the mTLS allowlist.
  """
  if properties.VALUES.context_aware.always_use_mtls_endpoint.GetBool():
    return True

  if not properties.VALUES.context_aware.use_client_certificate.GetBool():
    return False

  api_def = GetApiDef(api_name, api_version)
  return api_def.enable_mtls


def _BuildEndpointOverride(endpoint_override, base_url):
  """Constructs a normalized endpoint URI depending on the client base_url."""
  url_base = urlparse(base_url)
  url_endpoint_override = urlparse(endpoint_override)
  if url_base.path == '/' or url_endpoint_override.path != '/':
    return endpoint_override
  return urljoin(
      '{}://{}'.format(url_endpoint_override.scheme,
                       url_endpoint_override.netloc), url_base.path)


def _GetBaseUrlFromApi(api_name, api_version):
  """Returns base url for given api."""
  if GetApiDef(api_name, api_version).apitools:
    client_class = _GetClientClass(api_name, api_version)
  else:
    client_class = _GetGapicClientClass(api_name, api_version)

  if hasattr(client_class, 'BASE_URL'):
    client_base_url = client_class.BASE_URL
  else:
    try:
      client_base_url = _GetResourceModule(api_name, api_version).BASE_URL
    except AttributeError:
      client_base_url = 'https://{}.googleapis.com/{}'.format(
          api_name, api_version
      )
  return UniversifyAddress(client_base_url)


def _GetEffectiveApiEndpoint(api_name, api_version, client_class=None):
  """Returns effective endpoint for given api."""
  try:
    endpoint_override = properties.VALUES.api_endpoint_overrides.Property(
        api_name).Get()
  except properties.NoSuchPropertyError:
    endpoint_override = None

  api_def = GetApiDef(api_name, api_version)
  if api_def.apitools:
    client_class = client_class or _GetClientClass(api_name, api_version)
  else:
    client_class = client_class or _GetGapicClientClass(api_name, api_version)
  client_base_url = _GetBaseUrlFromApi(api_name, api_version)
  if endpoint_override:
    address = _BuildEndpointOverride(endpoint_override, client_base_url)
  elif _MtlsEnabled(api_name, api_version):
    address = UniversifyAddress(
        _GetMtlsEndpoint(api_name, api_version, client_class)
    )
  else:
    address = client_base_url

  return address


def _GetMessagesModule(api_name, api_version):
  """Returns the messages module for the API specified in the args.

  Args:
    api_name: str, The API name (or the command surface name, if different).
    api_version: str, The version of the API.

  Returns:
    Module containing the definitions of messages for the specified API.
  """
  api_def = GetApiDef(api_name, api_version)
  # fromlist below must not be empty, see:
  # http://stackoverflow.com/questions/2724260/why-does-pythons-import-require-fromlist.
  return __import__(
      api_def.apitools.messages_full_modulepath, fromlist=['something'])


def _GetResourceModule(api_name, api_version):
  """Imports and returns given api resources module."""

  api_def = GetApiDef(api_name, api_version)
  # fromlist below must not be empty, see:
  # http://stackoverflow.com/questions/2724260/why-does-pythons-import-require-fromlist.
  if api_def.apitools:
    return __import__(
        api_def.apitools.class_path + '.' + 'resources', fromlist=['something']
    )
  # ApiDef must be gapic-only:
  return __import__(
      api_def.gapic.class_path + '.' + 'resources', fromlist=['something']
  )


def _GetApiCollections(api_name, api_version):
  """Yields all collections for for given api."""

  try:
    resources_module = _GetResourceModule(api_name, api_version)
  except ImportError:
    pass
  else:
    for collection in resources_module.Collections:
      yield resource_util.CollectionInfo(
          api_name,
          api_version,
          resources_module.BASE_URL,
          resources_module.DOCS_URL,
          collection.collection_name,
          collection.path,
          collection.flat_paths,
          collection.params,
          collection.enable_uri_parsing,
      )