HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/util/exceptions.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module that converts API exceptions to core exceptions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import io
import json
import logging
import string
import urllib.parse

from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.util import resource as resource_util
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.resource import resource_lex
from googlecloudsdk.core.resource import resource_printer
from googlecloudsdk.core.resource import resource_property
from googlecloudsdk.core.util import encoding
import six


# Some formatter characters are special inside {...}. The _Escape / _Expand pair
# escapes special chars inside {...} and ignores them outside.
_ESCAPE = '~'  # '\x01'
_ESCAPED_COLON = 'C'
_ESCAPED_ESCAPE = 'E'
_ESCAPED_LEFT_CURLY = 'L'
_ESCAPED_RIGHT_CURLY = 'R'

# ErrorInfo identifier used for extracting domain based handlers.
ERROR_INFO_SUFFIX = 'google.rpc.ErrorInfo'
LOCALIZED_MESSAGE_SUFFIX = 'google.rpc.LocalizedMessage'
HELP_SUFFIX = 'google.rpc.Help'


def _Escape(s):
  """Return s with format special characters escaped."""
  r = []
  n = 0
  for c in s:
    if c == _ESCAPE:
      r.append(_ESCAPE + _ESCAPED_ESCAPE + _ESCAPE)
    elif c == ':':
      r.append(_ESCAPE + _ESCAPED_COLON + _ESCAPE)
    elif c == '{':
      if n > 0:
        r.append(_ESCAPE + _ESCAPED_LEFT_CURLY + _ESCAPE)
      else:
        r.append('{')
      n += 1
    elif c == '}':
      n -= 1
      if n > 0:
        r.append(_ESCAPE + _ESCAPED_RIGHT_CURLY + _ESCAPE)
      else:
        r.append('}')
    else:
      r.append(c)
  return ''.join(r)


def _Expand(s):
  """Return s with escaped format special characters expanded."""
  r = []
  n = 0
  i = 0
  while i < len(s):
    c = s[i]
    i += 1
    if c == _ESCAPE and i + 1 < len(s) and s[i + 1] == _ESCAPE:
      c = s[i]
      i += 2
      if c == _ESCAPED_LEFT_CURLY:
        if n > 0:
          r.append(_ESCAPE + _ESCAPED_LEFT_CURLY)
        else:
          r.append('{')
        n += 1
      elif c == _ESCAPED_RIGHT_CURLY:
        n -= 1
        if n > 0:
          r.append(_ESCAPE + _ESCAPED_RIGHT_CURLY)
        else:
          r.append('}')
      elif n > 0:
        r.append(s[i - 3:i])
      elif c == _ESCAPED_COLON:
        r.append(':')
      elif c == _ESCAPED_ESCAPE:
        r.append(_ESCAPE)
    else:
      r.append(c)
  return ''.join(r)


class _JsonSortedDict(dict):
  """A dict with a sorted JSON string representation."""

  def __str__(self):
    return json.dumps(self, sort_keys=True)


class FormattableErrorPayload(string.Formatter):
  """Generic payload for an HTTP error that supports format strings.

  Attributes:
    content: The dumped JSON content.
    message: The human readable error message.
    status_code: The HTTP status code number.
    status_description: The status_code description.
    status_message: Context specific status message.
  """

  def __init__(self, http_error):
    """Initialize a FormattableErrorPayload instance.

    Args:
      http_error: An Exception that subclasses can use to populate class
        attributes, or a string to use as the error message.
    """
    super(FormattableErrorPayload, self).__init__()
    self._value = '{?}'
    self.content = {}
    self.status_code = 0
    self.status_description = ''
    self.status_message = ''
    if isinstance(http_error, six.string_types):
      self.message = http_error
    else:
      self.message = self._MakeGenericMessage()

  def get_field(self, field_name, unused_args, unused_kwargs):
    r"""Returns the value of field_name for string.Formatter.format().

    Args:
      field_name: The format string field name to get in the form
        name - the value of name in the payload, '' if undefined
        name?FORMAT - if name is non-empty then re-formats with FORMAT, where
          {?} is the value of name. For example, if name=NAME then
          {name?\nname is "{?}".} expands to '\nname is "NAME".'.
        .a.b.c - the value of a.b.c in the JSON decoded payload contents.
          For example, '{.errors.reason?[{?}]}' expands to [REASON] if
          .errors.reason is defined.
      unused_args: Ignored.
      unused_kwargs: Ignored.

    Returns:
      The value of field_name for string.Formatter.format().
    """
    field_name = _Expand(field_name)
    if field_name == '?':
      return self._value, field_name
    parts = field_name.split('?', 1)
    subparts = parts.pop(0).split(':', 1)
    name = subparts.pop(0)
    # Remove the leading 0. from the name if it exists to keep formatter in sync
    # with resoruce lexers expecations. Needed for python 3.14 compatibility.
    if (name.startswith('0.') and len(name) > 1) or name == '0.':
      name = name[1:]
    printer_format = subparts.pop(0) if subparts else None
    recursive_format = parts.pop(0) if parts else None
    name, value = self._GetField(name)
    if not value and not isinstance(value, (int, float)):
      return '', name
    if printer_format or not isinstance(
        value, (six.text_type, six.binary_type, float) + six.integer_types):
      buf = io.StringIO()
      resource_printer.Print(
          value, printer_format or 'default', out=buf, single=True)
      value = buf.getvalue().strip()
    if recursive_format:
      self._value = value
      value = self.format(_Expand(recursive_format))
    return value, name

  def _GetField(self, name):
    """Gets the value corresponding to name in self.content or class attributes.

    If `name` starts with a period, treat it as a key in self.content and get
    the corresponding value. Otherwise get the value of the class attribute
    named `name` first and fall back to checking keys in self.content.

    Args:
      name (str): The name of the attribute to return the value of.

    Returns:
      A tuple where the first value is `name` with any leading periods dropped,
      and the second value is the value of a class attribute or key in
      self.content.
    """
    if '.' in name:
      if name.startswith('.'):
        # Only check self.content.
        check_payload_attributes = False
        name = name[1:]
      else:
        # Check the payload attributes first, then self.content.
        check_payload_attributes = True
      key = resource_lex.Lexer(name).Key()
      content = self.content
      if check_payload_attributes and key:
        value = self.__dict__.get(key[0], None)
        if value:
          content = {key[0]: value}
      value = resource_property.Get(content, key, None)
    elif name:
      value = self.__dict__.get(name, None)
    else:
      value = None

    return name, value

  def _MakeGenericMessage(self):
    """Makes a generic human readable message from the HttpError."""
    description = self._MakeDescription()
    if self.status_message:
      return '{0}: {1}'.format(description, self.status_message)
    return description

  def _MakeDescription(self):
    """Makes description for error by checking which fields are filled in."""
    description = self.status_description
    if description:
      if description.endswith('.'):
        description = description[:-1]
      return description
    # Example: 'HTTPError 403'
    return 'HTTPError {0}'.format(self.status_code)


class HttpErrorPayload(FormattableErrorPayload):
  r"""Converts apitools HttpError payload to an object.

  Attributes:
    api_name: The url api name.
    api_version: The url version.
    content: The dumped JSON content.
    details: A list of {'@type': TYPE, 'detail': STRING} typed details.
    domain_details: ErrorInfo metadata Indexed by domain.
    violations: map of subject to error message for that subject.
    field_violations: map of field name to error message for that field.
    error_info: content['error'].
    instance_name: The url instance name.
    message: The human readable error message.
    resource_item: The resource type.
    resource_name: The url resource name.
    resource_version: The resource version.
    status_code: The HTTP status code number.
    status_description: The status_code description.
    status_message: Context specific status message.
    unparsed_details: The unparsed details.
    type_details: ErrorDetails Indexed by type.
    url: The HTTP url. .<a>.<b>...: The <a>.<b>... attribute in the JSON content
      (synthesized in get_field()).

  Grammar:
    Format strings inherit from python's string.formatter. where we pass tokens
    obtained by the resource projection framework format strings.

  Examples:
    error_format values and resulting output:

    'Error: [{status_code}] {status_message}{url?\n{?}}{.debugInfo?\n{?}}'

      Error: [404] Not found
      http://dotcom/foo/bar
      <content.debugInfo in yaml print format>

    'Error: {status_code} {details?\n\ndetails:\n{?}}'

      Error: 404

      details:
      - foo
      - bar

     'Error [{status_code}] {status_message}\n'
     '{.:value(details.detail.list(separator="\n"))}'

       Error [400] Invalid request.
       foo
       bar
  """

  def __init__(self, http_error):
    super(HttpErrorPayload, self).__init__(http_error)
    self.api_name = ''
    self.api_version = ''
    self.details = []
    self.violations = {}
    self.field_violations = {}
    self.error_info = None
    self.instance_name = ''
    self.resource_item = ''
    self.resource_name = ''
    self.resource_version = ''
    self.url = ''
    self._cred_info = None
    if not isinstance(http_error, six.string_types):
      self._ExtractResponseAndJsonContent(http_error)
      self._ExtractUrlResourceAndInstanceNames(http_error)
      self.message = self._MakeGenericMessage()

  def _GetField(self, name):
    if name.startswith('field_violations.'):
      _, field = name.split('.', 1)
      value = self.field_violations.get(field)
    elif name.startswith('violations.'):
      _, subject = name.split('.', 1)
      value = self.violations.get(subject)
    else:
      name, value = super(HttpErrorPayload, self)._GetField(name)
    return name, value

  def _GetMTLSEndpointOverride(self, endpoint_override):
    """Generates mTLS endpoint override for a given endpoint override.

    Args:
      endpoint_override: The endpoint to generate the mTLS endpoint override
        for.

    Returns:
      The mTLS endpoint override, if one exists. Otherwise, None.
    """
    if 'sandbox.googleapis.com' in endpoint_override:
      return endpoint_override.replace(
          'sandbox.googleapis.com', 'mtls.sandbox.googleapis.com'
      )
    elif 'googleapis.com' in endpoint_override:
      return endpoint_override.replace('googleapis.com', 'mtls.googleapis.com')
    else:
      return None

  def _IsMTLSEnabledAndApiEndpointOverridesPresent(self):
    """Returns whether mTLS is enabled and an endpoint override is present for the current gcloud invocation."""
    return (
        properties.VALUES.api_endpoint_overrides.AllValues()
        and properties.VALUES.context_aware.use_client_certificate.GetBool()
        and properties.IsInternalUserCheck()
    )

  def _IsExistingOverrideMTLS(self, endpoint_override):
    """Returns whether the existing endpoint override is using mTLS already."""
    urlparsed = urllib.parse.urlparse(endpoint_override)
    domain_parts = urlparsed.netloc
    if not domain_parts:
      return None
    domain = domain_parts.split('.')[1]
    return domain.startswith('mtls')

  def _ExtractResponseAndJsonContent(self, http_error):
    """Extracts the response and JSON content from the HttpError."""
    response = getattr(http_error, 'response', None)
    if response:
      self.status_code = int(response.get('status', 0))
      self.status_description = encoding.Decode(response.get('reason', ''))
    content = encoding.Decode(http_error.content)
    try:
      # X-GOOG-API-FORMAT-VERSION: 2
      self.content = _JsonSortedDict(json.loads(content))
      self.error_info = _JsonSortedDict(self.content['error'])
      if not self.status_code:  # Could have been set above.
        self.status_code = int(self.error_info.get('code', 0))

      if self.status_code in [401, 403, 404] and self.error_info.get(
          'message', ''
      ):
        from googlecloudsdk.core.credentials import store as c_store  # pylint: disable=g-import-not-at-top

        # Append the credential info to the error message.
        self._cred_info = c_store.CredentialInfo.GetCredentialInfo()
        if self._cred_info:
          cred_info_message = self._cred_info.GetInfoString()
          existing_message = self.content['error']['message']
          # Some surface actually appends an ending dot at the end of the
          # message, so we need to make sure not adding an ending dot if the
          # existing message doesn't have one. (Note that cred_info_message
          # string we created ends with a dot.)
          if existing_message[-1] != '.':
            # add dot after existing_message and remove the ending dot from
            # cred_info_message.
            self.content['error']['message'] = (
                existing_message + '. ' + cred_info_message[:-1]
            )
          else:
            self.content['error']['message'] = (
                existing_message + ' ' + cred_info_message
            )
          self.error_info['message'] = self.content['error']['message']

      if not self.status_description:  # Could have been set above.
        self.status_description = self.error_info.get('status', '')
      if self._IsMTLSEnabledAndApiEndpointOverridesPresent():
        endpoint_overrides = (
            properties.VALUES.api_endpoint_overrides.AllValues()
        )
        for api_name, endpoint_override in endpoint_overrides.items():
          mtls_endpoint = self._GetMTLSEndpointOverride(endpoint_override)
          if (
              not self._IsExistingOverrideMTLS(endpoint_override)
              and http_error.url.startswith(endpoint_override)
              and mtls_endpoint
          ):
            self.error_info['message'] = (
                'Certificate-based access is enabled, but the endpoint override'
                ' for the following API is not using mTLS: {}. Please update'
                ' the endpoint override to use mTLS endpoint: {} '.format(
                    [api_name, endpoint_override], mtls_endpoint
                )
            )
            self.error_info['details'] = [{
                '@type': 'type.googleapis.com/google.rpc.ErrorInfo',
                'reason': 'ACCESS_DENIED',
                'metadata': {
                    'endpoint_override': endpoint_override,
                    'mtls_endpoint_override': mtls_endpoint,
                },
            }]
            break
      self.status_message = self.error_info.get('message', '')
      self.details = self.error_info.get('details', [])
      self.violations = self._ExtractViolations(self.details)
      self.field_violations = self._ExtractFieldViolations(self.details)
      self.type_details = self._IndexErrorDetailsByType(self.details)
      self.domain_details = self._IndexErrorInfoByDomain(self.details)
      if properties.VALUES.core.parse_error_details.GetBool():
        self.unparsed_details = self.RedactParsedTypes(self.details)
    except (KeyError, TypeError, ValueError):
      self.status_message = content
    except AttributeError:
      pass

  def RedactParsedTypes(self, details):
    """Redacts the parsed types from the details list."""
    unparsed_details = []
    for item in details:
      error_type = item.get('@type', None)
      error_suffix = error_type.split('/')[-1]
      if error_suffix not in (LOCALIZED_MESSAGE_SUFFIX, HELP_SUFFIX):
        unparsed_details.append(item)
    return unparsed_details

  def _IndexErrorDetailsByType(self, details):
    """Extracts and indexes error details list by the type attribute."""
    type_map = collections.defaultdict(list)
    for item in details:
      error_type = item.get('@type', None)
      if error_type:
        error_type_suffix = error_type.split('.')[-1]
        type_map[error_type_suffix].append(item)
    return type_map

  def _IndexErrorInfoByDomain(self, details):
    """Extracts and indexes error info list by the domain attribute."""
    domain_map = collections.defaultdict(list)
    for item in details:
      error_type = item.get('@type', None)
      if error_type.endswith(ERROR_INFO_SUFFIX):
        domain = item.get('domain', None)
        if domain:
          domain_map[domain].append(item)
    return domain_map

  def _ExtractUrlResourceAndInstanceNames(self, http_error):
    """Extracts the url resource type and instance names from the HttpError."""
    self.url = http_error.url
    if not self.url:
      return

    try:
      name, version, resource_path = resource_util.SplitEndpointUrl(
          self.url)
    except resource_util.InvalidEndpointException:
      return

    if name:
      self.api_name = name
    if version:
      self.api_version = version

    # We do not attempt to parse this, as generally it doesn't represent a
    # resource uri.
    resource_parts = resource_path.split('/')
    if not 1 < len(resource_parts) < 4:
      return
    self.resource_name = resource_parts[0]
    instance_name = resource_parts[1]

    self.instance_name = instance_name.split('?')[0]
    self.resource_item = '{} instance'.format(self.resource_name)

  def _MakeDescription(self):
    """Makes description for error by checking which fields are filled in."""
    if self.status_code and self.resource_item and self.instance_name:
      if self.status_code == 403:
        if self._cred_info:
          account = (
              self._cred_info.impersonated_account or self._cred_info.account
          )
        else:
          account = properties.VALUES.core.account.Get()
        return (
            '[{0}] does not have permission to access {1} [{2}] (or '
            'it may not exist)'
        ).format(account, self.resource_item, self.instance_name)
      if self.status_code == 404:
        return '{0} [{1}] not found'.format(
            self.resource_item.capitalize(), self.instance_name)
      if self.status_code == 409:
        if self.resource_name == 'projects':
          return ('Resource in projects [{0}] '
                  'is the subject of a conflict').format(self.instance_name)
        else:
          return '{0} [{1}] is the subject of a conflict'.format(
              self.resource_item.capitalize(), self.instance_name)

    return super(HttpErrorPayload, self)._MakeDescription()

  def _ExtractViolations(self, details):
    """Extracts a map of violations from the given error's details.

    Args:
      details: JSON-parsed details field from parsed json of error.

    Returns:
      Map[str, str] sub -> error description. The iterator of it is ordered by
      the order the subjects first appear in the errror.
    """
    results = collections.OrderedDict()
    for detail in details:
      if 'violations' not in detail:
        continue
      violations = detail['violations']
      if not isinstance(violations, list):
        continue
      sub = detail.get('subject')
      for violation in violations:
        try:
          local_sub = violation.get('subject')
          subject = sub or local_sub
          if subject:
            if subject in results:
              results[subject] += '\n' + violation['description']
            else:
              results[subject] = violation['description']
        except (KeyError, TypeError):
          # If violation or description are the wrong type or don't exist.
          pass
    return results

  def _ExtractFieldViolations(self, details):
    """Extracts a map of field violations from the given error's details.

    Args:
      details: JSON-parsed details field from parsed json of error.

    Returns:
      Map[str, str] field (in dotted format) -> error description.
      The iterator of it is ordered by the order the fields first
      appear in the error.
    """
    results = collections.OrderedDict()
    for deet in details:
      if 'fieldViolations' not in deet:
        continue
      violations = deet['fieldViolations']
      if not isinstance(violations, list):
        continue
      f = deet.get('field')
      for viol in violations:
        try:
          local_f = viol.get('field')
          field = f or local_f
          if field:
            if field in results:
              results[field] += '\n' + viol['description']
            else:
              results[field] = viol['description']
        except (KeyError, TypeError):
          # If violation or description are the wrong type or don't exist.
          pass
    return results


class HttpException(core_exceptions.Error):
  """Transforms apitools HttpError to api_lib HttpException.

  Attributes:
    error: The original HttpError.
    error_format: An HttpErrorPayload format string.
    payload: The HttpErrorPayload object.
  """

  def __init__(self, error, error_format=None, payload_class=HttpErrorPayload):
    super(HttpException, self).__init__('')
    self.error = error
    self.error_format = error_format
    self.payload = payload_class(error)

  def __str__(self):
    error_format = self.error_format
    if error_format is None:
      error_prefix = '{message?}'
      if properties.VALUES.core.parse_error_details.GetBool():
        parsed_localized_messages = (
            '{type_details.LocalizedMessage'
            ':value(message.list(separator="\n"))?\n{?}}'
        )
        parsed_help_messages = (
            '{type_details.Help'
            ':value(links.flatten(show="values",separator="\n"))?\n{?}}'
        )
        unparsed_details = '{unparsed_details?\n{?}}'
        error_format = (
            error_prefix
            + parsed_localized_messages
            + parsed_help_messages
            + unparsed_details
        )
      else:
        error_format = error_prefix + '{details?\n{?}}'
      if log.GetVerbosity() <= logging.DEBUG:
        error_format += '{.debugInfo?\n{?}}'
    return _Expand(self.payload.format(_Escape(error_format)))

  @property
  def message(self):
    return six.text_type(self)

  def __hash__(self):
    return hash(self.message)

  def __eq__(self, other):
    if isinstance(other, HttpException):
      return self.message == other.message
    return False


def CatchHTTPErrorRaiseHTTPException(format_str=None):
  """Decorator that catches an HttpError and returns a custom error message.

  It catches the raw Http Error and runs it through the given format string to
  get the desired message.

  Args:
    format_str: An HttpErrorPayload format string. Note that any properties that
    are accessed here are on the HTTPErrorPayload object, and not the raw
    object returned from the server.

  Returns:
    A custom error message.

  Example:
    @CatchHTTPErrorRaiseHTTPException('Error [{status_code}]')
    def some_func_that_might_throw_an_error():
      ...
  """

  def CatchHTTPErrorRaiseHTTPExceptionDecorator(run_func):
    # Need to define a secondary wrapper to get an argument to the outer
    # decorator.
    def Wrapper(*args, **kwargs):
      try:
        return run_func(*args, **kwargs)
      except apitools_exceptions.HttpError as error:
        exc = HttpException(error, format_str)
        core_exceptions.reraise(exc)
    return Wrapper

  return CatchHTTPErrorRaiseHTTPExceptionDecorator