HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/util/apis/update_args.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for creating/parsing update argument groups."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import enum

from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import arg_parsers_usage_text as usage_text
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope.concepts import util as format_util
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.apis import yaml_command_schema_util as util
import six


# TODO(b/280653078) The UX is still under review. These utilities are
# liable to change and should not be used in new surface yet.

# TODO(b/283949482): Place this file in util/args and replace the duplicate
# logic in the util files.


class Prefix(enum.Enum):
  ADD = 'add'
  UPDATE = 'update'
  REMOVE = 'remove'
  CLEAR = 'clear'


class _ConvertValueType(usage_text.DefaultArgTypeWrapper):
  """Wraps flag types in arg_utils.ConvertValue while maintaining help text.

  Attributes:
    arg_gen: UpdateBasicArgumentGenerator, update argument generator
  """

  def __init__(self, arg_gen):
    super(_ConvertValueType, self).__init__(arg_gen.flag_type)
    self.field = arg_gen.field
    self.repeated = arg_gen.repeated
    self.processor = arg_gen.processor
    self.choices = arg_gen.choices

  def __call__(self, arg_value):
    """Converts arg_value into type arg_type."""
    value = self.arg_type(arg_value)
    return arg_utils.ConvertValue(
        self.field,
        value,
        repeated=self.repeated,
        processor=self.processor,
        choices=util.Choice.ToChoiceMap(self.choices),
    )


class UpdateArgumentGenerator(six.with_metaclass(abc.ABCMeta, object)):
  """Update flag generator.

  To use this base class, provide required methods for parsing
  (GetArgFromNamespace and GetFieldValueFromNamespace) and override
  the flags that are needed to update the value. For example, if argument
  group requires a set flag, we would override the `set_arg` property and
  ApplySetFlag method.
  """

  def _GetTextFormatOfEmptyValue(self, value):
    if value:
      return value

    if isinstance(value, dict):
      return 'empty map'
    if isinstance(value, list):
      return 'empty list'
    if value is None:
      return 'null'

    return value

  def _CreateFlag(
      self, arg_name, flag_prefix=None, flag_type=None, action=None,
      metavar=None, help_text=None
  ):
    """Creates a flag.

    Args:
      arg_name: str, root name of the arg
      flag_prefix: Prefix | None, prefix for the flag name
      flag_type: func, type that flag is used to convert user input
      action: str, flag action
      metavar: str, user specified metavar for flag
      help_text: str, flag help text

    Returns:
      base.Argument with correct params
    """
    flag_name = arg_utils.GetFlagName(
        arg_name, flag_prefix and flag_prefix.value)
    arg = base.Argument(flag_name, action=action, help=help_text)

    if action == 'store_true':
      return arg

    arg.kwargs['type'] = flag_type
    if flag_metavar := arg_utils.GetMetavar(metavar, flag_type, flag_name):
      arg.kwargs['metavar'] = flag_metavar
    return arg

  # DEFAULT FLAGS GENERATED
  @property
  def set_arg(self):
    """Flag that sets field to specifed value."""
    return None

  @property
  def clear_arg(self):
    """Flag that clears field."""
    return None

  @property
  def update_arg(self):
    """Flag that updates value if part of existing field."""
    return None

  @property
  def remove_arg(self):
    """Flag that removes value if part of existing field."""
    return None

  def Generate(self, additional_flags=None):
    """Returns ArgumentGroup with all flags specified in generator.

    ArgumentGroup is returned where the set flag is mutually exclusive with
    the rest of the update flags. In addition, remove and clear flags are
    mutually exclusive. The following combinations are allowed

    # sets the foo value to value1,value2
    {command} --foo=value1,value2

    # adds values value3
    {command} --add-foo=value3

    # clears values and sets foo to value4,value5
    {command} --add-foo=value4,value5 --clear

    # removes value4 and adds value6
    {command} --add-foo=value6 --remove-foo=value4

    # removes value6 and then re-adds it
    {command} --add-foo=value6 --remove-foo=value6

    Args:
      additional_flags: [base.Argument], list of additional arguments needed
        to udpate the value

    Returns:
      base.ArgumentGroup, argument group containing flags
    """
    base_group = base.ArgumentGroup(
        mutex=True,
        required=False,
        hidden=self.is_hidden,
        help='Update {}.'.format(self.arg_name),
    )
    if self.set_arg:
      base_group.AddArgument(self.set_arg)

    update_group = base.ArgumentGroup(required=False)
    if self.update_arg:
      update_group.AddArgument(self.update_arg)

    clear_group = base.ArgumentGroup(mutex=True, required=False)
    if self.clear_arg:
      clear_group.AddArgument(self.clear_arg)
    if self.remove_arg:
      clear_group.AddArgument(self.remove_arg)

    if clear_group.arguments:
      update_group.AddArgument(clear_group)
    if update_group.arguments:
      base_group.AddArgument(update_group)

    if not additional_flags:
      return base_group

    wrapper_group = base.ArgumentGroup(
        required=False,
        hidden=self.is_hidden,
        help='All arguments needed to update {}.'.format(self.arg_name),
    )
    wrapper_group.AddArgument(base_group)
    for arg in additional_flags:
      wrapper_group.AddArgument(arg)
    return wrapper_group

  # METHODS REQUIRED FOR PARSING NEW VALUE
  @abc.abstractmethod
  def GetArgFromNamespace(self, namespace, arg):
    """Retrieves namespace value associated with flag.

    Args:
      namespace: The parsed command line argument namespace.
      arg: base.Argument, used to get namespace value

    Returns:
      value parsed from namespace
    """
    pass

  @abc.abstractmethod
  def GetFieldValueFromMessage(self, existing_message):
    """Retrieves existing field from message.

    Args:
      existing_message: apitools message we need to get field value from

    Returns:
      field value from apitools message
    """
    pass

  # DEFAULTED METHODS FOR PARSING NEW VALUE
  def ApplySetFlag(self, existing_val, unused_set_val):
    """Updates result to new value (No-op: implementation in subclass)."""
    return existing_val

  def ApplyClearFlag(self, existing_val, unused_clear_flag):
    """Clears existing value (No-op: implementation in subclass)."""
    return existing_val

  def ApplyRemoveFlag(self, existing_val, unused_remove_val):
    """Removes existing value (No-op: implementation in subclass)."""
    return existing_val

  def ApplyUpdateFlag(self, existing_val, unused_update_val):
    """Updates existing value (No-op: implementation in subclass)."""
    return existing_val

  def Parse(self, namespace, existing_message):
    """Parses update flags from namespace and returns updated message field.

    Args:
      namespace: The parsed command line argument namespace.
      existing_message: Apitools message that exists for given resource.

    Returns:
      Modified existing apitools message field.
    """
    result = self.GetFieldValueFromMessage(existing_message)
    set_value, clear_value, remove_value, update_value = (
        self.GetArgFromNamespace(namespace, self.set_arg),
        self.GetArgFromNamespace(namespace, self.clear_arg),
        self.GetArgFromNamespace(namespace, self.remove_arg),
        self.GetArgFromNamespace(namespace, self.update_arg),
    )

    # Whether or not the flags are mutually exclusive are determined by the
    # ArgumentGroup generated. We do not want to duplicate the mutex logic
    # so instead we consistently apply all flags in same order, first by
    # removing and then adding values.

    # Remove values
    result = self.ApplyClearFlag(result, clear_value)
    result = self.ApplyRemoveFlag(result, remove_value)

    # Add values
    result = self.ApplySetFlag(result, set_value)
    result = self.ApplyUpdateFlag(result, update_value)

    return result


class UpdateBasicArgumentGenerator(UpdateArgumentGenerator):
  """Update flag generator for simple flags."""

  @classmethod
  def FromArgData(cls, arg_data, field):
    """Creates a flag generator from yaml arg data and request message.

    Args:
      arg_data: yaml_arg_schema.Argument, data about flag being generated
      field: messages.Field, apitools field instance.

    Returns:
      UpdateArgumentGenerator, the correct version of flag generator
    """
    flag_type, action = arg_utils.GenerateFlagType(field, arg_data)

    is_repeated = (
        field.repeated if arg_data.repeated is None else arg_data.repeated
    )
    field_type = arg_utils.GetFieldType(field)

    if field_type == arg_utils.FieldType.MAP:
      gen_cls = UpdateMapArgumentGenerator
    elif is_repeated:
      gen_cls = UpdateListArgumentGenerator
    else:
      gen_cls = UpdateDefaultArgumentGenerator

    return gen_cls(
        arg_name=arg_data.arg_name,
        flag_type=flag_type,
        field=field,
        action=action,
        is_hidden=arg_data.hidden,
        help_text=arg_data.help_text,
        api_field=arg_data.api_field,
        repeated=arg_data.repeated,
        processor=arg_data.processor,
        choices=arg_data.choices,
        metavar=arg_data.metavar,
    )

  def __init__(
      self,
      arg_name,
      flag_type=None,
      field=None,
      action=None,
      is_hidden=False,
      help_text=None,
      api_field=None,
      repeated=False,
      processor=None,
      choices=None,
      metavar=None,
  ):
    super(UpdateBasicArgumentGenerator, self).__init__()
    self.arg_name = format_util.NormalizeFormat(arg_name)
    self.field = field
    self.flag_type = flag_type
    self.action = action
    self.is_hidden = is_hidden
    self.help_text = help_text
    self.api_field = api_field
    self.repeated = repeated
    self.processor = processor
    self.choices = choices
    self.metavar = metavar

  def GetArgFromNamespace(self, namespace, arg):
    if arg is None:
      return None
    return arg_utils.GetFromNamespace(namespace, arg.name)

  def GetFieldValueFromMessage(self, existing_message):
    """Retrieves existing field from message."""
    if existing_message:
      existing_value = arg_utils.GetFieldValueFromMessage(
          existing_message, self.api_field
      )
    else:
      existing_value = None

    if isinstance(existing_value, list):
      existing_value = existing_value.copy()
    return existing_value

  def _CreateBasicFlag(self, **kwargs):
    return self._CreateFlag(arg_name=self.arg_name, **kwargs)


class UpdateDefaultArgumentGenerator(UpdateBasicArgumentGenerator):
  """Update flag generator for simple values."""

  @property
  def _empty_value(self):
    return None

  @property
  def set_arg(self):
    return self._CreateBasicFlag(
        flag_type=_ConvertValueType(self),
        action=self.action,
        metavar=self.metavar,
        help_text='Set {} to new value.'.format(self.arg_name),
    )

  @property
  def clear_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.CLEAR,
        action='store_true',
        help_text='Clear {} value and set to {}.'.format(
            self.arg_name, self._GetTextFormatOfEmptyValue(self._empty_value)),
    )

  def ApplySetFlag(self, existing_val, set_val):
    if set_val is not None:
      return set_val
    return existing_val

  def ApplyClearFlag(self, existing_val, clear_flag):
    if clear_flag:
      return self._empty_value
    return existing_val


class UpdateListArgumentGenerator(UpdateBasicArgumentGenerator):
  """Update flag generator for list."""

  @property
  def _empty_value(self):
    return []

  @property
  def set_arg(self):
    return self._CreateBasicFlag(
        flag_type=_ConvertValueType(self),
        action=self.action,
        metavar=self.metavar,
        help_text='Set {} to new value.'.format(self.arg_name),
    )

  @property
  def clear_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.CLEAR,
        action='store_true',
        help_text='Clear {} value and set to {}.'.format(
            self.arg_name, self._GetTextFormatOfEmptyValue(self._empty_value)),
    )

  @property
  def update_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.ADD,
        flag_type=_ConvertValueType(self),
        action=self.action,
        help_text='Add new value to {} list.'.format(self.arg_name),
    )

  @property
  def remove_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.REMOVE,
        flag_type=_ConvertValueType(self),
        action=self.action,
        help_text='Remove existing value from {} list.'.format(self.arg_name),
    )

  def _ContainsVal(self, new_val, all_vals):
    if isinstance(self.flag_type, util.EquitableType):
      return any(
          self.flag_type.Matches(new_val, val) for val in all_vals)
    else:
      return new_val in all_vals

  def ApplySetFlag(self, existing_val, set_val):
    if set_val is not None:
      return set_val
    return existing_val

  def ApplyClearFlag(self, existing_val, clear_flag):
    if clear_flag:
      return self._empty_value
    return existing_val

  def ApplyRemoveFlag(self, existing_val, remove_val):
    if remove_val is not None:
      return [
          x for x in existing_val if not self._ContainsVal(x, remove_val)]
    return existing_val

  def ApplyUpdateFlag(self, existing_val, update_val):
    if update_val is not None:
      new_vals = [
          x for x in update_val if not self._ContainsVal(x, existing_val)]
      return existing_val + new_vals
    return existing_val


class UpdateMapArgumentGenerator(UpdateBasicArgumentGenerator):
  """Update flag generator for key-value pairs ie proto map fields."""

  @property
  def _empty_value(self):
    return {}

  @property
  def _is_list_field(self):
    return self.field.name == arg_utils.ADDITIONAL_PROPS

  def _WrapOutput(self, output_list):
    """Wraps field AdditionalProperties in apitools message if needed.

    Args:
      output_list: list of apitools AdditionalProperties messages.

    Returns:
      apitools message instance.
    """
    if self._is_list_field:
      return output_list
    message = self.field.type()
    arg_utils.SetFieldInMessage(
        message, arg_utils.ADDITIONAL_PROPS, output_list)
    return message

  def _GetPropsFieldValue(self, field):
    """Retrieves AdditionalProperties field value.

    Args:
      field: apitools instance that contains AdditionalProperties field

    Returns:
      list of apitools AdditionalProperties messages.
    """
    if not field:
      return []
    if self._is_list_field:
      return field
    return arg_utils.GetFieldValueFromMessage(field, arg_utils.ADDITIONAL_PROPS)

  @property
  def set_arg(self):
    return self._CreateBasicFlag(
        flag_type=_ConvertValueType(self),
        action=self.action,
        metavar=self.metavar,
        help_text='Set {} to new value.'.format(self.arg_name),
    )

  @property
  def clear_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.CLEAR,
        action='store_true',
        help_text='Clear {} value and set to {}.'.format(
            self.arg_name, self._GetTextFormatOfEmptyValue(self._empty_value)),
    )

  @property
  def update_arg(self):
    return self._CreateBasicFlag(
        flag_prefix=Prefix.UPDATE,
        flag_type=_ConvertValueType(self),
        action=self.action,
        help_text='Update {} value or add key value pair.'.format(
            self.arg_name
        ),
    )

  @property
  def remove_arg(self):
    if self._is_list_field:
      field = self.field
    else:
      field = arg_utils.GetFieldFromMessage(
          self.field.type, arg_utils.ADDITIONAL_PROPS
      )

    key_field = arg_utils.GetFieldFromMessage(field.type, 'key')
    key_type = key_field.type or arg_utils.TYPES.get(key_field.variant)
    key_list = arg_parsers.ArgObject(
        value_type=key_type, repeated=True)

    return self._CreateBasicFlag(
        flag_prefix=Prefix.REMOVE,
        flag_type=key_list,
        action='store',
        help_text='Remove existing value from map {}.'.format(self.arg_name),
    )

  def ApplySetFlag(self, existing_val, set_val):
    if set_val is not None:
      return set_val
    return existing_val

  def ApplyClearFlag(self, existing_val, clear_flag):
    if clear_flag:
      return self._WrapOutput([])
    return existing_val

  def ApplyUpdateFlag(self, existing_val, update_val):
    if update_val is not None:
      output_list = self._GetPropsFieldValue(existing_val)
      update_val_list = self._GetPropsFieldValue(update_val)
      update_key_set = set([x.key for x in update_val_list])
      deduped_list = [x for x in output_list if x.key not in update_key_set]
      return self._WrapOutput(deduped_list + update_val_list)
    return existing_val

  def ApplyRemoveFlag(self, existing_val, remove_val):
    if remove_val is not None:
      output_list = self._GetPropsFieldValue(existing_val)
      remove_val_set = set(remove_val)
      return self._WrapOutput(
          [x for x in output_list if x.key not in remove_val_set])
    return existing_val