File: //snap/google-cloud-cli/current/lib/googlecloudsdk/calliope/parser_extensions.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Calliope argparse intercepts and extensions.
Calliope uses the argparse module for command line argument definition and
parsing. It intercepts some argparse methods to provide enhanced runtime help
document generation, command line usage help, error handling and argument group
conflict analysis.
The parser and intercepts are in these modules:
  parser_extensions (this module)
    Extends and intercepts argparse.ArgumentParser and the parser args
    namespace to support Command.Run() method access to info added in the
    Command.Args() method.
  parser_arguments
    Intercepts the basic argument objects and collects data for command flag
    metrics reporting.
  parser_errors
    Error/exception classes for all Calliope arg parse errors. Errors derived
    from ArgumentError have a payload used for metrics reporting.
Intercepted argument definitions for a command and all its ancestor command
groups are kept in a tree of ArgumentInterceptor nodes. Inner nodes have
is_group==True and an arguments list of child nodes. Leaf nodes have
is_group==False. ArgumentInterceptor keeps track of the arguments and flags
specified on the command line in a set that is queried to verify the specified
arguments against their definitions. For example, that a required argument has
been specified, or that at most one flag in a mutually exclusive group has been
specified.
The collected info is also used to generate help markdown documents. The
markdown is annotated with extra text that collates and describes argument
attributes and groupings. For example, mutually exclusive, required, and nested
groups.
The intercepted args namespace object passed to the Command.Run() method adds
methods to access/modify info collected during the parse.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import abc
import argparse
import collections
import io
import itertools
import os
import re
import sys
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base  # pylint: disable=unused-import
from googlecloudsdk.calliope import parser_arguments
from googlecloudsdk.calliope import parser_errors
from googlecloudsdk.calliope import suggest_commands
from googlecloudsdk.calliope import usage_text
from googlecloudsdk.core import argv_utils
from googlecloudsdk.core import config
from googlecloudsdk.core import log
from googlecloudsdk.core import metrics
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_io
import six
_HELP_SEARCH_HINT = """\
To search the help text of gcloud commands, run:
  gcloud help -- SEARCH_TERMS"""
class Namespace(argparse.Namespace):
  """A custom subclass for parsed args.
  Attributes:
    _deepest_parser: ArgumentParser, The deepest parser for the last command
      part.
    _parsers: ArgumentParser, The list of all parsers for the command.
    _specified_args: {dest: arg-name}, A map of dest names for known args
      specified on the command line to arg names that have been scrubbed for
      metrics. This dict accumulate across all subparsers.
  """
  def __init__(self, **kwargs):
    self._deepest_parser = None
    self._parsers = []
    self._specified_args = {}
    super(Namespace, self).__init__(**kwargs)
  def _SetParser(self, parser):
    """Sets the parser for the first part of the command."""
    self._deepest_parser = parser
  def _GetParser(self):
    """Returns the deepest parser for the command."""
    return self._deepest_parser
  def _GetCommand(self):
    """Returns the command for the deepest parser."""
    # pylint: disable=protected-access
    return self._GetParser()._calliope_command
  def _Execute(self, command, call_arg_complete=False):
    """Executes command in the current CLI.
    Args:
      command: A list of command args to execute.
      call_arg_complete: Enable arg completion if True.
    Returns:
      Returns the list of resources from the command.
    """
    call_arg_complete = False
    # pylint: disable=protected-access
    return self._GetCommand()._cli_generator.Generate().Execute(
        command, call_arg_complete=call_arg_complete)
  def GetDisplayInfo(self):
    """Returns the parser display_info."""
    # pylint: disable=protected-access
    return self._GetCommand().ai.display_info
  @property
  def CONCEPTS(self):  # pylint: disable=invalid-name
    """The holder for concepts v1 arguments."""
    handler = self._GetCommand().ai.concept_handler
    if handler is None:
      return handler
    handler.parsed_args = self
    return handler
  @property
  def CONCEPT_ARGS(self):  # pylint: disable=invalid-name
    """The holder for concepts v2 arguments."""
    handler = self._GetCommand().ai.concepts
    if handler is None:
      return handler
    handler.parsed_args = self
    return handler
  def GetSpecifiedArgNames(self):
    """Returns the scrubbed names for args specified on the command line.
    For example,
      `$ {command} positional_value --foo=bar, --lorem-ipsum=hello`
    returns ['POSITIONAL_NAME', '--foo', '--lorem-ipsum'].
    """
    return sorted(self._specified_args.values())
  def GetSpecifiedArgs(self):
    """Gets the argument names and values that were actually specified.
    For example,
      `$ {command} positional_value --foo=bar, --lorem-ipsum=1,2,3`
    returns
      {
        'POSITIONAL_NAME': 'positional_value'
        '--foo': 'bar',
        '--lorem-ipsum': [1,2,3],
      }
    In the returned dictionary, the keys are the specified arguments, including
    positional argument names and flag names, in string type; the corresponding
    values are the user-specified flag values, converted according to the type
    defined by each flag.
    Returns:
      {str: any}, A mapping of argument name to value.
    """
    return {
        name: getattr(self, dest, 'UNKNOWN')
        for dest, name in six.iteritems(self._specified_args)
    }
  def GetSpecifiedArgsDict(self):
    """Returns the _specified_args dictionary.
    For example,
      $ {command} positional_value --foo=bar, --lorem-ipsum=hello --async,
    returns
      {
        'positional_name', 'POSITIONAL_NAME'
        'foo': '--foo',
        'lorem_ipsum': '--lorem-ipsum',
        'async_': '--async',
      }.
    In the returned dictionary, the keys are destinations in the argparse
    namespace object.
    In the above example, the destination of `--async` is set to 'async_' in its
    flag definition, other flags use underscore separated flag names as their
    default destinations.
    """
    return self._specified_args
  def IsSpecified(self, dest):
    """Returns True if args.dest was specified on the command line.
    Args:
      dest: str, The dest name for the arg to check.
    Raises:
      UnknownDestinationException: If there is no registered arg for dest.
    Returns:
      True if args.dest was specified on the command line.
    """
    if not hasattr(self, dest):
      raise parser_errors.UnknownDestinationException(
          'No registered arg for destination [{}].'.format(dest))
    return dest in self._specified_args
  def IsKnownAndSpecified(self, dest):
    """Returns True if dest is a known and args.dest was specified.
    Args:
      dest: str, The dest name for the arg to check.
    Returns:
      True if args.dest is a known argument was specified on the command line.
    """
    return hasattr(self, dest) and (dest in self._specified_args)
  def GetFlagArgument(self, name):
    """Returns the flag argument object for name.
    Args:
      name: The flag name or Namespace destination.
    Raises:
      UnknownDestinationException: If there is no registered flag arg for name.
    Returns:
      The flag argument object for name.
    """
    if name.startswith('--'):
      dest = name[2:].replace('-', '_')
      flag = name
    else:
      dest = name
      flag = '--' + name.replace('_', '-')
    ai = self._GetCommand().ai
    for arg in ai.flag_args + ai.ancestor_flag_args:
      if (dest == arg.dest or
          arg.option_strings and flag == arg.option_strings[0]):
        return arg
    raise parser_errors.UnknownDestinationException(
        'No registered flag arg for [{}].'.format(name))
  def GetPositionalArgument(self, name):
    """Returns the positional argument object for name.
    Args:
      name: The Namespace metavar or destination.
    Raises:
      UnknownDestinationException: If there is no registered positional arg
        for name.
    Returns:
      The positional argument object for name.
    """
    dest = name.replace('-', '_').lower()
    meta = name.replace('-', '_').upper()
    for arg in self._GetCommand().ai.positional_args:
      if isinstance(arg, type):
        continue
      if dest == arg.dest or meta == arg.metavar:
        return arg
    raise parser_errors.UnknownDestinationException(
        'No registered positional arg for [{}].'.format(name))
  def GetFlag(self, dest):
    """Returns the flag name registered to dest or None is dest is a positional.
    Args:
      dest: The dest of a registered argument.
    Raises:
      UnknownDestinationException: If no arg is registered for dest.
    Returns:
      The flag name registered to dest or None if dest is a positional.
    """
    arg = self.GetFlagArgument(dest)
    return arg.option_strings[0] if arg.option_strings else None
  def GetValue(self, dest):
    """Returns the value of the argument registered for dest.
    Args:
      dest: The dest of a registered argument.
    Raises:
      UnknownDestinationException: If no arg is registered for dest.
    Returns:
      The value of the argument registered for dest.
    """
    try:
      return getattr(self, dest)
    except AttributeError:
      raise parser_errors.UnknownDestinationException(
          'No registered arg for destination [{}].'.format(dest))
  def MakeGetOrRaise(self, flag_name):
    """Returns a function to get given flag value or raise if it is not set.
    This is useful when given flag becomes required when another flag
    is present.
    Args:
      flag_name: str, The flag_name name for the arg to check.
    Raises:
      parser_errors.RequiredError: if flag is not specified.
      UnknownDestinationException: If there is no registered arg for flag_name.
    Returns:
      Function for accessing given flag value.
    """
    def _Func():
      flag = flag_name[2:] if flag_name.startswith('--') else flag_name
      flag_value = getattr(self, flag)
      if flag_value is None and not self.IsSpecified(flag):
        raise parser_errors.RequiredError(argument=flag_name)
      return flag_value
    return _Func
class _ErrorContext(object):
  """Context from the most recent ArgumentParser.error() call.
  The context can be saved and used to reproduce the error() method call later
  in the execution.  Used to probe argparse errors for different argument
  combinations.
  Attributes:
    message: The error message string.
    parser: The parser where the error occurred.
    error: The exception error value.
  """
  def __init__(self, message, parser, error):
    self.message = re.sub(r"\bu'", "'", message)
    self.parser = parser
    self.error = error
    self.flags_locations = parser.flags_locations
  def AddLocations(self, arg):
    """Adds locaton info from context for arg if specified."""
    locations = self.flags_locations.get(arg)
    if locations:
      arg = '{} ({})'.format(arg, ','.join(sorted(locations)))
    return arg
class _HandleLaterError(Exception):
  """Error to be handled in a subsequent call to self.error.
  This error exists to provide a way to break out of self.error so that we can
  deduce a better error later; it will always be caught in parser_extensions and
  never surfaced as a user-facing error (at least in theory; if that does happen
  then it's a bug.)
  """
  pass
class DryRunError(Exception):
  """Error to pack for dry run cases without causing system exit."""
  pass
class ArgumentParser(argparse.ArgumentParser):
  """A custom subclass for arg parsing behavior.
  This overrides the default argparse parser.
  Attributes:
    _args: Original argv passed to argparse.
    _calliope_command: base._Command, The Calliope command or group for this
      parser.
    _error_context: The most recent self.error() method _ErrorContext.
    _is_group: bool, True if _calliope_command is a group.
    _probe_error: bool, True when parse_known_args() is probing argparse errors
      captured in the self.error() method.
    _remainder_action: action, The argument action for a -- ... remainder
      argument, added by AddRemainderArgument.
    _specified_args: {dest: arg-name}, A map of dest names for known args
      specified on the command line to arg names that have been scrubbed for
      metrics. This value is initialized and propagated to the deepest parser
      namespace in parse_known_args() from specified args collected in
      _get_values().
      raise_error: This is set to true if we want to raise an exception for
      errors.
  """
  _args = None
  def __init__(self, *args, **kwargs):
    self._calliope_command = kwargs.pop('calliope_command')
    # Would rather isinstance(self._calliope_command, CommandGroup) here but
    # that would introduce a circular dependency on calliope.backend.
    self._is_group = hasattr(self._calliope_command, 'commands')
    self._remainder_action = None
    self._specified_args = {}
    self._error_context = None
    self._probe_error = False
    self.flags_locations = collections.defaultdict(set)
    self.raise_error = False
    super(ArgumentParser, self).__init__(*args, **kwargs)
  def _Error(self, error):
    # self.error() wraps the standard argparse error() method.
    self.error(context=_ErrorContext(console_attr.SafeText(error), self, error))
  def AddRemainderArgument(self, *args, **kwargs):
    """Add an argument representing '--' followed by anything.
    This argument is bound to the parser, so the parser can use its helper
    methods to parse.
    Args:
      *args: The arguments for the action.
      **kwargs: They keyword arguments for the action.
    Raises:
      ArgumentException: If there already is a Remainder Action bound to this
      parser.
    Returns:
      The created action.
    """
    if self._remainder_action:
      self._Error(parser_errors.ArgumentException(
          'There can only be one pass through argument.'))
    kwargs['action'] = arg_parsers.RemainderAction
    # pylint:disable=protected-access
    self._remainder_action = self.add_argument(*args, **kwargs)
    return self._remainder_action
  def GetSpecifiedArgNames(self):
    """Returns the scrubbed names for args specified on the command line."""
    return sorted(self._specified_args.values())
  def _AddLocations(self, arg, value=None):
    """Adds file and line info from context for arg if specified."""
    if value and '=' not in arg:
      argval = '{}={}'.format(arg, value)
    else:
      argval = arg
    locations = self.flags_locations.get(argval)
    if locations:
      arg = '{} ({})'.format(argval, ','.join(sorted(locations)))
    return arg
  def _Suggest(self, unknown_args):
    """Error out with a suggestion based on text distance for each unknown."""
    messages = []
    suggester = usage_text.TextChoiceSuggester()
    # pylint:disable=protected-access, This is an instance of this class.
    for flag in self._calliope_command.GetAllAvailableFlags(
        include_hidden=False):
      options = flag.option_strings
      if options:
        # This is a flag, add all its names as choices.
        suggester.AddChoices(options)
        # Add any aliases as choices as well, but suggest the primary name.
        aliases = getattr(flag, 'suggestion_aliases', None)
        if aliases:
          suggester.AddAliases(aliases, options[0])
    suggestions = {}
    for arg in unknown_args:
      # Only do this for flag names.
      if not isinstance(arg, six.string_types):
        continue
      # Strip the flag value if any from the suggestion.
      flag = arg.split('=')[0]
      value = arg.split('=')[1] if len(arg.split('=')) > 1 else None
      if flag.startswith('--') or value:
        suggestion = suggester.GetSuggestion(flag)
        arg = self._AddLocations(arg)
      else:
        suggestion = None
      if arg in messages:
        continue
      if self._ExistingFlagAlternativeReleaseTracks(flag):
        existing_alternatives = self._ExistingFlagAlternativeReleaseTracks(flag)
        messages.append('\n {} flag is available in one or more alternate '
                        'release tracks. Try:\n'.format(flag))
        messages.append('\n  '.join(existing_alternatives) +'\n')
      if suggestion:
        suggestions[arg] = suggestion
        messages.append(arg + " (did you mean '{0}'?)".format(suggestion))
      else:
        messages.append(arg)
    # If there is a single arg, put it on the same line.  If there are multiple
    # add each on its own line for better clarity.
    if len(messages) > 1:
      separator, prefix = '\n  ', ''
    else:
      separator, prefix = ' ', '\n\n'
    # Always add a final message suggesting gcloud help. Set off with new line
    # if this will be the only new line.
    messages.append('{}{}'.format(prefix, _HELP_SEARCH_HINT))
    self._Error(parser_errors.UnrecognizedArgumentsError(
        'unrecognized arguments:{0}{1}'.format(
            separator, separator.join(messages)),
        parser=self,
        total_unrecognized=len(unknown_args),
        total_suggestions=len(suggestions),
        suggestions=suggestions))
  def _SetErrorContext(self, context):
    """Sets the current error context to context -- called by self.error()."""
    self._error_context = context
  def _ParseKnownArgs(self, args, namespace, wrapper=True):
    """Calls parse_known_args() and adds error_context to the return.
    Args:
      args: The list of command line args.
      namespace: The parsed args namespace.
      wrapper: Calls the parse_known_args() wrapper if True, otherwise the
        wrapped argparse parse_known_args().
    Returns:
      namespace: The parsed arg namespace.
      unknown_args: The list of unknown args.
      error_context: The _ErrorContext if there was an error, None otherwise.
    """
    self._error_context = None
    parser = self if wrapper else super(ArgumentParser, self)
    try:
      namespace, unknown_args = parser.parse_known_args(args, namespace)
    except _HandleLaterError:
      unknown_args = []
    error_context = self._error_context
    self._error_context = None
    if not unknown_args and hasattr(parser, 'flags_locations'):
      parser.flags_locations = collections.defaultdict(set)
    return namespace, unknown_args, error_context
  def _DeduceBetterError(self, context, args, namespace):
    """There is an argparse error in context, see if we can do better.
    We are committed to an argparse error. See if we can do better than the
    observed error in context by isolating each flag arg to determine if the
    argparse error complained about a flag arg value instead of a positional.
    Args:
      context: The _ErrorContext containing the error to improve.
      args: The subset of the command lines args that triggered the argparse
        error in context.
      namespace: The namespace for the current parser.
    """
    self._probe_error = True
    for arg in args:
      try:
        if not arg.startswith('-'):
          break
      except AttributeError:
        break
      _, _, error_context = self._ParseKnownArgs([arg], namespace)
      if not error_context:
        continue
      context = error_context
      break
    self._probe_error = False
    context.error.argument = context.AddLocations(context.error.argument)
    context.parser.error(context=context, reproduce=True)
  @staticmethod
  def GetDestinations(args):
    """Returns the set of 'dest' attributes (or the arg if no dest)."""
    return set([getattr(a, 'dest', a) for a in args])
  # pylint: disable=invalid-name, argparse style
  def validate_specified_args(self, ai, specified_args, namespace,
                              is_required=True, top=True):
    """Validate specified args against the arg group constraints.
    Each group may be mutually exclusive and/or required. Each argument may be
    required.
    Args:
      ai: ArgumentInterceptor, The argument interceptor containing the
        ai.arguments argument group.
      specified_args: set, The dests of the specified args.
      namespace: object, The parsed args namespace.
      is_required: bool, True if all containing groups are required.
      top: bool, True if ai.arguments is the top level group.
    Raises:
      ModalGroupError: If modal arg not specified.
      OptionalMutexError: On optional mutex group conflict.
      RequiredError: If required arg not specified.
      RequiredMutexError: On required mutex group conflict.
    Returns:
      True if the subgroup was specified.
    """
    # TODO(b/120132521) Replace and eliminate argparse extensions
    also_optional = []  # The optional args in group that were not specified.
    have_optional = []  # The specified optional (not required) args.
    have_required = []  # The specified required args.
    need_required = []  # The required args in group that must be specified.
    arguments = (
        sorted(ai.arguments, key=usage_text.GetArgSortKey)
        if ai.sort_args else ai.arguments)
    for arg in arguments:
      if arg.is_group:
        arg_was_specified = self.validate_specified_args(
            arg,
            specified_args,
            namespace,
            is_required=is_required and arg.is_required,
            top=False)
      else:
        arg_was_specified = arg.dest in specified_args
      if arg_was_specified:
        if arg.is_required:
          have_required.append(arg)
        else:
          have_optional.append(arg)
      elif arg.is_required:
        if not isinstance(arg, DynamicPositionalAction):
          need_required.append(arg)
      else:
        also_optional.append(arg)
    if need_required:
      if top or have_required and not (have_optional or also_optional):
        need_args = usage_text.ArgumentWrapper(
            arguments=need_required, is_group=True, sort_args=ai.sort_args)
        self._Error(parser_errors.RequiredError(
            parser=self,
            argument=usage_text.GetArgUsage(
                need_args, value=False, hidden=True, top=top)))
      if have_optional or have_required:
        have_args = usage_text.ArgumentWrapper(
            arguments=have_optional + have_required, is_group=True,
            sort_args=ai.sort_args)
        need_args = usage_text.ArgumentWrapper(
            arguments=need_required, is_group=True,
            sort_args=ai.sort_args)
        self._Error(parser_errors.ModalGroupError(
            parser=self,
            argument=usage_text.GetArgUsage(
                have_args, value=False, hidden=True, top=top),
            conflict=usage_text.GetArgUsage(
                need_args, value=False, hidden=True, top=top)))
    # Multiple args with the same dest are counted as 1 arg.
    count = (len(self.GetDestinations(have_required)) +
             len(self.GetDestinations(have_optional)))
    if ai.is_mutex:
      conflict = usage_text.GetArgUsage(ai, value=False, hidden=True, top=top)
      if is_required and ai.is_required:
        if count != 1:
          if count:
            argument = usage_text.GetArgUsage(
                sorted(have_required + have_optional,
                       key=usage_text.GetArgSortKey)[0],
                value=False, hidden=True, top=top)
            try:
              flag = namespace.GetFlagArgument(argument)
            except parser_errors.UnknownDestinationException:
              flag = None
            if flag:
              value = namespace.GetValue(flag.dest)
              if not isinstance(value, (bool, dict, list)):
                argument = self._AddLocations(argument, value)
          else:
            argument = None
          self._Error(parser_errors.RequiredMutexError(
              parser=self, argument=argument, conflict=conflict))
      elif count > 1:
        argument = usage_text.GetArgUsage(
            sorted(have_required + have_optional,
                   key=usage_text.GetArgSortKey)[0],
            value=False, hidden=True, top=top)
        self._Error(parser_errors.OptionalMutexError(
            parser=self, argument=argument, conflict=conflict))
    return bool(count)
  def parse_known_args(self, args=None, namespace=None):
    """Overrides argparse.ArgumentParser's .parse_known_args method."""
    if args is None:
      args = argv_utils.GetDecodedArgv()[1:]
    if namespace is None:
      namespace = Namespace()
    namespace._SetParser(self)  # pylint: disable=protected-access
    try:
      if self._remainder_action:
        # Remove remainder_action if still there so it is not parsed regularly.
        try:
          self._actions.remove(self._remainder_action)
        except ValueError:
          pass
        # Split on first -- if it exists
        namespace, args = self._remainder_action.ParseKnownArgs(args, namespace)
      # _get_values() updates self._specified_args.
      self._specified_args = namespace._specified_args  # pylint: disable=protected-access
      namespace, unknown_args, error_context = self._ParseKnownArgs(
          args, namespace, wrapper=False)
      # Propagate _specified_args.
      namespace._specified_args.update(self._specified_args)  # pylint: disable=protected-access
      if unknown_args:
        self._Suggest(unknown_args)
      elif error_context:
        if self._probe_error:
          raise _HandleLaterError()
        error_context.parser._DeduceBetterError(  # pylint: disable=protected-access
            error_context, args, namespace)
      namespace._parsers.append(self)  # pylint: disable=protected-access
    finally:
      # Replace action for help message and ArgumentErrors.
      if self._remainder_action:
        self._actions.append(self._remainder_action)
    return (namespace, unknown_args)
  @classmethod
  def _SaveOriginalArgs(cls, original_args):
    if original_args:
      cls._args = original_args[:]
    else:
      cls._args = None
  @classmethod
  def _ClearOriginalArgs(cls):
    cls._args = None
  @classmethod
  def _GetOriginalArgs(cls):
    return cls._args
  def parse_args(self, args=None, namespace=None, raise_error=False):
    """Overrides argparse.ArgumentParser's .parse_args method."""
    self._SaveOriginalArgs(args)
    self.raise_error = raise_error
    namespace, unknown_args, _ = self._ParseKnownArgs(args, namespace)
    # pylint:disable=protected-access
    deepest_parser = namespace._GetParser()
    deepest_parser._specified_args = namespace._specified_args
    if not unknown_args:
      # All of the specified args from all of the subparsers are now known.
      # Check for argument/group conflicts and error out from the deepest
      # parser so the resulting error message has the correct command context.
      for parser in namespace._parsers:
        try:
          # pylint: disable=protected-access
          parser.validate_specified_args(
              parser.ai, namespace._specified_args, namespace)
        except argparse.ArgumentError as e:
          deepest_parser._Error(e)
      if namespace._GetCommand().is_group:
        deepest_parser.error('Command name argument expected.')
      # No argument/group conflicts.
      return namespace
    if deepest_parser._remainder_action:
      # Assume the user wanted to pass all arguments after last recognized
      # arguments into _remainder_action. Either do this with a warning or
      # fail depending on strictness.
      # pylint:disable=protected-access
      try:
        namespace, unknown_args = (
            deepest_parser._remainder_action.ParseRemainingArgs(
                unknown_args, namespace, args))
        # There still may be unknown_args that came before the last known arg.
        if not unknown_args:
          return namespace
      except parser_errors.UnrecognizedArgumentsError:
        # In the case of UnrecognizedArgumentsError, we want to just let it
        # continue so that we can get the nicer error handling.
        pass
    deepest_parser._Suggest(unknown_args)
  def _check_value(self, action, value):
    """Overrides argparse.ArgumentParser's ._check_value(action, value) method.
    Args:
      action: argparse.Action, The action being checked against this value.
      value: The parsed command line argument provided that needs to correspond
          to this action.
    Raises:
      argparse.ArgumentError: If the action and value don't work together.
    """
    is_subparser = isinstance(action, CloudSDKSubParsersAction)
    # When using tab completion, argcomplete monkey patches various parts of
    # argparse and interferes with the normal argument parsing flow.  Here, we
    # need to set self._orig_class because argcomplete compares this
    # directly to argparse._SubParsersAction to see if it should recursively
    # patch this parser.  It should really check to see if it is a subclass
    # but alas, it does not.  If we don't set this, argcomplete will not patch,
    # our subparser and completions below this point won't work.  Normally we
    # would just set this in action.IsValidChoice() but sometimes this
    # sub-element has already been loaded and is already in action.choices.  In
    # either case, we still need argcomplete to patch this subparser so it
    # can compute completions below this point.
    if is_subparser and '_ARGCOMPLETE' in os.environ:
      # pylint:disable=protected-access, Required by argcomplete.
      action._orig_class = argparse._SubParsersAction
    # This is copied from this method in argparse's version of this method.
    if action.choices is None or value in action.choices:
      return
    if isinstance(value, six.string_types):
      arg = value
    else:
      arg = six.text_type(value)
    # We add this to check if we can lazy load the element.
    if is_subparser and action.IsValidChoice(arg):
      return
    # Not something we know, raise an error.
    # pylint:disable=protected-access
    cli_generator = self._calliope_command._cli_generator
    missing_components = cli_generator.ComponentsForMissingCommand(
        self._calliope_command.GetPath() + [arg])
    if missing_components:
      # pylint: disable=g-import-not-at-top
      from googlecloudsdk.core.updater import update_manager
      # pylint: enable=g-import-not-at-top
      msg = ('You do not currently have this command group installed.  Using '
             'it requires the installation of components: '
             '[{missing_components}]'.format(
                 missing_components=', '.join(missing_components)))
      update_manager.UpdateManager.EnsureInstalledAndRestart(
          missing_components, msg=msg)
    if is_subparser:
      # We are going to show the usage anyway, which requires loading
      # everything.  Do this here so that choices gets populated.
      action.LoadAllChoices()
    # Command is not valid, see what we can suggest as a fix...
    message = "Invalid choice: '{0}'.".format(value)
    # Determine if the requested command is available in another release track.
    existing_alternatives = self._ExistingCommandAlternativeReleaseTracks(arg)
    if existing_alternatives:
      message += ('\nThis command is available in one or more alternate '
                  'release tracks.  Try:\n  ')
      message += '\n  '.join(existing_alternatives)
      # Log to analytics the attempt to execute a command.
      # We know the user entered 'value' is a valid command in a different
      # release track. It's safe to include it.
      self._Error(parser_errors.WrongTrackError(
          message,
          parser=self,
          extra_path_arg=arg,
          suggestions=existing_alternatives))
    # If we are dealing with flags, see if the spelling was close to something
    # else that exists here.
    suggestion = None
    hidden_choices = getattr(action, 'hidden_choices', [])
    choices = sorted(c for c in action.choices if c not in hidden_choices)
    if not is_subparser:
      suggester = usage_text.TextChoiceSuggester(choices)
      suggestion = suggester.GetSuggestion(arg)
      if suggestion:
        message += " Did you mean '{0}'?".format(suggestion)
      else:
        # Command group choices will be displayed in the usage message.
        message += '\n\nValid choices are [{0}].'.format(
            ', '.join([six.text_type(c) for c in choices]))
    # Log to analytics the attempt to execute a command.
    # We don't know if the user entered 'value' is a mistyped command or
    # some resource name that the user entered and we incorrectly thought it's
    # a command. We can't include it since it might be PII.
    self._Error(parser_errors.UnknownCommandError(
        message,
        argument=action.option_strings[0] if action.option_strings else None,
        total_unrecognized=1,
        total_suggestions=1 if suggestion else 0,
        suggestions=[suggestion] if suggestion else choices))
  def _CommandAlternativeReleaseTracks(self, value=None):
    """Gets alternatives for the command in other release tracks.
    Args:
      value: str, The value being parsed.
    Returns:
      [CommandCommon]: The alternatives for the command in other release tracks.
    """
    existing_alternatives = []
    # pylint:disable=protected-access
    cli_generator = self._calliope_command._cli_generator
    alternates = cli_generator.ReplicateCommandPathForAllOtherTracks(
        self._calliope_command.GetPath() + ([value] if value else []))
    if alternates:
      top_element = self._calliope_command._TopCLIElement()
      for _, command_path in sorted(six.iteritems(alternates),
                                    key=lambda x: x[0].prefix or ''):
        alternative_cmd = top_element.LoadSubElementByPath(command_path[1:])
        if alternative_cmd and not alternative_cmd.IsHidden():
          existing_alternatives.append(alternative_cmd)
    return existing_alternatives
  def _ExistingFlagAlternativeReleaseTracks(self, arg):
    """Checks whether the arg exists in other tracks of the command.
    Args:
      arg: str, The argument being parsed.
    Returns:
      [str]: The names of alternate commands that the user may use.
    """
    res = []
    for alternate in self._CommandAlternativeReleaseTracks():
      if arg in [f.option_strings[0] for f in alternate.GetAllAvailableFlags(
          include_hidden=False)]:
        res.append(' '.join(alternate.GetPath()) + ' ' + arg)
    return res
  def _ExistingCommandAlternativeReleaseTracks(self, value):
    """Gets the path of alternatives for the command in other release tracks.
    Args:
      value: str, The value being parsed.
    Returns:
      [str]:  The names of alternate commands that the user may use.
    """
    return [' '.join(alternate.GetPath()) for alternate in
            self._CommandAlternativeReleaseTracks(value=value)]
  def _ReportErrorMetricsHelper(self, dotted_command_path, error,
                                error_extra_info=None):
    """Logs `Commands` and `Error` Google Analytics events for an error.
    Args:
      dotted_command_path: str, The dotted path to as much of the command as we
          can identify before an error. Example: gcloud.projects
      error: class, The class (not the instance) of the Exception for an error.
      error_extra_info: {str: json-serializable}, A json serializable dict of
        extra info that we want to log with the error. This enables us to write
        queries that can understand the keys and values in this dict.
    """
    specified_args = self.GetSpecifiedArgNames()
    metrics.Commands(
        dotted_command_path,
        config.CLOUD_SDK_VERSION,
        specified_args,
        error=error,
        error_extra_info=error_extra_info)
    metrics.Error(
        dotted_command_path,
        error,
        specified_args,
        error_extra_info=error_extra_info)
  def ReportErrorMetrics(self, error, message):
    """Reports Command and Error metrics in case of argparse errors.
    Args:
      error: Exception, The Exception object.
      message: str, The exception error message.
    """
    dotted_command_path = '.'.join(self._calliope_command.GetPath())
    # Check for parser_errors.ArgumentError with metrics payload.
    if isinstance(error, parser_errors.ArgumentError):
      if error.extra_path_arg:
        dotted_command_path = '.'.join([dotted_command_path,
                                        error.extra_path_arg])
      self._ReportErrorMetricsHelper(dotted_command_path,
                                     error.__class__,
                                     error.error_extra_info)
      return
    # No specific exception with metrics, try to detect error from message.
    if 'too few arguments' in message:
      self._ReportErrorMetricsHelper(dotted_command_path,
                                     parser_errors.TooFewArgumentsError)
      return
    # Catchall for any error we didn't explicitly detect.
    self._ReportErrorMetricsHelper(dotted_command_path,
                                   parser_errors.OtherParsingError)
  def error(self, message='', context=None, reproduce=False):
    """Overrides argparse.ArgumentParser's .error(message) method.
    Specifically, it avoids reprinting the program name and the string
    "error:".
    Args:
      message: str, The error message to print.
      context: _ErrorContext, An error context with affected parser.
      reproduce: bool, Reproduce a previous call to this method from context.
    Raises:
      _HandleLaterError: if the error should be handled in a subsequent call to
        this method.
      DryRunError: If CLOUDSDK_CORE_DRY_RUN is set to 1.
    """
    # Ignore errors better handled by validate_specified_args().
    if '_ARGCOMPLETE' not in os.environ:
      if re.search('too few arguments', message):
        return
      if (re.search('arguments? .* required', message) and
          not re.search('in dict arg but not provided', message) and
          not re.search(r'\[.*\brequired\b.*\]', message)):
        return
    if reproduce and context:
      # Reproduce a previous call to this method from the info in context.
      message = context.message
      parser = context.parser
      error = context.error
      if not error:
        error = parser_errors.ArgumentError(message, parser=self)
    else:
      if context:
        message = context.message
        parser = context.parser
        error = context.error
      else:
        if 'Invalid choice:' in message:
          exc = parser_errors.UnrecognizedArgumentsError
        else:
          exc = parser_errors.ArgumentError
        if message:
          message = re.sub(r"\bu'", "'", message)
        error = exc(message, parser=self)
        parser = self
      if ('_ARGCOMPLETE' not in os.environ and
          not isinstance(error, parser_errors.DetailedArgumentError) and
          (
              self._probe_error or
              'Invalid choice' in message
          )
         ):
        if self._probe_error and 'expected one argument' in message:
          raise _HandleLaterError()
        # Save this context for later. We may be able to deduce a better error
        # message. For instance, argparse might complain about an invalid
        # command choice 'flag-value' for '--unknown-flag flag-value', but
        # with a little finagling in parse_known_args() we can verify that
        # '--unknown-flag' is in fact an unknown flag and error out on that.
        self._SetErrorContext(context or _ErrorContext(message, parser, error))
        raise _HandleLaterError()
    # Add file/line info if specified.
    prefix = 'argument '
    if context and message.startswith(prefix):
      parts = message.split(':', 1)
      arg = context.AddLocations(parts[0][len(prefix):])
      message = '{}{}:{}'.format(prefix, arg, parts[1])
    # No need to output help/usage text if we are in completion mode. However,
    # we do need to populate group/command level choices. These choices are not
    # loaded when there is a parser error since we do lazy loading.
    if '_ARGCOMPLETE' in os.environ:
      # pylint:disable=protected-access
      if self._calliope_command._sub_parser:
        self._calliope_command.LoadAllSubElements()
    elif not self.raise_error:
      message = console_attr.SafeText(message)
      log.error('({prog}) {message}'.format(prog=self.prog, message=message))
      if (
          'CLOUDSDK_CORE_DRY_RUN' in os.environ
          and os.environ['CLOUDSDK_CORE_DRY_RUN'] == '1'
      ):
        raise DryRunError(message)
      # multi-line message means hints already added, no need for usage.
      # pylint: disable=protected-access
      if '\n' not in message:
        # Provide "Maybe you meant" suggestions if we are dealing with an
        # invalid command.
        suggestions = None
        # "Valid choices" would imply this is an arg issue not a command issue.
        is_invalid_command = ('Invalid choice' in message and
                              'Valid choices' not in message)
        if is_invalid_command:
          suggestions = suggest_commands.GetCommandSuggestions(
              self._GetOriginalArgs())
          self._ClearOriginalArgs()
        if suggestions:
          argparse._sys.stderr.write(
              '\n  '.join(['Maybe you meant:'] + suggestions) + '\n')
          argparse._sys.stderr.write('\n' + _HELP_SEARCH_HINT + '\n')
          error.error_extra_info = {
              'suggestions': suggestions,
              'total_suggestions': len(suggestions),
              'total_unrecognized': 1,
          }
        # Otherwise print out usage string.
        elif 'Command name argument expected.' == message:
          # pylint: disable=g-import-not-at-top
          from googlecloudsdk.core.document_renderers import render_document
          # pylint: enable=g-import-not-at-top
          usage_string = self._calliope_command.GetCategoricalUsage()
          # The next if clause is executed if there were no categories to
          # display.
          uncategorized_usage = False
          if not usage_string:
            uncategorized_usage = True
            usage_string = self._calliope_command.GetUncategorizedUsage()
          interactive = False
          if not uncategorized_usage:
            interactive = console_io.IsInteractive(error=True)
          if interactive:
            out = io.StringIO()
            out.write('{message}\n'.format(message=message))
          else:
            out = argparse._sys.stderr
          out.write('\n')
          render_document.RenderDocument(
              fin=io.StringIO(usage_string), out=out)
          if uncategorized_usage:
            out.write(self._calliope_command.GetHelpHint())
          if interactive:
            console_io.More(out.getvalue(), out=argparse._sys.stderr)
        else:
          usage_string = self._calliope_command.GetUsage()
          argparse._sys.stderr.write(usage_string)
    parser.ReportErrorMetrics(error, message)
    self.exit(2, exception=error)
  def exit(self, status=0, message=None, exception=None):
    """Overrides argparse.ArgumentParser's .exit() method.
    Args:
      status: int, The exit status.
      message: str, The error message to print.
      exception: Exception, The exception that caused the exit, if any.
    """
    del message  # self.error() handles all messaging
    if self.raise_error:
      raise exception
    sys.exit(status)
  def _parse_optional(self, arg_string):
    """Overrides argparse.ArgumentParser's ._parse_optional method.
    This allows the parser to have leading flags included in the grabbed
    arguments and stored in the namespace.
    Args:
      arg_string: str, The argument string.
    Returns:
      The normal return value of argparse.ArgumentParser._parse_optional.
    """
    if not isinstance(arg_string, six.string_types):
      # Flag value injected by --flags-file.
      return None
    positional_actions = self._get_positional_actions()
    option_tuple = super(ArgumentParser, self)._parse_optional(arg_string)
    # If parse_optional finds an action for this arg_string, use that option.
    # Note: option_tuple = (action, option_string, explicit_arg) or None
    known_option = option_tuple and option_tuple[0]
    if (len(positional_actions) == 1 and
        positional_actions[0].nargs == argparse.REMAINDER and
        not known_option):
      return None
    return option_tuple
  def _get_values(self, action, arg_strings):
    """Intercepts argparse.ArgumentParser's ._get_values method.
    This intercept does not actually change any behavior.  We use this hook to
    grab the flags and arguments that are actually seen at parse time.  The
    resulting namespace has entries for every argument (some with defaults) so
    we can't know which the user actually typed.
    Args:
      action: Action, the action that is being processed.
      arg_strings: [str], The values provided for this action.
    Returns:
      Whatever the parent method returns.
    """
    if action.dest != argparse.SUPPRESS:  # argparse SUPPRESS usage
      # Don't look at the action unless it is a real argument or flag. The
      # suppressed destination indicates that it is a SubParsers action.
      name = None
      if action.option_strings:
        # This is a flag, save the first declared name of the flag.
        name = action.option_strings[0]
      elif arg_strings:
        # This is a positional and there are arguments to consume.  Optional
        # positionals will always get to this method, so we need to ignore the
        # ones for which a value was not actually provided.  If it is provided,
        # save the metavar name or the destination name.
        name = action.metavar if action.metavar else action.dest
        if action.nargs and action.nargs != '?':
          # This arg takes in multiple values, record how many were provided.
          # (? means 0 or 1, so treat that as an arg that takes a single value.
          name += ':' + six.text_type(len(arg_strings))
      if name:
        self._specified_args[action.dest] = name
    return super(ArgumentParser, self)._get_values(action, arg_strings)
  def _get_option_tuples(self, option_string):
    """Intercepts argparse.ArgumentParser's ._get_option_tuples method.
    Cloud SDK no longer supports flag abbreviations, so it always returns []
    for the non-arg-completion case to indicate no abbreviated flag matches.
    Args:
      option_string: The option string to match.
    Returns:
      A list of matching flag tuples.
    """
    if '_ARGCOMPLETE' in os.environ:
      return super(ArgumentParser, self)._get_option_tuples(option_string)
    return []  # This effectively disables abbreviations.
# pylint:disable=protected-access
class CloudSDKSubParsersAction(six.with_metaclass(abc.ABCMeta,
                                                  argparse._SubParsersAction)):
  """A custom subclass for arg parsing behavior.
  While the above ArgumentParser overrides behavior for parsing the flags
  associated with a specific group or command, this class overrides behavior
  for loading those sub parsers.
  """
  @abc.abstractmethod
  def IsValidChoice(self, choice):
    """Determines if the given arg is a valid sub group or command.
    Args:
      choice: str, The name of the sub element to check.
    Returns:
      bool, True if the given item is a valid sub element, False otherwise.
    """
    pass
  @abc.abstractmethod
  def LoadAllChoices(self):
    """Load all the choices because we need to know the full set."""
    pass
class CommandGroupAction(CloudSDKSubParsersAction):
  """A subparser for loading calliope command groups on demand.
  We use this to intercept the parsing right before it needs to start parsing
  args for sub groups and we then load the specific sub group it needs.
  """
  def __init__(self, *args, **kwargs):
    self._calliope_command = kwargs.pop('calliope_command')
    super(CommandGroupAction, self).__init__(*args, **kwargs)
  def IsValidChoice(self, choice):
    # When using tab completion, argcomplete monkey patches various parts of
    # argparse and interferes with the normal argument parsing flow.  Usually
    # it is sufficient to check if the given choice is valid here, but delay
    # the loading until __call__ is invoked later during the parsing process.
    # During completion time, argcomplete tries to patch the subparser before
    # __call__ is called, so nothing has been loaded yet.  We need to force
    # load things here so that there will be something loaded for it to patch.
    if '_ARGCOMPLETE' in os.environ:
      self._calliope_command.LoadSubElement(choice)
    return self._calliope_command.IsValidSubElement(choice)
  def LoadAllChoices(self):
    self._calliope_command.LoadAllSubElements()
  def __call__(self, parser, namespace, values, option_string=None):
    # This is the name of the arg that is the sub element that needs to be
    # loaded.
    parser_name = values[0]
    # Load that element if it's there.  If it's not valid, nothing will be
    # loaded and normal error handling will take over.
    if self._calliope_command:
      self._calliope_command.LoadSubElement(parser_name)
    super(CommandGroupAction, self).__call__(
        parser, namespace, values, option_string=option_string)
class DynamicPositionalAction(six.with_metaclass(abc.ABCMeta,
                                                 CloudSDKSubParsersAction)):
  """An argparse action that adds new flags to the parser when it is called.
  We need to use a subparser for this because for a given parser, argparse
  collects all the arg information before it starts parsing. Adding in new flags
  on the fly doesn't work. With a subparser, it is independent so we can load
  flags into here on the fly before argparse loads this particular parser.
  """
  def __init__(self, *args, **kwargs):
    self.hidden = kwargs.pop('hidden', False)
    self._parent_ai = kwargs.pop('parent_ai')
    super(DynamicPositionalAction, self).__init__(*args, **kwargs)
  def IsValidChoice(self, choice):
    # We need to actually create the parser or else check_value will fail if the
    # given choice is not present. We just add it no matter what it is because
    # we don't have access to the namespace to be able to figure out if the
    # choice is actually valid. Invalid choices will raise exceptions once
    # called. We also don't actually care what the values are in here because we
    # register an explicit completer to use for completions, so the list of
    # parsers is not actually used other than to bypass the check_value
    # validation.
    self._AddParser(choice)
    # By default, don't do any checking of the argument. If it is bad, raise
    # an exception when it is called. We don't need to do any on-demand loading
    # here because there are no subparsers of this one, so the above argcomplete
    # issue doesn't matter.
    return True
  def LoadAllChoices(self):
    # We don't need to do this because we will use an explicit completer to
    # complete the names of the options rather than relying on correctly
    # populating the choices.
    pass
  def _AddParser(self, choice):
    # Create a new parser and pass in the calliope_command of the original so
    # that things like help and error reporting continue to work.
    return self.add_parser(
        choice, add_help=False, prog=self._parent_ai.parser.prog,
        calliope_command=self._parent_ai.parser._calliope_command)
  @abc.abstractmethod
  def GenerateArgs(self, namespace, choice):
    pass
  @abc.abstractmethod
  def Completions(self, prefix, parsed_args, **kwargs):
    pass
  def __call__(self, parser, namespace, values, option_string=None):
    choice = values[0]
    args = self.GenerateArgs(namespace, choice)
    sub_parser = self._name_parser_map[choice]
    # This is tricky. When we create a new parser above, that parser does not
    # have any of the flags from the parent command. We need to propagate them
    # all down to this parser like we do in calliope. We also want to add new
    # flags. In order for those to show up in the help, they need to be
    # registered with an ArgumentInterceptor. Here, we create one and seed it
    # with the data of the parent. This actually means that every flag we add
    # to our new parser will show up in the help of the parent parser, even
    # though those flags are not actually on that parser. This is ok because
    # help is always run on the parent ArgumentInterceptor and we want it to
    # show the full set of args.
    ai = parser_arguments.ArgumentInterceptor(
        sub_parser, is_global=False, cli_generator=None,
        allow_positional=True, data=self._parent_ai.data)
    for flag in itertools.chain(self._parent_ai.flag_args,
                                self._parent_ai.ancestor_flag_args):
      # Propagate the flags down except the ones we are not supposed to. Note
      # that we *do* copy the help action unlike we usually do because this
      # subparser is going to share the help action of the parent.
      if flag.do_not_propagate or flag.is_required:
        continue
      # We add the flags directly to the parser instead of the
      # ArgumentInterceptor because if we didn't the flags would be duplicated
      # in the help, since we reused the data object from the parent.
      sub_parser._add_action(flag)
    # Update parent display_info in children, children take precedence.
    ai.display_info.AddLowerDisplayInfo(self._parent_ai.display_info)
    # Add args to the parser and remove any collisions if arguments are
    # already registered with the same name.
    for arg in args:
      arg.RemoveFromParser(ai)
      added_arg = arg.AddToParser(ai)
      # Argcomplete patches parsers and actions before call() is called. Since
      # we generate these args at call() time, they have not been patched and
      # causes completion to fail. Since we know that we are not going to be
      # adding any subparsers (the only thing that actually needs to be patched)
      # we fake it here to make argcomplete think it did the patching so it
      # doesn't crash.
      if '_ARGCOMPLETE' in os.environ and not hasattr(added_arg, '_orig_class'):
        added_arg._orig_class = added_arg.__class__
    super(DynamicPositionalAction, self).__call__(
        parser, namespace, values, option_string=option_string)
    # Running two dynamic commands in a row using the same CLI object is a
    # problem because the argparse parsers are saved in between invocations.
    # This is usually fine because everything is static, but in this case two
    # invocations could actually have different dynamic args generated. We
    # have to do two things to get this to work. First we need to clear the
    # parser from the map. If we don't do this, this class doesn't even get
    # called again because the choices are already defined. Second, we need
    # to remove the arguments we added from the ArgumentInterceptor. The
    # parser itself is thrown out, but because we are sharing an
    # ArgumentInterceptor with our parent, it remembers the args that we
    # added. Later, they are propagated back down to us even though they no
    # longer actually exist. When completing, we know we will only be running
    # a single invocation and we need to leave the choices around so that the
    # completer can read them after the command fails to run.
    if '_ARGCOMPLETE' not in os.environ:
      self._name_parser_map.clear()
      # Detaching the argument interceptors here makes the help text work by
      # preventing the accumlation of duplicate entries with each command
      # execution on this CLI.  However, it also foils the ability to map arg
      # dest names back to the original argument, needed for the flag completion
      # style.  It's commented out here just in case help text wins out over
      # argument lookup down the road.
      # for _, arg in args.iteritems():
      #   arg.RemoveFromParser(ai)