HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/util/apis/update_resource_args.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for creating/parsing update resource argument groups."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import base
from googlecloudsdk.calliope.concepts import util as format_util
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.apis import update_args
from googlecloudsdk.command_lib.util.apis import yaml_command_schema_util as util
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.core import resources


# TODO(b/280653078) The UX is still under review. These utilities are
# liable to change and should not be used in new surface yet.

# TODO(b/283949482): Place this file in util/args and replace the duplicate
# logic in the util files.


def _GetRelativeNameField(arg_data):
  """Returns the message field that the resource's relative name maps to.

  Args:
    arg_data: resource arg data exposing a `resource_method_params` mapping
      keyed by message field name.

  Returns:
    The first key of `resource_method_params` whose value contains
    util.REL_NAME_FORMAT_KEY, or None when no entry matches.
  """
  matching_fields = []
  for field_name, param_value in arg_data.resource_method_params.items():
    if util.REL_NAME_FORMAT_KEY in param_value:
      matching_fields.append(field_name)

  return matching_fields[0] if matching_fields else None


def _GetSharedAttributeFlags(arg_data, shared_resource_flags):
  """Collects the attribute flags of a resource that are treated as shared.

  A flag is collected when its attribute is not one of the resource's anchors
  and the flag is neither listed in arg_data.ignored_flags nor in
  shared_resource_flags.

  Args:
    arg_data: resource arg data exposing `anchors`, `attribute_to_flag_map`
      and `ignored_flags`.
    shared_resource_flags: container of flag names that are already handled
      by the caller.

  Returns:
    A list of unique flag names (built from a set, so order is not
    meaningful).
  """
  anchor_attribute_names = {anchor.name for anchor in arg_data.anchors}

  collected_flags = {
      flag
      for attribute, flag in arg_data.attribute_to_flag_map.items()
      if not (flag in arg_data.ignored_flags or
              flag in shared_resource_flags or
              attribute in anchor_attribute_names)
  }

  return list(collected_flags)


def _GetResourceArgGenerator(
    arg_data, resource_collection, ignored_flags):
  """Builds a callable that generates a resource arg from arg_data.

  Args:
    arg_data: resource arg data exposing `group_help` and
      `GenerateResourceArg`.
    resource_collection: collection forwarded as the first argument of
      arg_data.GenerateResourceArg.
    ignored_flags: flag names forwarded as `shared_resource_flags`.

  Returns:
    A function `ArgGen(name, group_help, flag_name_override)` that returns
    whatever arg_data.GenerateResourceArg returns.
  """
  def ArgGen(name, group_help, flag_name_override):
    # The blank-line separator is always appended, even when arg_data has no
    # help text of its own to add after it.
    combined_help = group_help + '\n\n' + (arg_data.group_help or '')

    return arg_data.GenerateResourceArg(
        resource_collection,
        presentation_flag_name=name,
        flag_name_override=flag_name_override,
        shared_resource_flags=ignored_flags,
        group_help=combined_help)

  return ArgGen


def _GenerateSharedFlags(
    arg_data, resource_collection, shared_flag_names):
  """Generates the flags that are shared by more than one resource arg.

  Args:
    arg_data: resource arg data used to generate the resource arg.
    resource_collection: collection forwarded to the resource arg generator.
    shared_flag_names: container of flag names to keep.

  Returns:
    The attribute args of an unfiltered resource arg whose `name` is in
    shared_flag_names.
  """
  placeholder_name = '--current'

  # Passing None as the ignored flags yields a throwaway resource arg in
  # which no attribute flag has been filtered out.
  unfiltered_arg_gen = _GetResourceArgGenerator(
      arg_data, resource_collection, None)
  placeholder_arg = unfiltered_arg_gen(placeholder_name, '', None)
  placeholder_info = placeholder_arg.GetInfo(placeholder_name)

  shared_flags = []
  for attribute_arg in placeholder_info.GetAttributeArgs():
    if attribute_arg.name in shared_flag_names:
      shared_flags.append(attribute_arg)
  return shared_flags


class UpdateResourceArgumentGenerator(update_args.UpdateArgumentGenerator):
  """Shared base for update flag generators that operate on resource args.

  Provides construction from resource arg data, creation of (optionally
  prefixed) resource flags, and conversion of relative names stored on an
  existing message back into parsed resources.
  """

  @classmethod
  def FromArgData(
      cls, arg_data, method_resource_collection, is_list_method=False,
      shared_resource_flags=None
  ):
    """Builds the generator matching the shape of the resource arg.

    The concrete generator class is chosen from arg_data.multitype and
    arg_data.repeated.

    Args:
      arg_data: resource arg data describing the resource argument.
      method_resource_collection: collection passed to arg_data when computing
        the presentation flag name, checking for a primary resource and
        generating resource args.
      is_list_method: bool, forwarded to arg_data.GetPresentationFlagName.
      shared_resource_flags: optional list of flag names the generated
        resource arg should not create.

    Returns:
      An instance of one of the UpdateResourceArgumentGenerator subclasses.

    Raises:
      util.InvalidSchemaError: if the resource is the primary resource, or if
        resource_method_params does not identify the message field holding
        the relative name.
    """
    generator_classes = {
        (True, True): UpdateMultitypeListResourceArgumentGenerator,
        (True, False): UpdateMultitypeResourceArgumentGenerator,
        (False, True): UpdateListResourceArgumentGenerator,
        (False, False): UpdateDefaultResourceArgumentGenerator,
    }
    gen_cls = generator_classes[
        (bool(arg_data.multitype), bool(arg_data.repeated))]

    presentation_flag_name = arg_data.GetPresentationFlagName(
        method_resource_collection, is_list_method)

    is_primary = arg_data.IsPrimaryResource(method_resource_collection)
    if is_primary:
      raise util.InvalidSchemaError(
          '{} is a primary resource. Primary resources are required and '
          'cannot be listed as clearable.'.format(presentation_flag_name)
      )

    api_field = _GetRelativeNameField(arg_data)
    if not api_field:
      raise util.InvalidSchemaError(
          '{} does not specify the message field where the relative name is '
          'mapped in resource_method_params. Message field name is needed '
          'in order add update args. Please update '
          'resource_method_params.'.format(presentation_flag_name)
      )

    caller_shared_flags = shared_resource_flags or []
    shared_attribute_flags = _GetSharedAttributeFlags(
        arg_data, caller_shared_flags)
    # Every flag the generated resource arg must not create itself.
    flags_to_skip = shared_attribute_flags + caller_shared_flags

    resource_arg_gen = _GetResourceArgGenerator(
        arg_data, method_resource_collection, flags_to_skip)
    # Attribute flags shared between the update args; these are generated
    # once here and later handed to Generate().
    shared_flag_args = _GenerateSharedFlags(
        arg_data, method_resource_collection, shared_attribute_flags)

    return gen_cls(
        presentation_flag_name=presentation_flag_name,
        arg_gen=resource_arg_gen,
        api_field=api_field,
        repeated=arg_data.repeated,
        collections=arg_data.collections,
        is_primary=is_primary,
        shared_attribute_flags=shared_flag_args,
        anchor_names=[anchor.name for anchor in arg_data.anchors],
    )

  def __init__(
      self,
      presentation_flag_name,
      arg_gen=None,
      is_hidden=False,
      api_field=None,
      repeated=False,
      collections=None,
      is_primary=None,
      shared_attribute_flags=None,
      anchor_names=None,
  ):
    """Stores the configuration used to build and apply the update flags.

    Args:
      presentation_flag_name: flag name; stored normalized as `arg_name`.
      arg_gen: callable `(name, group_help, flag_name_override)` that builds a
        resource arg.
      is_hidden: stored as-is on the instance.
      api_field: message field holding the resource's relative name.
      repeated: stored as-is on the instance.
      collections: collections tried when parsing a relative name; defaults
        to an empty list.
      is_primary: stored as-is on the instance.
      shared_attribute_flags: flags passed to the parent Generate(); defaults
        to an empty list.
      anchor_names: names of the resource's anchor attributes; defaults to an
        empty list.
    """
    super().__init__()
    self.arg_name = format_util.NormalizeFormat(
        presentation_flag_name)
    self.arg_gen = arg_gen
    self.is_hidden = is_hidden
    self.api_field = api_field
    self.repeated = repeated
    self.collections = collections or []
    self.is_primary = is_primary
    self.shared_attribute_flags = shared_attribute_flags or []
    self.anchor_names = anchor_names or []

  def _GetAnchorFlag(self, attr_name, flag_prefix_value):
    """Returns the (optionally prefixed) flag name for an anchor attribute.

    With more than one anchor each anchor flag is named after its attribute;
    otherwise the flag is named after the resource arg itself.
    """
    has_several_anchors = len(self.anchor_names) > 1
    base_name = attr_name if has_several_anchors else self.arg_name
    return arg_utils.GetFlagName(base_name, flag_prefix=flag_prefix_value)

  def _CreateResourceFlag(self, flag_prefix=None, group_help=None):
    """Builds a resource arg via `arg_gen`, applying an optional prefix.

    Args:
      flag_prefix: optional object whose `.value` is used as the prefix of
        the presentation flag and of every anchor flag.
      group_help: help text forwarded to `arg_gen`.

    Returns:
      Whatever `arg_gen` returns.
    """
    # A falsy flag_prefix is forwarded unchanged as the prefix value.
    prefix = flag_prefix.value if flag_prefix else flag_prefix
    flag_name = arg_utils.GetFlagName(
        self.arg_name,
        flag_prefix=prefix)

    flag_name_override = {}
    for anchor_name in self.anchor_names:
      flag_name_override[anchor_name] = self._GetAnchorFlag(
          anchor_name, prefix)

    return self.arg_gen(
        flag_name, group_help=group_help, flag_name_override=flag_name_override)

  def _RelativeName(self, value):
    """Parses a relative name against every configured collection.

    There is no early exit on success, so when more than one collection can
    parse the value, the result of the last successful parse is returned.

    Args:
      value: relative name to parse.

    Returns:
      The parsed resource, or None if no collection could parse the value.
    """
    parsed = None
    for collection in self.collections:
      try:
        candidate = resources.REGISTRY.ParseRelativeName(
            value,
            collection.full_name,
            api_version=collection.api_version)
      except resources.Error:
        # The value does not belong to this collection; try the next one.
        pass
      else:
        parsed = candidate

    return parsed

  def GetArgFromNamespace(self, namespace, arg):
    """Retrieves namespace value associated with flag.

    Args:
      namespace: The parsed command line argument namespace.
      arg: base.Argument|concept_parsers.ConceptParser|None, used to get
        namespace value

    Returns:
      value parsed from namespace

    Raises:
      ValueError: if arg is a ConceptParser that does not contain exactly one
        spec.
    """
    if isinstance(arg, base.Argument):
      return arg_utils.GetFromNamespace(namespace, arg.name)

    if not isinstance(arg, concept_parsers.ConceptParser):
      return None

    all_anchors = list(arg.specs.keys())
    if len(all_anchors) != 1:
      raise ValueError(
          'ConceptParser must contain exactly one spec for clearable '
          'but found specs {}. {} cannot parse the namespace value if more '
          'than or less than one spec is added to the '
          'ConceptParser.'.format(all_anchors, type(self).__name__))

    (spec_name,) = all_anchors
    concept_value = arg_utils.GetFromNamespace(namespace.CONCEPTS, spec_name)
    # Falsy values are returned untouched; only truthy ones are parsed.
    return concept_value.Parse() if concept_value else concept_value

  def GetFieldValueFromMessage(self, existing_message):
    """Reads `api_field` from the message and parses it into resource(s).

    Args:
      existing_message: message to read the current field value from.

    Returns:
      None if the field is empty; for a list value, the list of resources
      that could be parsed (unparseable entries are dropped); otherwise the
      parsed resource, or None if it could not be parsed.
    """
    field_value = arg_utils.GetFieldValueFromMessage(
        existing_message, self.api_field)
    if not field_value:
      return None

    if not isinstance(field_value, list):
      return self._RelativeName(field_value)

    return list(filter(None, map(self._RelativeName, field_value)))

  def Generate(self):
    """Delegates to the parent Generate with the shared attribute flags."""
    return super().Generate(self.shared_attribute_flags)


class UpdateDefaultResourceArgumentGenerator(UpdateResourceArgumentGenerator):
  """Update flag generator for a resource arg that is neither list nor multitype.

  Offers a flag to set the resource and a boolean flag to clear it.
  """

  @property
  def _empty_value(self):
    """Value the field takes once it is cleared."""
    return None

  @property
  def set_arg(self):
    """Resource flag used to replace the current value."""
    set_help = 'Set {} to new value.'.format(self.arg_name)
    return self._CreateResourceFlag(group_help=set_help)

  @property
  def clear_arg(self):
    """Boolean (store_true) flag that resets the field to the empty value."""
    clear_help = (
        f'Clear {self.arg_name} value and set '
        f'to {self._GetTextFormatOfEmptyValue(self._empty_value)}.')
    return self._CreateFlag(
        self.arg_name,
        flag_prefix=update_args.Prefix.CLEAR,
        action='store_true',
        help_text=clear_help,
    )

  def ApplySetFlag(self, output, set_val):
    """Returns set_val when it is truthy, otherwise the unchanged output."""
    return set_val if set_val else output

  def ApplyClearFlag(self, output, clear_flag):
    """Returns the empty value when clear_flag is truthy, else output."""
    return self._empty_value if clear_flag else output


class UpdateMultitypeResourceArgumentGenerator(UpdateResourceArgumentGenerator):
  """Update flag generator for a non-list multitype resource arg.

  Offers a flag to set the resource and a boolean flag to clear it. The set
  value is expected to expose the parsed resource on a `result` attribute.
  """

  @property
  def _empty_value(self):
    """Value the field takes once it is cleared."""
    return None

  @property
  def set_arg(self):
    """Resource flag used to replace the current value."""
    set_help = 'Set {} to new value.'.format(self.arg_name)
    return self._CreateResourceFlag(group_help=set_help)

  @property
  def clear_arg(self):
    """Boolean (store_true) flag that resets the field to the empty value."""
    clear_help = (
        f'Clear {self.arg_name} value and set '
        f'to {self._GetTextFormatOfEmptyValue(self._empty_value)}.')
    return self._CreateFlag(
        self.arg_name,
        flag_prefix=update_args.Prefix.CLEAR,
        action='store_true',
        help_text=clear_help,
    )

  def ApplySetFlag(self, output, set_val):
    """Returns set_val.result when both are truthy, otherwise output."""
    if not set_val:
      return output

    parsed_resource = set_val.result
    if parsed_resource:
      return parsed_resource
    return output

  def ApplyClearFlag(self, output, clear_flag):
    """Returns the empty value when clear_flag is truthy, else output."""
    if not clear_flag:
      return output
    return self._empty_value


class UpdateListResourceArgumentGenerator(UpdateResourceArgumentGenerator):
  """Update flag generator for list resource args.

  Offers flags to set the whole list, clear it, add resources to it and
  remove resources from it.
  """

  @property
  def _empty_value(self):
    """Value the field takes once it is cleared (a fresh empty list)."""
    return []

  @property
  def set_arg(self):
    """Resource flag used to replace the whole list."""
    return self._CreateResourceFlag(
        group_help=f'Set {self.arg_name} to new value.')

  @property
  def clear_arg(self):
    """Boolean (store_true) flag that resets the field to the empty value."""
    return self._CreateFlag(
        self.arg_name,
        flag_prefix=update_args.Prefix.CLEAR,
        action='store_true',
        help_text=(
            f'Clear {self.arg_name} value and set '
            f'to {self._GetTextFormatOfEmptyValue(self._empty_value)}.'),
    )

  @property
  def update_arg(self):
    """Resource flag (ADD prefix) used to append resources to the list."""
    return self._CreateResourceFlag(
        flag_prefix=update_args.Prefix.ADD,
        group_help=f'Add new value to {self.arg_name} list.')

  @property
  def remove_arg(self):
    """Resource flag (REMOVE prefix) used to drop resources from the list."""
    return self._CreateResourceFlag(
        flag_prefix=update_args.Prefix.REMOVE,
        group_help=f'Remove value from {self.arg_name} list.')

  def ApplySetFlag(self, output, set_val):
    """Returns set_val when it is truthy, otherwise the unchanged output."""
    if set_val:
      return set_val
    return output

  def ApplyClearFlag(self, output, clear_flag):
    """Returns the empty value when clear_flag is truthy, else output."""
    if clear_flag:
      return self._empty_value
    return output

  def ApplyRemoveFlag(self, existing_val, remove_val):
    """Returns the existing list without the entries found in remove_val.

    Args:
      existing_val: current list of resources, or None.
      remove_val: resources to remove, or a falsy value for "nothing".

    Returns:
      A list; the empty value stands in for a falsy existing_val.
    """
    value = existing_val or self._empty_value
    if remove_val:
      return [x for x in value if x not in remove_val]
    return value

  def ApplyUpdateFlag(self, existing_val, update_val):
    """Returns the existing list extended with the new entries of update_val.

    Args:
      existing_val: current list of resources, or None.
      update_val: resources to add, or a falsy value for "nothing".

    Returns:
      A list; entries of update_val already present in the existing list are
      not added again.
    """
    value = existing_val or self._empty_value
    if update_val:
      # Concatenate onto `value`, not `existing_val`: the latter may be None,
      # which would raise TypeError on `None + list`.
      return value + [x for x in update_val if x not in value]
    return value


class UpdateMultitypeListResourceArgumentGenerator(
    UpdateResourceArgumentGenerator):
  """Update flag generator for multitype list resource args.

  Offers flags to set the whole list, clear it, add resources to it and
  remove resources from it. Each entry of a set/add/remove value is expected
  to expose the parsed resource on a `result` attribute; entries whose
  `result` is falsy are ignored.
  """

  @property
  def _empty_value(self):
    """Value the field takes once it is cleared (a fresh empty list)."""
    return []

  @property
  def set_arg(self):
    """Resource flag used to replace the whole list."""
    return self._CreateResourceFlag(
        group_help=f'Set {self.arg_name} to new value.')

  @property
  def clear_arg(self):
    """Boolean (store_true) flag that resets the field to the empty value."""
    return self._CreateFlag(
        self.arg_name,
        flag_prefix=update_args.Prefix.CLEAR,
        action='store_true',
        help_text=(
            f'Clear {self.arg_name} value and set '
            f'to {self._GetTextFormatOfEmptyValue(self._empty_value)}.'),
    )

  @property
  def update_arg(self):
    """Resource flag (ADD prefix) used to append resources to the list."""
    return self._CreateResourceFlag(
        flag_prefix=update_args.Prefix.ADD,
        group_help=f'Add new value to {self.arg_name} list.')

  @property
  def remove_arg(self):
    """Resource flag (REMOVE prefix) used to drop resources from the list."""
    return self._CreateResourceFlag(
        flag_prefix=update_args.Prefix.REMOVE,
        group_help=f'Remove value from {self.arg_name} list.')

  def ApplySetFlag(self, output, set_val):
    """Returns the results of set_val, or output if there are none.

    Args:
      output: value to fall back to.
      set_val: iterable of parsed values, or None when the flag is absent.

    Returns:
      The list of truthy `result` values from set_val when that list is
      non-empty, otherwise output.
    """
    # The namespace lookup can yield None for an absent flag; treat that as
    # "nothing to set" instead of failing while iterating.
    resource_list = [val.result for val in (set_val or []) if val.result]
    if resource_list:
      return resource_list
    return output

  def ApplyClearFlag(self, output, clear_flag):
    """Returns the empty value when clear_flag is truthy, else output."""
    if clear_flag:
      return self._empty_value
    return output

  def ApplyRemoveFlag(self, existing_val, remove_val):
    """Returns the existing list without the resources in remove_val.

    Args:
      existing_val: current list of resources, or None.
      remove_val: iterable of parsed values, or None when the flag is absent.

    Returns:
      A list; the empty value stands in for a falsy existing_val.
    """
    value = existing_val or self._empty_value
    # None-safe for the same reason as in ApplySetFlag.
    remove_resources = {
        val.result for val in (remove_val or []) if val.result}
    if remove_resources:
      return [x for x in value if x not in remove_resources]
    return value

  def ApplyUpdateFlag(self, existing_val, update_val):
    """Returns the existing list extended with the results of update_val.

    Args:
      existing_val: current list of resources, or None.
      update_val: iterable of parsed values, or a falsy value for "nothing".

    Returns:
      A list; results already present in the existing list are not added
      again, and falsy results are skipped.
    """
    value = existing_val or self._empty_value
    if not update_val:
      return value
    # Skip falsy results, as the set and remove paths do, so that an
    # unparsed entry never ends up in the list as None.
    return value + [
        x.result for x in update_val
        if x.result and x.result not in value]