HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/util/concepts/completers.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""completers for resource library."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections

from apitools.base.protorpclite import messages

from googlecloudsdk.api_lib.util import resource as resource_lib  # pylint: disable=unused-import
from googlecloudsdk.command_lib.util import completers
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.apis import registry
from googlecloudsdk.command_lib.util.concepts import resource_parameter_info
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources

import six

DEFAULT_ID_FIELD = 'name'
_PROJECTS_COLLECTION = 'cloudresourcemanager.projects'
_PROJECT_ID_FIELD = 'projectId'


class Error(exceptions.Error):
  """Base error class for this module."""


class ParentTranslator(object):
  """Translates parent collections for completers.

  Attributes:
    collection: str, the collection name.
    param_translation: {str: str}, lookup from the params of the child
      collection to the params of the special parent collection. If None,
      then the collections match and translate methods are a no-op.
  """

  def __init__(self, collection, param_translation=None):
    self.collection = collection
    self.param_translation = param_translation or {}

  def ToChildParams(self, params):
    """Translate from original parent params to params that match the child."""
    if self.param_translation:
      for orig_param, new_param in six.iteritems(self.param_translation):
        params[orig_param] = params.get(new_param)
        del params[new_param]
    return params

  def MessageResourceMap(self, message, ref):
    """Get dict for translating parent params into the given message type."""
    message_resource_map = {}
    # Parse resource with any params in the translator that are needed for the
    # request.
    for orig_param, special_param in six.iteritems(self.param_translation):
      try:
        message.field_by_name(orig_param)
      # The field is not found, meaning that the original param isn't in the
      # message.
      except KeyError:
        continue
      message_resource_map[orig_param] = getattr(ref, special_param, None)
    return message_resource_map

  def Parse(self, parent_params, parameter_info, aggregations_dict):
    """Parse the parent resource from parameter info and aggregations.

    Args:
      parent_params: [str], a list of params in the current collection's parent
        collection.
      parameter_info: the runtime ResourceParameterInfo object.
      aggregations_dict: {str: str}, a dict of params to values that are
        being aggregated from earlier updates.

    Returns:
      resources.Resource | None, the parsed parent reference or None if there
        is not enough information to parse.
    """
    param_values = {
        self.param_translation.get(p, p): parameter_info.GetValue(p)
        for p in parent_params}
    for p, value in six.iteritems(aggregations_dict):
      translated_name = self.param_translation.get(p, p)
      if value and not param_values.get(translated_name, None):
        param_values[translated_name] = value
    try:
      return resources.Resource(
          resources.REGISTRY,
          collection_info=resources.REGISTRY.GetCollectionInfo(self.collection),
          subcollection='',
          param_values=param_values,
          endpoint_url=None)
    # Not all completion list calls may need to have a parent, so even if we
    # can't parse a parent, we log the error and attempt to send an update call
    # without one. (Any error returned by the API will be raised.)
    except resources.Error as e:
      log.info(six.text_type(e).rstrip())
      return None


# A map from parent params (in original resource parser order, joined with '.')
# to special collections. If the original params are different from the special
# collection, the param_translator is used to translate back and forth between
# the original params and the special collection.
_PARENT_TRANSLATORS = {
    'projectsId': ParentTranslator(_PROJECTS_COLLECTION,
                                   {'projectsId': _PROJECT_ID_FIELD}),
    'projectId': ParentTranslator(_PROJECTS_COLLECTION)}


class CollectionConfig(collections.namedtuple(
    'CollectionConfig',
    [
        # static params are used to build the List request when updating
        # the cache (equivalent to completion_request_params in AttributeConfig
        # objects)
        'static_params',
        # Configures the ID field that is used to parse the results of a List
        # request when updating the cache. Equivalent to completion_id_field
        # in AttributeConfig objects.
        'id_field',
        # Configures the param name for the completer.
        'param_name']
    )):
  """Stores data about special collections for configuring completion."""


# This maps special collections to configuration for CompleterInfo objects
# rather than using configuration from the parent resource's collection.
# Currently only covers projects.
_SPECIAL_COLLECTIONS_MAP = {
    _PROJECTS_COLLECTION: CollectionConfig({'filter': 'lifecycleState:ACTIVE'},
                                           _PROJECT_ID_FIELD,
                                           _PROJECT_ID_FIELD)}


class ResourceArgumentCompleter(completers.ResourceCompleter):
  """A completer for an argument that's part of a resource argument."""

  def __init__(self, resource_spec, collection_info, method,
               static_params=None, id_field=None, param=None, **kwargs):
    """Initializes."""
    self.resource_spec = resource_spec
    self._method = method
    self._static_params = static_params or {}
    self.id_field = id_field or DEFAULT_ID_FIELD
    collection_name = collection_info.full_name
    api_version = collection_info.api_version
    super(ResourceArgumentCompleter, self).__init__(
        collection=collection_name,
        api_version=api_version,
        param=param,
        parse_all=True,
        **kwargs)

  @property
  def method(self):
    """Gets the list method for the collection.

    Returns:
      googlecloudsdk.command_lib.util.apis.registry.APIMethod, the method.
    """
    return self._method

  def _ParentParams(self):
    """Get the parent params of the collection."""
    return self.collection_info.GetParams('')[:-1]

  def _GetUpdaters(self):
    """Helper function to build dict of updaters."""
    # Find the attribute that matches the final param of the collection for this
    # completer.
    final_param = self.collection_info.GetParams('')[-1]
    for i, attribute in enumerate(self.resource_spec.attributes):
      if self.resource_spec.ParamName(attribute.name) == final_param:
        attribute_idx = i
        break
    else:
      attribute_idx = 0

    updaters = {}
    for i, attribute in enumerate(
        self.resource_spec.attributes[:attribute_idx]):
      completer = CompleterForAttribute(self.resource_spec, attribute.name)
      if completer:
        updaters[self.resource_spec.ParamName(attribute.name)] = (completer,
                                                                  True)
      else:
        updaters[self.resource_spec.ParamName(attribute.name)] = (None,
                                                                  False)
    return updaters

  def ParameterInfo(self, parsed_args, argument):
    """Builds a ResourceParameterInfo object.

    Args:
      parsed_args: the namespace.
      argument: unused.

    Returns:
      ResourceParameterInfo, the parameter info for runtime information.
    """
    resource_info = parsed_args.CONCEPTS.ArgNameToConceptInfo(argument.dest)

    updaters = self._GetUpdaters()

    return resource_parameter_info.ResourceParameterInfo(
        resource_info, parsed_args, argument, updaters=updaters,
        collection=self.collection)

  def ValidateAttributeSources(self, aggregations):
    """Validates that parent attributes values exitst before making request."""
    parameters_needing_resolution = set([p.name for p in self.parameters[:-1]])
    resolved_parameters = set([a.name for a in aggregations])
    # attributes can also be resolved by completers
    for attribute in self.resource_spec.attributes:
      if CompleterForAttribute(self.resource_spec, attribute.name):
        resolved_parameters.add(
            self.resource_spec.attribute_to_params_map[attribute.name])
    return parameters_needing_resolution.issubset(resolved_parameters)

  def Update(self, parameter_info, aggregations):
    if self.method is None:
      return None

    if not self.ValidateAttributeSources(aggregations):
      return None

    log.info(
        'Cache query parameters={} aggregations={}'
        'resource info={}'.format(
            [(p, parameter_info.GetValue(p))
             for p in self.collection_info.GetParams('')],
            [(p.name, p.value) for p in aggregations],
            parameter_info.resource_info.attribute_to_args_map))
    parent_translator = self._GetParentTranslator(parameter_info, aggregations)
    try:
      query = self.BuildListQuery(parameter_info, aggregations,
                                  parent_translator=parent_translator)
    except Exception as e:  # pylint: disable=broad-except
      if properties.VALUES.core.print_completion_tracebacks.GetBool():
        raise
      log.info(six.text_type(e).rstrip())
      raise Error('Could not build query to list completions: {} {}'.format(
          type(e), six.text_type(e).rstrip()))
    try:
      response = self.method.Call(query)
      response_collection = self.method.collection
      items = [self._ParseResponse(r, response_collection,
                                   parameter_info=parameter_info,
                                   aggregations=aggregations,
                                   parent_translator=parent_translator)
               for r in response]
      log.info('cache items={}'.format(
          [i.RelativeName() for i in items]))
    except Exception as e:  # pylint: disable=broad-except
      if properties.VALUES.core.print_completion_tracebacks.GetBool():
        raise
      log.info(six.text_type(e).rstrip())
      # Give user more information if they hit an apitools validation error,
      # which probably means that they haven't provided enough information
      # for us to complete.
      if isinstance(e, messages.ValidationError):
        raise Error('Update query failed, may not have enough information to '
                    'list existing resources: {} {}'.format(
                        type(e), six.text_type(e).rstrip()))
      raise Error('Update query [{}]: {} {}'.format(
          query, type(e), six.text_type(e).rstrip()))
    return [self.StringToRow(item.RelativeName()) for item in items]

  def _ParseResponse(self, response, response_collection,
                     parameter_info=None, aggregations=None,
                     parent_translator=None):
    """Gets a resource ref from a single item in a list response."""
    param_values = self._GetParamValuesFromParent(
        parameter_info, aggregations=aggregations,
        parent_translator=parent_translator)
    param_names = response_collection.detailed_params
    for param in param_names:
      val = getattr(response, param, None)
      if val is not None:
        param_values[param] = val

    line = getattr(response, self.id_field, '')
    return resources.REGISTRY.Parse(
        line, collection=response_collection.full_name, params=param_values)

  def _GetParamValuesFromParent(self, parameter_info, aggregations=None,
                                parent_translator=None):
    parent_ref = self.GetParent(parameter_info, aggregations=aggregations,
                                parent_translator=parent_translator)
    if not parent_ref:
      return {}
    params = parent_ref.AsDict()
    if parent_translator:
      return parent_translator.ToChildParams(params)
    return params

  def _GetAggregationsValuesDict(self, aggregations):
    """Build a {str: str} dict of name to value for aggregations."""
    aggregations_dict = {}
    aggregations = [] if aggregations is None else aggregations
    for aggregation in aggregations:
      if aggregation.value:
        aggregations_dict[aggregation.name] = aggregation.value
    return aggregations_dict

  def BuildListQuery(self, parameter_info, aggregations=None,
                     parent_translator=None):
    """Builds a list request to list values for the given argument.

    Args:
      parameter_info: the runtime ResourceParameterInfo object.
      aggregations: a list of _RuntimeParameter objects.
      parent_translator: a ParentTranslator object if needed.

    Returns:
      The apitools request.
    """
    method = self.method
    if method is None:
      return None
    message = method.GetRequestType()()
    for field, value in six.iteritems(self._static_params):
      arg_utils.SetFieldInMessage(message, field, value)
    parent = self.GetParent(parameter_info, aggregations=aggregations,
                            parent_translator=parent_translator)
    if not parent:
      return message
    message_resource_map = {}

    if parent_translator:
      message_resource_map = parent_translator.MessageResourceMap(
          message, parent)

    arg_utils.ParseResourceIntoMessage(
        parent, method, message,
        message_resource_map=message_resource_map, is_primary_resource=True)
    return message

  def _GetParentTranslator(self, parameter_info, aggregations=None):
    """Get a special parent translator if needed and available."""
    aggregations_dict = self._GetAggregationsValuesDict(aggregations)
    param_values = self._GetRawParamValuesForParent(
        parameter_info, aggregations_dict=aggregations_dict)
    try:
      self._ParseDefaultParent(param_values)
      # If there's no error, we don't need a translator.
      return None
    except resources.ParentCollectionResolutionException:
      # Check the parent params against the _PARENT_TRANSLATORS dict, using the
      # parent params (joined by '.' in original resource parser order) as a
      # key.
      key = '.'.join(self._ParentParams())
      if key in _PARENT_TRANSLATORS:
        return _PARENT_TRANSLATORS.get(key)
    # Errors will be raised and logged later when actually parsing the parent.
    except resources.Error:
      return None

  def _GetRawParamValuesForParent(self, parameter_info, aggregations_dict=None):
    """Get raw param values for the resource in prep for parsing parent."""
    param_values = {p: parameter_info.GetValue(p) for p in self._ParentParams()}
    for name, value in six.iteritems(aggregations_dict or {}):
      if value and not param_values.get(name, None):
        param_values[name] = value
    final_param = self.collection_info.GetParams('')[-1]
    if param_values.get(final_param, None) is None:
      param_values[final_param] = 'fake'  # Stripped when we get the parent.
    return param_values

  def _ParseDefaultParent(self, param_values):
    """Parse the parent for a resource using default collection."""
    resource = resources.Resource(
        resources.REGISTRY,
        collection_info=self.collection_info,
        subcollection='',
        param_values=param_values,
        endpoint_url=None)
    return resource.Parent()

  def GetParent(self, parameter_info, aggregations=None,
                parent_translator=None):
    """Gets the parent reference of the parsed parameters.

    Args:
      parameter_info: the runtime ResourceParameterInfo object.
      aggregations: a list of _RuntimeParameter objects.
      parent_translator: a ParentTranslator for translating to a special
        parent collection, if needed.

    Returns:
      googlecloudsdk.core.resources.Resource | None, the parent resource or None
        if no parent was found.
    """
    aggregations_dict = self._GetAggregationsValuesDict(aggregations)
    param_values = self._GetRawParamValuesForParent(
        parameter_info, aggregations_dict=aggregations_dict)
    try:
      if not parent_translator:
        return self._ParseDefaultParent(param_values)
      return parent_translator.Parse(self._ParentParams(), parameter_info,
                                     aggregations_dict)
    except resources.ParentCollectionResolutionException as e:
      # We don't know the parent collection.
      log.info(six.text_type(e).rstrip())
      return None
    # No resource could be parsed.
    except resources.Error as e:
      log.info(six.text_type(e).rstrip())
      return None

  def __eq__(self, other):
    """Overrides."""
    # Not using type(self) because the class is created programmatically.
    if not isinstance(other, ResourceArgumentCompleter):
      return False
    return (self.resource_spec == other.resource_spec and
            self.collection == other.collection and
            self.method == other.method)


def _MatchCollection(resource_spec, attribute):
  """Gets the collection for an attribute in a resource."""
  resource_collection_info = resource_spec._collection_info  # pylint: disable=protected-access
  resource_collection = registry.APICollection(
      resource_collection_info)
  if resource_collection is None:
    return None
  if attribute == resource_spec.attributes[-1]:
    return resource_collection.name
  attribute_idx = resource_spec.attributes.index(attribute)
  api_name = resource_collection_info.api_name
  resource_collections = registry.GetAPICollections(
      api_name,
      resource_collection_info.api_version)
  params = resource_collection.detailed_params[:attribute_idx + 1]
  for c in resource_collections:
    if c.detailed_params == params:
      return c.name


def _GetCompleterCollectionInfo(resource_spec, attribute):
  """Gets collection info for an attribute in a resource."""
  api_version = None
  collection = _MatchCollection(resource_spec, attribute)
  if collection:
    # pylint: disable=protected-access
    full_collection_name = (
        resource_spec._collection_info.api_name + '.' + collection)
    api_version = resource_spec._collection_info.api_version
  # The CloudResourceManager projects collection can be used for "synthetic"
  # project resources that don't have their own method.
  elif attribute.name == 'project':
    full_collection_name = 'cloudresourcemanager.projects'
  else:
    return None
  return resources.REGISTRY.GetCollectionInfo(full_collection_name,
                                              api_version=api_version)


class CompleterInfo(object):
  """Holds data that can be used to instantiate a resource completer."""

  def __init__(self, static_params=None, id_field=None, collection_info=None,
               method=None, param_name=None):
    self.static_params = static_params
    self.id_field = id_field
    self.collection_info = collection_info
    self.method = method
    self.param_name = param_name

  @classmethod
  def FromResource(cls, resource_spec, attribute_name):
    """Gets the method, param_name, and other configuration for a completer.

    Args:
      resource_spec: concepts.ResourceSpec, the overall resource.
      attribute_name: str, the name of the attribute whose argument will use
        this completer.

    Raises:
      AttributeError: if the attribute doesn't belong to the resource.

    Returns:
      CompleterInfo, the instantiated object.
    """
    for a in resource_spec.attributes:
      if a.name == attribute_name:
        attribute = a
        break
    else:
      raise AttributeError(
          'Attribute [{}] not found in resource.'.format(attribute_name))
    param_name = resource_spec.ParamName(attribute_name)
    static_params = attribute.completion_request_params
    id_field = attribute.completion_id_field
    collection_info = _GetCompleterCollectionInfo(resource_spec, attribute)
    if collection_info.full_name in _SPECIAL_COLLECTIONS_MAP:
      special_info = _SPECIAL_COLLECTIONS_MAP.get(collection_info.full_name)
      method = registry.GetMethod(collection_info.full_name, 'list')
      static_params = special_info.static_params
      id_field = special_info.id_field
      param_name = special_info.param_name
    if not collection_info:
      return CompleterInfo(static_params, id_field, None, None, param_name)
    # If there is no appropriate list method for the collection, we can't auto-
    # create a completer.
    try:
      method = registry.GetMethod(
          collection_info.full_name, 'list',
          api_version=collection_info.api_version)
    except registry.UnknownMethodError:
      if (collection_info.full_name != _PROJECTS_COLLECTION
          and collection_info.full_name.split('.')[-1] == 'projects'):
        # The CloudResourceManager projects methods can be used for "synthetic"
        # project resources that don't have their own method.
        # This is a bit of a hack, so if any resource arguments come up for
        # which this doesn't work, a toggle should be added to the
        # ResourceSpec class to disable this.
        # Does not use param_name from the special collections map because
        # the collection exists with the current params, it's just the list
        # method that we're borrowing.
        special_info = _SPECIAL_COLLECTIONS_MAP.get(_PROJECTS_COLLECTION)
        method = registry.GetMethod(_PROJECTS_COLLECTION, 'list')
        static_params = special_info.static_params
        id_field = special_info.id_field
      else:
        method = None
    except registry.Error:
      method = None
    return CompleterInfo(static_params, id_field, collection_info, method,
                         param_name)

  def GetMethod(self):
    """Get the APIMethod for an attribute in a resource."""
    return self.method


def CompleterForAttribute(resource_spec, attribute_name):
  """Gets a resource argument completer for a specific attribute."""

  class Completer(ResourceArgumentCompleter):
    """A specific completer for this attribute and resource."""

    def __init__(self, resource_spec=resource_spec,
                 attribute_name=attribute_name, **kwargs):
      completer_info = CompleterInfo.FromResource(resource_spec, attribute_name)

      super(Completer, self).__init__(
          resource_spec,
          completer_info.collection_info,
          completer_info.method,
          static_params=completer_info.static_params,
          id_field=completer_info.id_field,
          param=completer_info.param_name,
          **kwargs)

    @classmethod
    def validate(cls):
      """Checks whether the completer is valid (has a list method)."""
      return bool(
          CompleterInfo.FromResource(resource_spec, attribute_name).GetMethod())

  if not Completer.validate():
    return None

  return Completer