HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/util/apis/arg_marshalling.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes that generate and parse arguments for apitools messages."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections

from apitools.base.protorpclite import messages
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope.concepts import util as resource_util
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.apis import update
from googlecloudsdk.command_lib.util.apis import yaml_arg_schema
from googlecloudsdk.command_lib.util.apis import yaml_command_schema
from googlecloudsdk.command_lib.util.apis import yaml_command_schema_util as util
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import resources
from googlecloudsdk.core.resource import resource_property


class Error(Exception):
  """Base class for this module's exceptions."""


class ConflictingResourcesError(Error):
  """Error for whenever api method / primary resource cannot be determined."""


def _GetLabelsClass(message, api_field):
  return arg_utils.GetFieldFromMessage(message, api_field).type


def _ParseLabelsIntoCreateMessage(message, args, api_field):
  labels_cls = _GetLabelsClass(message, api_field)
  labels_field = labels_util.ParseCreateArgs(args, labels_cls)
  arg_utils.SetFieldInMessage(message, api_field, labels_field)


def _AddLabelsToUpdateMask(static_field, update_mask_path):
  if (update_mask_path not in static_field) or (
      not static_field[update_mask_path]):
    static_field[update_mask_path] = 'labels'
    return

  if 'labels' in static_field[update_mask_path].split(','):
    return

  static_field[
      update_mask_path] = static_field[update_mask_path] + ',' + 'labels'


def _RetrieveFieldValueFromMessage(message, api_field):
  path = api_field.split('.')
  for field_name in path:
    try:
      message = getattr(message, field_name)
    except AttributeError:
      raise AttributeError(
          'The message does not have field specified in {}.'.format(api_field))
  return message


def _ParseLabelsIntoUpdateMessage(message, args, api_field):
  """Find diff between existing labels and args, set labels into the message."""
  diff = labels_util.Diff.FromUpdateArgs(args)
  # Do nothing if 'labels' arguments weren't specified.
  if not diff.MayHaveUpdates():
    return False
  existing_labels = _RetrieveFieldValueFromMessage(message, api_field)
  label_cls = _GetLabelsClass(message, api_field)
  update_result = diff.Apply(label_cls, existing_labels)
  if update_result.needs_update:
    arg_utils.SetFieldInMessage(message, api_field, update_result.labels)
  return True


def _GetResources(params):
  """Retrieves all resource args from the arg_info tree.

  Args:
    params: an ArgGroup or list of args to parse through.

  Returns:
    YAMLConceptArgument (resource arg) list.
  """
  if isinstance(params, yaml_arg_schema.YAMLConceptArgument):
    return [params]
  if isinstance(params, yaml_arg_schema.Argument):
    return []
  if isinstance(params, yaml_arg_schema.ArgumentGroup):
    params = params.arguments

  result = []
  for param in params:
    result.extend(_GetResources(param))

  return result


def _GetPrimaryResource(resource_params, resource_collection):
  """Retrieves the primary resource arg.

  Args:
    resource_params: list of YAMLConceptParser
    resource_collection: registry.APICollection, resource collection
      associated with method

  Returns:
    YAMLConceptArgument (resource arg) or None.
  """

  # No resource params occurs if resource args are added through a hook.
  if not resource_params:
    return None

  primary_resources = [
      arg for arg in resource_params
      if arg.IsPrimaryResource(resource_collection)]

  if not primary_resources:
    if resource_collection:
      full_name = resource_collection.full_name
      api_version = resource_collection.api_version
    else:
      full_name = None
      api_version = None

    raise util.InvalidSchemaError(
        'No resource args were found that correspond with [{name} {version}]. '
        'Add resource arguments that corresponds with request.method '
        'collection [{name} {version}]. HINT: Can set resource arg '
        'is_primary_resource to True in yaml schema to receive more assistance '
        'with validation.'.format(
            name=full_name, version=api_version))

  if len(primary_resources) > 1:
    primary_resource_names = [arg.name for arg in primary_resources]
    raise util.InvalidSchemaError(
        'Only one resource arg can be listed as primary. Remove one of the '
        'primary resource args [{}] or set is_primary_resource to False in '
        'yaml schema.'.format(', '.join(primary_resource_names)))

  return primary_resources[0]


def _GetMethodResourceArgs(resource_args, methods):
  """Gets list of primary resource args and methods associated with them.

  Args:
    resource_args: list[YAMLConceptArg], list of potential primary resource
      args
    methods: list[registry.APIMethod], The method to generate arguments for.

  Returns:
    list[YAMLMethod] (resource arg) or None.
  """
  args = resource_args
  # Handle methodless commands with primary resource arg
  if not methods and (primary := _GetPrimaryResource(args, None)):
    return [MethodResourceArg(primary_resource=primary, method=None)]

  yaml_methods = []
  for method in methods:
    resource_arg = _GetPrimaryResource(
        args, method.resource_argument_collection)
    yaml_methods.append(MethodResourceArg(resource_arg, method))
  return yaml_methods


def _NormalizeNames(attributes):
  return [resource_util.NormalizeFormat(attr) for attr in attributes]


def _DoesDupResourceArgHaveSameAttributes(resource, resource_params):
  """Verify if there is a duplicated resource argument with the same attributes.

  Args:
    resource: yaml_arg_schema.Argument, resource to be verified.
    resource_params: [yaml_arg_schema.Argument], list to check duplicate.

  Returns:
    True if there is a duplicate resource arg in the list with same attributes.
  """
  for res_arg in resource_params:
    if res_arg != resource and res_arg.name == resource.name:
      # Normalize the attribute names to account for positional
      # and non-positional.
      return(_NormalizeNames(res_arg.attribute_names) ==
             _NormalizeNames(resource.attribute_names))
  return True


def _GetSharedFlags(resource_params):
  """Retrieves shared attributes between resource args.

  Args:
    resource_params: [yaml_arg_schema.Argument], yaml argument tree

  Returns:
    Map of attribute names to list of resources that contain that attribute.
  """
  resource_names = set()
  flags = collections.defaultdict(list)
  for arg in resource_params:
    # Presentation name is used to register the resource arg ie
    # arg.CONCEPTS.presentation_name.Parse() retrieves and parses CLI input.
    arg_name = arg.presentation_name
    if arg_name in resource_names:
      # If we found a duplicate resource arg, make sure it has same attributes.
      # If it has different attributes, current resource arg will override
      # previous one.
      if (
          arg_name in resource_names
          and not _DoesDupResourceArgHaveSameAttributes(arg, resource_params)
      ):
        raise util.InvalidSchemaError(
            'More than one resource argument has the name [{}] with different '
            'attributes. Remove the duplicate resource declarations.'.format(
                arg_name
            )
        )
    else:
      resource_names.add(arg_name)

    # iterate thorugh attributes flags
    for flag_name in arg.attribute_to_flag_map.values():
      if flag_name not in arg.ignored_flags:
        flags[flag_name].append(arg_name)

  # Shared attributes: attribute entries with more than 1 resource args.
  return {
      flag_name: resource_args
      for flag_name, resource_args in flags.items()
      if len(resource_args) > 1
  }


def _GetCollectionName(method, is_parent=False):
  collection_name = method.resource_argument_collection.full_name
  if is_parent:
    resource_collection, _, _ = collection_name.rpartition('.')
  else:
    resource_collection = collection_name
  return resource_collection


class MethodResourceArg:
  """Method and the resource argument associated with it."""

  def __init__(self, primary_resource, method):
    self.primary_resource = primary_resource
    self.method = method

  def Parse(self, namespace):
    if self.primary_resource:
      return self.primary_resource.ParseResourceArg(namespace)
    else:
      return None


class DeclarativeArgumentGenerator(object):
  """An argument generator that operates off a declarative configuration.

  When using this generator, you must provide attributes for the arguments that
  should be generated. All resource arguments must be provided and arguments
  will only be generated for API fields for which attributes were provided.
  """

  def __init__(self, arg_info):
    """Creates a new Argument Generator.

    Args:
      arg_info: [yaml_arg_schema.Argument], Information about
        request fields and how to map them into arguments.
    """
    self.arg_info = arg_info
    self.resource_args = _GetResources(self.arg_info)

  def GenerateArgs(self, methods):
    """Generates all the CLI arguments required to call this method.

    Args:
      methods: list[APIMethod], list of methods to generate arguments for.

    Returns:
      {str, calliope.base.Action}, A map of field name to the argument.
    """
    shared_flag_resource_dict = _GetSharedFlags(self.resource_args)
    shared_resource_flag_list = list(shared_flag_resource_dict)

    args = [arg.Generate(methods, shared_resource_flag_list)
            for arg in self.arg_info]

    primary_resource_args = _GetMethodResourceArgs(self.resource_args, methods)
    primary_names = set(
        arg.primary_resource and arg.primary_resource.name
        for arg in primary_resource_args)

    for flag_name, resource_args in shared_flag_resource_dict.items():
      resource_names = list(set(resource_args))
      resource_names.sort(
          key=lambda name: '' if name in primary_names else name)

      args.append(base.Argument(
          flag_name,
          help='For resources [{}], provides fallback value for resource '
               '{attr} attribute. When the resource\'s full URI path is not '
               'provided, {attr} will fallback to this flag value.'.format(
                   ', '.join(resource_names),
                   attr=resource_util.StripPrefix(flag_name))))

    return args

  def GetPrimaryResource(self, methods, namespace):
    """Gets primary resource based on user input and returns single method.

    This determines which api method to use to make api request. If there
    is only one potential request method, return the one request method.

    Args:
      methods: list[APIMethod], The method to generate arguments for.
      namespace: The argparse namespace.

    Returns:
      MethodResourceArg, gets the primary resource arg and method the
        user specified in the namespace.

    Raises:
      ConflictingResourcesError: occurs when user specifies too many primary
        resources.
    """

    specified_methods = []
    primary_resources = _GetMethodResourceArgs(self.resource_args, methods)
    # Do not need to look at user specified args if there is only one primary
    # resource arg or method.
    if not primary_resources:
      return MethodResourceArg(primary_resource=None, method=None)
    elif len(primary_resources) == 1:
      return primary_resources.pop()

    for method_info in primary_resources:
      method = method_info.method
      primary_resource = method_info.primary_resource

      # A primary resource can be None if added to a hook. If more than one
      # collection is specified, we require that a primary resource is added.
      # Otherwise, we cannot evaluate which method to use.
      if not method or not primary_resource:
        raise util.InvalidSchemaError(
            'If more than one request collection is specified, a resource '
            'argument that corresponds with the collection, must be '
            'specified in YAML command.'
        )

      method_collection = _GetCollectionName(
          method, is_parent=primary_resource.is_parent_resource)

      specified_resource = method_info.Parse(namespace)
      primary_collection = (
          specified_resource and
          specified_resource.GetCollectionInfo().full_name)

      if method_collection == primary_collection:
        specified_methods.append(method_info)

    if len(specified_methods) > 1:
      uris = []
      for method_info in specified_methods:
        if parsed := method_info.Parse(namespace):
          uris.append(parsed.RelativeName())

      args = ', '.join(uris)
      raise ConflictingResourcesError(
          f'User specified multiple primary resource arguments: [{args}]. '
          'Unable to determine api request method.')

    if len(specified_methods) == 1:
      return specified_methods.pop()
    else:
      return MethodResourceArg(primary_resource=None, method=None)

  def CreateRequest(self,
                    namespace,
                    method,
                    static_fields=None,
                    labels=None,
                    command_type=None,
                    existing_message=None):
    """Generates the request object for the method call from the parsed args.

    Args:
      namespace: The argparse namespace.
      method: APIMethod, api method used to make request message.
      static_fields: {str, value}, A mapping of API field name to value to
        insert into the message. This is a convenient way to insert extra data
        while the request is being constructed for fields that don't have
        corresponding arguments.
      labels: The labels section of the command spec.
      command_type: Type of the command, i.e. CREATE, UPDATE.
      existing_message: the apitools message returned from server, which is used
        to construct the to-be-modified message when the command follows
        get-modify-update pattern.

    Returns:
      The apitools message to be send to the method.
    """
    new_message = method.GetRequestType()()

    # If an apitools message is provided, use the existing one by default
    # instead of creating an empty one.
    if existing_message:
      message = arg_utils.ParseExistingMessageIntoMessage(
          new_message, existing_message, method)
    else:
      message = new_message

    # Add labels into message
    if labels:
      if command_type == yaml_command_schema.CommandType.CREATE:
        _ParseLabelsIntoCreateMessage(message, namespace, labels.api_field)
      elif command_type == yaml_command_schema.CommandType.UPDATE:
        need_update = _ParseLabelsIntoUpdateMessage(message, namespace,
                                                    labels.api_field)
        if need_update:
          update_mask_path = update.GetMaskFieldPath(method)
          _AddLabelsToUpdateMask(static_fields, update_mask_path)

    # Insert static fields into message.
    arg_utils.ParseStaticFieldsIntoMessage(message, static_fields=static_fields)

    # Parse api Fields into message.
    for arg in self.arg_info:
      arg.Parse(method, message, namespace)

    return message

  def GetResponseResourceRef(self, id_value, namespace, method):
    """Gets a resource reference for a resource returned by a list call.

    It parses the namespace to find a reference to the parent collection and
    then creates a reference to the child resource with the given id_value.

    Args:
      id_value: str, The id of the child resource that was returned.
      namespace: The argparse namespace.
      method: APIMethod, method used to make the api request

    Returns:
      resources.Resource, The parsed resource reference.
    """
    methods = [method] if method else []
    parent_ref = self.GetPrimaryResource(methods, namespace).Parse(namespace)
    return resources.REGISTRY.Parse(
        id_value,
        collection=method.collection.full_name,
        api_version=method.collection.api_version,
        params=parent_ref.AsDict())

  def Limit(self, namespace):
    """Gets the value of the limit flag (if present)."""
    return getattr(namespace, 'limit', None)

  def PageSize(self, namespace):
    """Gets the value of the page size flag (if present)."""
    return getattr(namespace, 'page_size', None)


class AutoArgumentGenerator(object):
  """An argument generator to generate arguments for all fields in a message.

  When using this generator, you don't provide any manual configuration for
  arguments, it is all done automatically based on the request messages.

  There are two modes for this generator. In 'raw' mode, no modifications are
  done at all to the generated fields. In normal mode, certain list fields are
  not generated and instead our global list flags are used (and orchestrate
  the proper API fields automatically). In both cases, we generate additional
  resource arguments for path parameters.
  """
  FLAT_RESOURCE_ARG_NAME = 'resource'
  IGNORABLE_LIST_FIELDS = {'filter', 'pageToken', 'orderBy'}

  def __init__(self, method, raw=False):
    """Creates a new Argument Generator.

    Args:
      method: APIMethod, The method to generate arguments for.
      raw: bool, True to do no special processing of arguments for list
        commands. If False, typical List command flags will be added in and the
        equivalent API fields will be ignored.
    """
    self.method = method
    self.raw = raw
    self.is_atomic = self.method.detailed_params != self.method.params

    self.ignored_fields = set()
    if not raw and self.method.HasTokenizedRequest():
      self.ignored_fields |= AutoArgumentGenerator.IGNORABLE_LIST_FIELDS
      batch_page_size_field = self.method.BatchPageSizeField()
      if batch_page_size_field:
        self.ignored_fields.add(batch_page_size_field)

  def GenerateArgs(self):
    """Generates all the CLI arguments required to call this method.

    Returns:
      {str, calliope.base.Action}, A map of field name to the argument.
    """
    seen = set()
    args = []

    def _UpdateArgs(arguments):
      """Update args."""
      for arg in arguments:
        try:
          name = arg.name
        except IndexError:
          # An argument group does not have a name.
          pass
        else:
          if name in seen:
            continue
          seen.add(name)
        args.append(arg)

    # NOTICE: The call order is significant. Duplicate arg names are possible.
    # The first of the duplicate args entered wins.
    _UpdateArgs(self._GenerateResourceArg())
    _UpdateArgs(self._GenerateArguments('', self.method.GetRequestType()))
    _UpdateArgs(self._GenerateListMethodFlags())

    return args

  def CreateRequest(self, namespace):
    """Generates the request object for the method call from the parsed args.

    Args:
      namespace: The argparse namespace.

    Returns:
      The apitools message to be send to the method.
    """
    request_type = self.method.GetRequestType()
    # Recursively create the message and sub-messages.
    fields = self._ParseArguments(namespace, '', request_type)

    # For each actual method path field, add the attribute to the request.
    ref = self._ParseResourceArg(namespace)
    if ref:
      relative_name = ref.RelativeName()
      fields.update({f: getattr(ref, f, relative_name)
                     for f in self.method.params})
    return request_type(**fields)

  def Limit(self, namespace):
    """Gets the value of the limit flag (if present)."""
    if not self.raw:
      return getattr(namespace, 'limit', None)
    else:
      return None

  def PageSize(self, namespace):
    """Gets the value of the page size flag (if present)."""
    if not self.raw:
      return getattr(namespace, 'page_size', None)
    else:
      return None

  def _GenerateListMethodFlags(self):
    """Generates all the CLI flags for a List command.

    Returns:
      {str, calliope.base.Action}, A map of field name to the argument.
    """
    flags = []
    if not self.raw and self.method.IsList():
      flags.append(base.FILTER_FLAG)
      flags.append(base.SORT_BY_FLAG)
      if self.method.HasTokenizedRequest() and self.method.ListItemField():
        # We can use YieldFromList() with a limit.
        flags.append(base.LIMIT_FLAG)
        if self.method.BatchPageSizeField():
          # API supports page size.
          flags.append(base.PAGE_SIZE_FLAG)
    return flags

  def _GenerateArguments(self, prefix, message):
    """Gets the arguments to add to the parser that appear in the method body.

    Args:
      prefix: str, A string to prepend to the name of the flag. This is used
        for flags representing fields of a submessage.
      message: The apitools message to generate the flags for.

    Returns:
      {str, calliope.base.Argument}, A map of field name to argument.
    """
    args = []
    field_helps = arg_utils.FieldHelpDocs(message)
    for field in message.all_fields():
      field_help = field_helps.get(field.name, None)
      name = self._GetArgName(field.name, field_help)
      if not name:
        continue
      name = prefix + name
      if field.variant == messages.Variant.MESSAGE:
        sub_args = self._GenerateArguments(name + '.', field.type)
        if sub_args:
          help_text = (name + ': ' + field_help) if field_help else ''
          group = base.ArgumentGroup(help=help_text)
          args.append(group)
          for arg in sub_args:
            group.AddArgument(arg)
      else:
        attributes = yaml_arg_schema.Argument(name, name, field_help)
        arg = arg_utils.GenerateFlag(field, attributes, fix_bools=False,
                                     category='MESSAGE')
        if not arg.kwargs.get('help'):
          arg.kwargs['help'] = 'API doc needs help for field [{}].'.format(name)
        args.append(arg)
    return args

  def _GenerateResourceArg(self):
    """Gets the flags to add to the parser that appear in the method path.

    Returns:
      {str, calliope.base.Argument}, A map of field name to argument.
    """
    args = []
    field_names = (self.method.request_collection.detailed_params
                   if self.method.request_collection else None)
    if not field_names:
      return args
    field_helps = arg_utils.FieldHelpDocs(self.method.GetRequestType())
    default_help = 'For substitution into: ' + self.method.detailed_path

    # Make a dedicated positional in addition to the flags for each part of
    # the URI path.
    arg = base.Argument(
        AutoArgumentGenerator.FLAT_RESOURCE_ARG_NAME,
        nargs='?',
        help='The GRI for the resource being operated on.')
    args.append(arg)

    for field in field_names:
      arg = base.Argument(
          '--' + field,
          metavar=resource_property.ConvertToAngrySnakeCase(field),
          category='RESOURCE',
          help=field_helps.get(field, default_help))
      args.append(arg)
    return args

  def _ParseArguments(self, namespace, prefix, message):
    """Recursively generates data for the request message and any sub-messages.

    Args:
      namespace: The argparse namespace containing the all the parsed arguments.
      prefix: str, The flag prefix for the sub-message being generated.
      message: The apitools class for the message.

    Returns:
      A dict of message field data that can be passed to an apitools Message.
    """
    kwargs = {}
    for field in message.all_fields():
      arg_name = self._GetArgName(field.name)
      if not arg_name:
        continue
      arg_name = prefix + arg_name
      # Field is a sub-message, recursively generate it.
      if field.variant == messages.Variant.MESSAGE:
        sub_kwargs = self._ParseArguments(namespace, arg_name + '.', field.type)
        if sub_kwargs:
          # Only construct the sub-message if we have something to put in it.
          value = field.type(**sub_kwargs)
          kwargs[field.name] = value if not field.repeated else [value]
      # Field is a scalar, just get the value.
      else:
        value = arg_utils.GetFromNamespace(namespace, arg_name)
        if value is not None:
          kwargs[field.name] = arg_utils.ConvertValue(field, value)
    return kwargs

  def _ParseResourceArg(self, namespace):
    """Gets the resource ref for the resource specified as the positional arg.

    Args:
      namespace: The argparse namespace.

    Returns:
      The parsed resource ref or None if no resource arg was generated for this
      method.
    """
    field_names = (self.method.request_collection.detailed_params
                   if self.method.request_collection else None)
    if not field_names:
      return
    r = getattr(namespace, AutoArgumentGenerator.FLAT_RESOURCE_ARG_NAME)
    enforce_collection = getattr(namespace, 'enforce_collection', True)

    params = {}
    defaults = {}
    for f in field_names:
      value = getattr(namespace, f)
      if value:
        params[f] = value
      else:
        default = arg_utils.DEFAULT_PARAMS.get(f, lambda: None)()
        if default:
          defaults[f] = default

    if not r and not params and len(defaults) < len(field_names):
      # No values were explicitly given and there are not enough defaults for
      # the parse to work.
      return None

    defaults.update(params)
    return resources.REGISTRY.Parse(
        r, collection=self.method.request_collection.full_name,
        enforce_collection=enforce_collection,
        api_version=self.method.request_collection.api_version,
        params=defaults)

  def _GetArgName(self, field_name, field_help=None):
    """Gets the name of the argument to generate for the field.

    Args:
      field_name: str, The name of the field.
      field_help: str, The help for the field in the API docs.

    Returns:
      str, The name of the argument to generate, or None if this field is output
      only or should be ignored.
    """
    if field_help and arg_utils.IsOutputField(field_help):
      return None
    if field_name in self.ignored_fields:
      return None
    if (field_name == self.method.request_field and
        field_name.lower().endswith('request')):
      return 'request'
    return field_name