HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/meta/generate_cli_trees.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""gcloud CLI tree generators for non-gcloud CLIs.

A CLI tree for a supported command is generated by using the root command plus
`help` or `--help` arguments to do a DFS traversal. Each node is generated
from a man-ish style runtime document.

Supported CLI commands have their own runtime help document quirks, so each is
handled by an ad-hoc parser. The parsers rely on consistency within commands
and between command releases.

The CLI tree for an unsupported command is generated from the output of
`man the-command` and contains only the command root node.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import json
import os
import re
import shlex
import subprocess
import tarfile
import textwrap

from googlecloudsdk.calliope import cli_tree
from googlecloudsdk.command_lib.static_completion import generate as generate_static
from googlecloudsdk.command_lib.static_completion import lookup
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import requests
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.resource import resource_printer
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files

import six
from six.moves import range


class Error(exceptions.Error):
  """Exceptions for this module."""


class CommandInvocationError(Error):
  """Command could not be invoked."""


class NoCliTreeForCommandError(Error):
  """Command does not have a CLI tree."""


class CliTreeGenerationError(Error):
  """CLI tree generation failed for command."""


class NoManPageTextForCommandError(Error):
  """Could not get man page text for command."""


def _NormalizeSpace(text):
  """Returns text dedented and multiple non-indent spaces replaced by one."""
  return re.sub('([^ ])   *', r'\1 ', textwrap.dedent(text)).strip('\n')


def _Flag(name, description='', value=None, default=None, type_='string',
          category='', is_global=False, is_required=False, nargs=None):
  """Initializes and returns a flag dict node."""
  return {
      cli_tree.LOOKUP_ATTR: {},
      cli_tree.LOOKUP_CATEGORY: category,
      cli_tree.LOOKUP_DEFAULT: default,
      cli_tree.LOOKUP_DESCRIPTION: _NormalizeSpace(description),
      cli_tree.LOOKUP_GROUP: '',
      cli_tree.LOOKUP_IS_GLOBAL: is_global,
      cli_tree.LOOKUP_IS_HIDDEN: False,
      cli_tree.LOOKUP_IS_REQUIRED: is_required,
      cli_tree.LOOKUP_NAME: name,
      cli_tree.LOOKUP_NARGS: nargs or ('0' if type_ == 'bool' else '1'),
      cli_tree.LOOKUP_VALUE: value,
      cli_tree.LOOKUP_TYPE: type_,
  }


def _Positional(name, description='', default=None, nargs='0'):
  """Initializes and returns a positional dict node."""
  return {
      cli_tree.LOOKUP_DEFAULT: default,
      cli_tree.LOOKUP_DESCRIPTION: _NormalizeSpace(description),
      cli_tree.LOOKUP_NAME: name,
      cli_tree.LOOKUP_NARGS: nargs,
  }


def _Command(path):
  """Initializes and returns a command/group dict node."""
  return {
      cli_tree.LOOKUP_CAPSULE: '',
      cli_tree.LOOKUP_COMMANDS: {},
      cli_tree.LOOKUP_FLAGS: {},
      cli_tree.LOOKUP_GROUPS: {},
      cli_tree.LOOKUP_IS_GROUP: False,
      cli_tree.LOOKUP_IS_HIDDEN: False,
      cli_tree.LOOKUP_PATH: path,
      cli_tree.LOOKUP_POSITIONALS: [],
      cli_tree.LOOKUP_RELEASE: 'GA',
      cli_tree.LOOKUP_SECTIONS: {},
  }


def _GetDirectories(directory=None, warn_on_exceptions=False):
  """Returns the list of directories to search for CLI trees.

  Args:
    directory: The directory containing the CLI tree JSON files. If None
      then the default installation and config directories are used.
    warn_on_exceptions: Emits warning messages in lieu of exceptions.
  """
  # Initialize the list of directories to search for CLI tree files. The default
  # CLI tree is only searched for and generated in directories[0]. Other
  # existing trees are updated in the directory in which they were found. New
  # trees are generated in directories[-1].
  directories = []
  if directory:
    directories.append(directory)
  else:
    try:
      directories.append(cli_tree.CliTreeDir())
    except cli_tree.SdkRootNotFoundError as e:
      if not warn_on_exceptions:
        raise
      log.warning(six.text_type(e))
    directories.append(cli_tree.CliTreeConfigDir())
  return directories


class CliTreeGenerator(six.with_metaclass(abc.ABCMeta, object)):
  """Base CLI tree generator.

  Attributes:
    command_name: str, The name of the CLI tree command.
  """

  _FAILURES = None

  @classmethod
  def MemoizeFailures(cls, enable):
    """Memoizes failed attempts and doesn't repeat them if enable is True."""
    cls._FAILURES = set() if enable else None

  @classmethod
  def AlreadyFailed(cls, command):
    """Returns True if man page request for command already failed."""
    return command in cls._FAILURES if cls._FAILURES else False

  @classmethod
  def AddFailure(cls, command):
    """Add command to the set of failed man generations."""
    if cls._FAILURES is not None:
      cls._FAILURES.add(command)

  def __init__(self, command_name, root_command_args=None):
    """Initializes the CLI tree generator.

    Args:
      command_name: str, The name of the CLI tree command (e.g. 'gsutil').
      root_command_args: [str], The argument list to invoke the root CLI tree
        command. Examples:
        * ['gcloud']
        * ['python', '/tmp/tarball_dir/gsutil/gsutil']
    Raises:
      CommandInvocationError: If the provided root command cannot be invoked.
    """
    if root_command_args:
      with files.FileWriter(os.devnull) as devnull:
        try:
          # We don't actually care about whether the root command succeeds here;
          # we just want to see if it can be invoked.
          subprocess.Popen(root_command_args, stdin=devnull, stdout=devnull,
                           stderr=devnull).communicate()
        except OSError as e:
          raise CommandInvocationError(e)

    self.command_name = command_name
    self._root_command_args = root_command_args or [command_name]
    self._cli_version = None  # For memoizing GetVersion()

  def Run(self, cmd):
    """Runs the root command with args given by cmd and returns the output.

    Args:
      cmd: [str], List of arguments to the root command.
    Returns:
      str, Output of the given command.
    """
    return encoding.Decode(
        subprocess.check_output(self._root_command_args + cmd))

  def GetVersion(self):
    """Returns the CLI_VERSION string."""
    if not self._cli_version:
      try:
        self._cli_version = self.Run(['version']).split()[-1]
      except:  # pylint: disable=bare-except
        self._cli_version = cli_tree.CLI_VERSION_UNKNOWN
    return self._cli_version

  @abc.abstractmethod
  def Generate(self):
    """Generates and returns the CLI tree dict."""
    return None

  def FindTreeFile(self, directories):
    """Returns (path,f) open for read for the first CLI tree in directories."""
    for directory in directories or _GetDirectories(warn_on_exceptions=True):
      path = os.path.join(directory or '.', self.command_name) + '.json'
      try:
        return path, files.FileReader(path)
      except files.Error:
        pass
    return path, None

  def IsUpToDate(self, tree, verbose=False):
    """Returns a bool tuple (readonly, up_to_date)."""

    actual_cli_version = tree.get(cli_tree.LOOKUP_CLI_VERSION)
    readonly = actual_cli_version == cli_tree.CLI_VERSION_READONLY

    # Check the schema version.
    actual_tree_version = tree.get(cli_tree.LOOKUP_VERSION)
    if actual_tree_version != cli_tree.VERSION:
      return readonly, False

    # Check the CLI version.
    expected_cli_version = self.GetVersion()
    if readonly:
      # READONLY trees are always up to date.
      pass
    elif expected_cli_version == cli_tree.CLI_VERSION_UNKNOWN:
      # Don't know how to regenerate but we have one in hand -- accept it.
      pass
    elif actual_cli_version != expected_cli_version:
      return readonly, False

    # Schema and CLI versions are up to date.
    if verbose:
      log.status.Print('[{}] CLI tree version [{}] is up to date.'.format(
          self.command_name, actual_cli_version))
    return readonly, True

  def LoadOrGenerate(self, directories=None, force=False, generate=True,
                     ignore_out_of_date=False, tarball=False, verbose=False,
                     warn_on_exceptions=False):
    """Loads the CLI tree or generates it if necessary, and returns the tree."""
    f = None
    try:
      path, f = self.FindTreeFile(directories)
      if f:
        up_to_date = False
        try:
          tree = json.load(f)
        except ValueError:
          # Corrupt JSON -- could have been interrupted.
          tree = None
        if tree:
          readonly, up_to_date = self.IsUpToDate(tree, verbose=verbose)
          if readonly:
            return tree
          elif up_to_date:
            if not force:
              return tree
          elif ignore_out_of_date:
            return None
    finally:
      if f:
        f.close()

    def _Generate():
      """Helper that generates a CLI tree and writes it to a JSON file."""
      tree = self.Generate()
      if tree:
        try:
          f = files.FileWriter(path)
        except files.Error as e:
          # CLI data config dir may not be initialized yet.
          directory, _ = os.path.split(path)
          try:
            files.MakeDir(directory)
            f = files.FileWriter(path)
          except files.Error:
            if not warn_on_exceptions:
              raise
            log.warning(six.text_type(e))
            return None
        with f:
          resource_printer.Print(tree, print_format='json', out=f)
      return tree

    # At this point:
    #   (1) the tree is not found or is out of date
    #   (2) the tree is not readonly
    #   (3) we have a generator for the tree

    if not generate:
      raise NoCliTreeForCommandError(
          'No CLI tree for [{}].'.format(self.command_name))
    if not verbose:
      return _Generate()
    with progress_tracker.ProgressTracker(
        '{} the [{}] CLI tree'.format(
            'Updating' if f else 'Generating', self.command_name)):
      return _Generate()


class _BqCollector(object):
  """bq help document section collector."""

  def __init__(self, text):
    self.text = text.split('\n')
    self.heading = 'DESCRIPTION'
    self.lookahead = None
    self.ignore_trailer = False

  def Collect(self, strip_headings=False):
    """Returns the heading and content lines from text."""
    content = []
    if self.lookahead:
      if not strip_headings:
        content.append(self.lookahead)
      self.lookahead = None
    heading = self.heading
    self.heading = None
    while self.text:
      line = self.text.pop(0)
      if line.startswith(' ') or not strip_headings and not self.ignore_trailer:
        content.append(line.rstrip())
    while content and not content[0]:
      content.pop(0)
    while content and not content[-1]:
      content.pop()
    self.ignore_trailer = True
    return heading, content


class BqCliTreeGenerator(CliTreeGenerator):
  """bq CLI tree generator."""

  def Run(self, cmd):
    """Runs the root command with args given by cmd and returns the output.

    Args:
      cmd: [str], List of arguments to the root command.
    Returns:
      str, Output of the given command.
    """
    try:
      output = subprocess.check_output(self._root_command_args + cmd)
    except subprocess.CalledProcessError as e:
      # bq exit code is 1 for help and --help. How do you know if help failed?
      if e.returncode != 1:
        raise
      output = e.output
    return encoding.Decode(output).replace('bq.py', 'bq')

  def AddFlags(self, command, content, is_global=False):
    """Adds flags in content lines to command."""
    while content:
      line = content.pop(0)
      name, description = line.strip().split(':', 1)
      paragraph = [description.strip()]
      default = ''
      while content and not content[0].startswith('  --'):
        line = content.pop(0).strip()
        if line.startswith('(default: '):
          default = line[10:-1]
        else:
          paragraph.append(line)
      description = ' '.join(paragraph).strip()
      if name.startswith('--[no]'):
        name = '--' + name[6:]
        type_ = 'bool'
        value = ''
      else:
        value = 'VALUE'
        type_ = 'string'
      command[cli_tree.LOOKUP_FLAGS][name] = _Flag(
          name=name,
          description=description,
          type_=type_,
          value=value,
          default=default,
          is_required=False,
          is_global=is_global,
      )

  def SubTree(self, path):
    """Generates and returns the CLI subtree rooted at path."""
    command = _Command(path)
    command[cli_tree.LOOKUP_IS_GROUP] = True
    text = self.Run(['help'] + path[1:])

    # `bq help` lists help for all commands. Command flags are "defined"
    # by example. We don't attempt to suss that out.
    content = text.split('\n')
    while content:
      line = content.pop(0)
      if not line or not line[0].islower():
        continue
      name, text = line.split(' ', 1)
      description = [text.strip()]
      examples = []
      arguments = []
      paragraph = description
      while content and (not content[0] or not content[0][0].islower()):
        line = content.pop(0).strip()
        if line == 'Arguments:':
          paragraph = arguments
        elif line == 'Examples:':
          paragraph = examples
        else:
          paragraph.append(line)
      subcommand = _Command(path + [name])
      command[cli_tree.LOOKUP_COMMANDS][name] = subcommand
      if description:
        subcommand[cli_tree.LOOKUP_SECTIONS]['DESCRIPTION'] = '\n'.join(
            description)
      if examples:
        subcommand[cli_tree.LOOKUP_SECTIONS]['EXAMPLES'] = '\n'.join(
            examples)

    return command

  def Generate(self):
    """Generates and returns the CLI tree rooted at self.command_name."""

    # Construct the tree minus the global flags.
    tree = self.SubTree([self.command_name])

    # Add the global flags to the root.
    text = self.Run(['--help'])
    collector = _BqCollector(text)
    _, content = collector.Collect(strip_headings=True)
    self.AddFlags(tree, content, is_global=True)

    # Finally add the version stamps.
    tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()
    tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION

    return tree


class _GsutilCollector(object):
  """gsutil help document section collector."""

  UNKNOWN, ROOT, MAN, TOPIC = list(range(4))

  def __init__(self, text):
    self.text = text.split('\n')
    self.heading = 'CAPSULE'
    self.page_type = self.UNKNOWN

  def Collect(self, strip_headings=False):
    """Returns the heading and content lines from text."""
    content = []
    heading = self.heading
    self.heading = None
    while self.text:
      line = self.text.pop(0)
      if self.page_type == self.UNKNOWN:
        # The first heading distinguishes the document page type.
        if line.startswith('Usage:'):
          self.page_type = self.ROOT
          continue
        elif line == 'NAME':
          self.page_type = self.MAN
          heading = 'CAPSULE'
          continue
        elif not line.startswith(' '):
          continue
      elif self.page_type == self.ROOT:
        # The root help page.
        if line == 'Available commands:':
          heading = 'COMMANDS'
          continue
        elif line == 'Additional help topics:':
          self.heading = 'TOPICS'
          break
        elif not line.startswith(' '):
          continue
      elif self.page_type == self.MAN:
        # A command/subcommand man style page.
        if line == 'OVERVIEW':
          self.page_type = self.TOPIC
          self.heading = 'DESCRIPTION'
          break
        elif line == 'SYNOPSIS':
          self.heading = line
          break
        elif line.endswith('OPTIONS'):
          self.heading = 'FLAGS'
          break
        elif line and line[0].isupper():
          self.heading = line.split(' ', 1)[-1]
          break
      elif self.page_type == self.TOPIC:
        # A topic man style page.
        if line and line[0].isupper():
          self.heading = line
          break
      if line.startswith(' ') or not strip_headings:
        content.append(line.rstrip())
    while content and not content[0]:
      content.pop(0)
    while content and not content[-1]:
      content.pop()
    return heading, content


class GsutilCliTreeGenerator(CliTreeGenerator):
  """gsutil CLI tree generator."""

  def __init__(self, *args, **kwargs):
    super(GsutilCliTreeGenerator, self).__init__(*args, **kwargs)
    self.topics = []

  def Run(self, cmd):
    """Runs the root command with args given by cmd and returns the output.

    Args:
      cmd: [str], List of arguments to the root command.
    Returns:
      str, Output of the given command.
    """
    try:
      output = subprocess.check_output(self._root_command_args + cmd)
    except subprocess.CalledProcessError as e:
      # gsutil exit code is 1 for --help depending on the context.
      if e.returncode != 1:
        raise
      output = e.output
    return encoding.Decode(output)

  def AddFlags(self, command, content, is_global=False):
    """Adds flags in content lines to command."""

    def _Add(name, description):
      value = ''
      type_ = 'bool'
      default = ''
      command[cli_tree.LOOKUP_FLAGS][name] = _Flag(
          name=name,
          description=description,
          type_=type_,
          value=value,
          default=default,
          is_required=False,
          is_global=is_global,
      )

    parse = re.compile(' *((-[^ ]*,)* *(-[^ ]*) *)(.*)')
    name = None
    description = []
    for line in content:
      if line.startswith('  -'):
        if name:
          _Add(name, '\n'.join(description))
        match = parse.match(line)
        name = match.group(3)
        description = [match.group(4).rstrip()]
      elif len(line) > 16:
        description.append(line[16:].rstrip())
    if name:
      _Add(name, '\n'.join(description))

  def SubTree(self, path):
    """Generates and returns the CLI subtree rooted at path."""
    command = _Command(path)
    is_help_command = len(path) > 1 and path[1] == 'help'
    if is_help_command:
      cmd = path[1:]
    else:
      cmd = path[1:] + ['--help']
    text = self.Run(cmd)
    collector = _GsutilCollector(text)

    while True:
      heading, content = collector.Collect()
      if not heading:
        break
      elif heading == 'CAPSULE':
        if content:
          command[cli_tree.LOOKUP_CAPSULE] = content[0].split('-', 1)[1].strip()
      elif heading == 'COMMANDS':
        if is_help_command:
          continue
        for line in content:
          try:
            name = line.split()[0]
          except IndexError:
            continue
          if name == 'update':
            continue
          command[cli_tree.LOOKUP_IS_GROUP] = True
          command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(path + [name])
      elif heading == 'FLAGS':
        self.AddFlags(command, content)
      elif heading == 'SYNOPSIS':
        commands = []
        for line in content:
          if not line:
            break
          cmd = line.split()
          if len(cmd) <= len(path):
            continue
          if cmd[:len(path)] == path:
            name = cmd[len(path)]
            if name[0].islower() and name not in ('off', 'on', 'false', 'true'):
              commands.append(name)
        if len(commands) > 1:
          command[cli_tree.LOOKUP_IS_GROUP] = True
          for name in commands:
            command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(
                path + [name])
      elif heading == 'TOPICS':
        for line in content:
          try:
            self.topics.append(line.split()[0])
          except IndexError:
            continue
      elif heading.isupper():
        if heading.lower() == path[-1]:
          heading = 'DESCRIPTION'
        command[cli_tree.LOOKUP_SECTIONS][heading] = '\n'.join(
            [line[2:] for line in content])

    return command

  def Generate(self):
    """Generates and returns the CLI tree rooted at self.command_name."""
    tree = self.SubTree([self.command_name])

    # Add the global flags to the root.
    text = self.Run(['help', 'options'])
    collector = _GsutilCollector(text)
    while True:
      heading, content = collector.Collect()
      if not heading:
        break
      if heading == 'FLAGS':
        self.AddFlags(tree, content, is_global=True)

    # Add the help topics.
    help_command = tree[cli_tree.LOOKUP_COMMANDS]['help']
    help_command[cli_tree.LOOKUP_IS_GROUP] = True
    for topic in self.topics:
      help_command[cli_tree.LOOKUP_COMMANDS][topic] = self.SubTree(
          help_command[cli_tree.LOOKUP_PATH] + [topic])

    # Finally add the version stamps.
    tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()
    tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION

    return tree


class _KubectlCollector(object):
  """Kubectl help document section collector."""

  def __init__(self, text):
    self.text = text.split('\n')
    self.heading = 'DESCRIPTION'
    self.lookahead = None
    self.ignore_trailer = False

  def Collect(self, strip_headings=False):
    """Returns the heading and content lines from text."""
    content = []
    if self.lookahead:
      if not strip_headings:
        content.append(self.lookahead)
      self.lookahead = None
    heading = self.heading
    self.heading = None
    while self.text:
      line = self.text.pop(0)
      usage = 'Usage:'
      if line.startswith(usage):
        line = line[len(usage):].strip()
        if line:
          self.lookahead = line
        self.heading = 'USAGE'
        break
      if line.endswith(':'):
        if 'Commands' in line:
          self.heading = 'COMMANDS'
          break
        if 'Examples' in line:
          self.heading = 'EXAMPLES'
          break
        if 'Options' in line:
          self.heading = 'FLAGS'
          break
      if line.startswith(' ') or not strip_headings and not self.ignore_trailer:
        content.append(line.rstrip())
    while content and not content[0]:
      content.pop(0)
    while content and not content[-1]:
      content.pop()
    self.ignore_trailer = True
    return heading, content


class KubectlCliTreeGenerator(CliTreeGenerator):
  """kubectl CLI tree generator."""

  def AddFlags(self, command, content, is_global=False):
    """Adds flags in content lines to command."""
    for line in content:
      flags, description = line.strip().split(':', 1)
      flag = flags.split(', ')[-1]
      name, value = flag.split('=')
      if value in ('true', 'false'):
        value = ''
        type_ = 'bool'
      else:
        value = 'VALUE'
        type_ = 'string'
      default = ''
      command[cli_tree.LOOKUP_FLAGS][name] = _Flag(
          name=name,
          description=description,
          type_=type_,
          value=value,
          default=default,
          is_required=False,
          is_global=is_global,
      )

  def SubTree(self, path):
    """Generates and returns the CLI subtree rooted at path."""
    command = _Command(path)
    text = self.Run(path[1:] + ['--help'])
    collector = _KubectlCollector(text)

    while True:
      heading, content = collector.Collect()
      if not heading:
        break
      elif heading == 'COMMANDS':
        for line in content:
          try:
            name = line.split()[0]
          except IndexError:
            continue
          command[cli_tree.LOOKUP_IS_GROUP] = True
          command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(path + [name])
      elif heading in ('DESCRIPTION', 'EXAMPLES'):
        command[cli_tree.LOOKUP_SECTIONS][heading] = '\n'.join(content)
      elif heading == 'FLAGS':
        self.AddFlags(command, content)
    return command

  def GetVersion(self):
    """Returns the CLI_VERSION string."""
    if not self._cli_version:
      try:
        verbose_version = self.Run(['version', '--client'])
        match = re.search('GitVersion:"([^"]*)"', verbose_version)
        self._cli_version = match.group(1)
      except:  # pylint: disable=bare-except
        self._cli_version = cli_tree.CLI_VERSION_UNKNOWN
    return self._cli_version

  def Generate(self):
    """Generates and returns the CLI tree rooted at self.command_name."""

    # Construct the tree minus the global flags.
    tree = self.SubTree([self.command_name])

    # Add the global flags to the root.
    text = self.Run(['options'])
    collector = _KubectlCollector(text)
    _, content = collector.Collect(strip_headings=True)
    content.append('  --help=true: List detailed command help.')
    self.AddFlags(tree, content, is_global=True)

    # Finally add the version stamps.
    tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()
    tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION

    return tree


class _ManPageCollector(six.with_metaclass(abc.ABCMeta, object)):
  """man page help document section collector base class.

  Attributes:
    command_name: The man page command name.
    content_indent: A string of space characters representing the indent of
      the first line of content for any section.
    heading: The heading for the next call to Collect().
    text: The list of man page lines.
    version: The collector CLI_VERSION string.
  """

  def __init__(self, command_name):
    self.command_name = command_name
    self.content_indent = None
    self.heading = None
    self.text = self.GetManPageText().split('\n')

  @classmethod
  @abc.abstractmethod
  def GetVersion(cls):
    """Returns the CLI_VERSION string."""
    return None

  @abc.abstractmethod
  def _GetRawManPageText(self):
    """Returns the raw man page text."""
    return None

  @abc.abstractmethod
  def GetManPageText(self):
    """Returns the preprocessed man page text."""
    return None

  def Collect(self):
    """Returns the heading and content lines from text."""
    content = []
    heading = self.heading
    self.heading = None
    while self.text:
      line = self.text.pop(0)
      if not heading:
        # No NAME no man page.
        if line == 'NAME':
          heading = line
        continue
      elif not line:
        pass
      elif line[0] == ' ':
        if not self.content_indent:
          self.content_indent = re.sub('[^ ].*', '', line)
        if len(line) > len(self.content_indent):
          indented_char = line[len(self.content_indent)]
          if not line.startswith(self.content_indent):
            # Subsection heading or category.
            line = '### ' + line.strip()
          elif heading == 'DESCRIPTION' and indented_char == '-':
            # Some man pages, like GNU ls(1), inline flags in DESCRIPTION.
            self.text.insert(0, line)
            self.heading = 'FLAGS'
            break
          elif heading == 'FLAGS' and indented_char not in (' ', '-'):
            self.text.insert(0, line)
            self.heading = 'DESCRIPTION'
            break
      elif line in ('SYNOPSIS', 'DESCRIPTION', 'EXIT STATUS', 'SEE ALSO'):
        self.heading = line
        break
      elif 'FLAGS' in line or 'OPTIONS' in line:
        self.heading = 'FLAGS'
        break
      elif line and line[0].isupper():
        self.heading = line.split(' ', 1)[-1]
        break
      content.append(line.rstrip())
    while content and not content[0]:
      content.pop(0)
    while content and not content[-1]:
      content.pop()
    return heading, content


class _ManCommandCollector(_ManPageCollector):
  """man command help document section collector."""

  _CLI_VERSION = 'man-0.1'

  @classmethod
  def GetVersion(cls):
    return cls._CLI_VERSION

  def _GetRawManPageText(self):
    """Returns the raw man page text."""
    try:
      with files.FileWriter(os.devnull) as f:
        return encoding.Decode(subprocess.check_output(
            ['man', self.command_name], stderr=f))
    except (OSError, subprocess.CalledProcessError):
      raise NoManPageTextForCommandError(
          'Cannot get man(1) command man page text for [{}].'.format(
              self.command_name))

  def GetManPageText(self):
    """Returns the preprocessed man page text."""
    text = self._GetRawManPageText()
    return re.sub(
        '.\b', '', re.sub(
            '(\u2010|\\u2010)\n *', '', text))


class _ManUrlCollector(_ManPageCollector):
  """man URL help document section collector."""

  _CLI_VERSION = 'man7.org-0.1'

  @classmethod
  def GetVersion(cls):
    return cls._CLI_VERSION

  def _GetRawManPageText(self):
    """Returns the raw man page text."""
    session = requests.GetSession()
    url = 'http://man7.org/linux/man-pages/man1/{}.1.html'.format(
        self.command_name)
    response = session.get(url)
    if response.status_code != 200:
      raise NoManPageTextForCommandError(
          'Cannot get URL man page text for [{}].'.format(self.command_name))
    return response.text

  def GetManPageText(self):
    """Returns the text man page for self.command_name from a URL."""
    text = self._GetRawManPageText()

    # A little sed(1) to clean up the html header/trailer and anchors.
    # A mispplaced .* or (...) group match could affect timing. Keep track
    # of that if you change any of the pattern,replacement tuples.
    for pattern, replacement in (
        ('<span class="footline">.*', ''),
        ('<h2><a id="([^"]*)"[^\n]*\n', '\\1\n'),
        ('<b>( +)', '\\1*'),
        ('( +)</b>', '*\\1'),
        ('<i>( +)', '\\1_'),
        ('( +)</i>', '_\\1'),
        ('</?b>', '*'),
        ('</?i>', '_'),
        ('</pre>', ''),
        ('<a href="([^"]*)">([^\n]*)</a>', '[\\1](\\2)'),
        ('&amp;', '\\&'),
        ('&gt;', '>'),
        ('&lt;', '<'),
        ('&#39;', "'"),
    ):
      text = re.sub(pattern, replacement, text, flags=re.DOTALL)

    # ... and some non-regular edits to finish up.
    lines = []
    top = 'NAME'
    flags = False
    paragraph = False
    for line in text.split('\n'):
      if top and line == 'NAME':
        # Ignore all lines before the NAME section.
        top = None
        lines = []
      if line.startswith('       *-'):  # NOTICE: NOT a regex!
        # Drop font embellishment markdown from flag definition list lines.
        flags = True
        if paragraph:
          # Blank line was really a paragraph.
          paragraph = False
          lines.append('')
        if '  ' in line[7:]:
          head, tail = line[7:].split('  ', 1)
          head = re.sub('\\*', '', head)
          line = '  '.join(['     ', head, tail])
        else:
          line = re.sub('\\*', '', line)
      elif flags:
        if not line:
          paragraph = True
          continue
        elif not line.startswith('       '):
          flags = False
          paragraph = False
        elif paragraph:
          if not line[0].lower():
            # A real paragraph
            lines.append('+')
          paragraph = False
      lines.append(line)
    return '\n'.join(lines)


class ManPageCliTreeGenerator(CliTreeGenerator):
  """man page CLI tree generator."""

  @classmethod
  def _GetManPageCollectorType(cls):
    """Returns the man page collector type."""
    if files.FindExecutableOnPath('man'):
      return _ManCommandCollector
    return _ManUrlCollector

  def __init__(self, command_name):
    super(ManPageCliTreeGenerator, self).__init__(command_name)
    self.collector_type = self._GetManPageCollectorType()

  def GetVersion(self):
    """Returns the CLI_VERSION string."""
    if not self.collector_type:
      return cli_tree.CLI_VERSION_UNKNOWN
    return self.collector_type.GetVersion()

  def AddFlags(self, command, content, is_global=False):
    """Adds flags in content lines to command."""

    def _NameTypeValueNargs(name, type_=None, value=None, nargs=None):
      """Returns the (name, type, value-metavar, nargs) for flag name."""
      if name.startswith('--'):
        # --foo
        if '=' in name:
          name, value = name.split('=', 1)
          if name[-1] == '[':
            name = name[:-1]
            if value.endswith(']'):
              value = value[:-1]
            nargs = '?'
          else:
            nargs = '1'
          type_ = 'string'
      elif len(name) > 2:
        # -b VALUE
        value = name[2:]
        if value[0].isspace():
          value = value[1:]
        if value.startswith('['):
          value = value[1:]
          if value.endswith(']'):
            value = value[:-1]
          nargs = '?'
        else:
          nargs = '1'
        name = name[:2]
        type_ = 'string'
      if type_ is None or value is None or nargs is None:
        type_ = 'bool'
        value = ''
        nargs = '0'
      return (name, type_, value, nargs)

    def _Add(name, description, category, type_, value, nargs):
      """Adds a flag."""
      name, type_, value, nargs = _NameTypeValueNargs(name, type_, value, nargs)
      default = ''
      command[cli_tree.LOOKUP_FLAGS][name] = _Flag(
          name=name,
          description='\n'.join(description),
          type_=type_,
          value=value,
          nargs=nargs,
          category=category,
          default=default,
          is_required=False,
          is_global=is_global,
      )

    def _AddNames(names, description, category):
      """Add a flag name list."""
      if names:
        _, type_, value, nargs = _NameTypeValueNargs(names[-1])
        for name in names:
          _Add(name, description, category, type_, value, nargs)

    names = []
    description = []
    category = ''
    for line in content:
      if line.lstrip().startswith('-'):
        _AddNames(names, description, category)
        line = line.lstrip()
        names = line.strip().replace(', -', ', --').split(', -')
        if ' ' in names[-1]:
          names[-1], text = names[-1].split(' ', 1)
          description = [text.strip()]
        else:
          description = []
      elif line.startswith('### '):
        category = line[4:]
      else:
        description.append(line)
    _AddNames(names, description, category)

  def _Generate(self, path, collector):
    """Generates and returns the CLI subtree rooted at path."""
    command = _Command(path)

    while True:
      heading, content = collector.Collect()
      if not heading:
        break
      elif heading == 'NAME':
        if content:
          command[cli_tree.LOOKUP_CAPSULE] = re.sub(
              '.* -+  *', '', content[0]).strip()
      elif heading == 'FLAGS':
        self.AddFlags(command, content)
      elif heading not in ('BUGS', 'COLOPHON', 'COMPATIBILITY', 'HISTORY',
                           'STANDARDS', 'SYNOPSIS'):
        blocks = []
        begin = 0
        end = 0
        while end < len(content):
          if content[end].startswith('###'):
            if begin < end:
              blocks.append(_NormalizeSpace('\n'.join(content[begin:end])))
            blocks.append(content[end])
            begin = end + 1
          end += 1
        if begin < end:
          blocks.append(_NormalizeSpace('\n'.join(content[begin:end])))
        text = '\n'.join(blocks)
        if heading in command[cli_tree.LOOKUP_SECTIONS]:
          command[cli_tree.LOOKUP_SECTIONS][heading] += '\n\n' + text
        else:
          command[cli_tree.LOOKUP_SECTIONS][heading] = text
    return command

  def Generate(self):
    """Generates and returns the CLI tree rooted at self.command_name."""
    if not self.collector_type:
      return None
    collector = self.collector_type(self.command_name)
    if not collector:
      return None
    tree = self._Generate([self.command_name], collector)
    if not tree:
      return None

    # Add the version stamps.
    tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()
    tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION

    return tree


GENERATORS = {
    'bq': BqCliTreeGenerator,
    'gsutil': GsutilCliTreeGenerator,
    'kubectl': KubectlCliTreeGenerator,
}


def LoadOrGenerate(command, directories=None, tarball=None, force=False,
                   generate=True, ignore_out_of_date=False, verbose=False,
                   warn_on_exceptions=False):
  """Returns the CLI tree for command, generating it if it does not exist.

  Args:
    command: The CLI root command. This has the form:
      [executable] [tarball_path/]command_name
      where [executable] and [tarball_path/] are used only for packaging CLI
      trees and may be empty. Examples:
      * gcloud
      * kubectl/kubectl
      * python gsutil/gsutil
    directories: The list of directories containing the CLI tree JSON files.
      If None then the default installation directories are used.
    tarball: For packaging CLI trees. --commands specifies one command that is
      a relative path in this tarball. The tarball is extracted to a temporary
      directory and the command path is adjusted to point to the temporary
      directory.
    force: Update all exitsing trees by forcing them to be out of date if True.
    generate: Generate the tree if it is out of date or does not exist.
    ignore_out_of_date: Ignore out of date trees instead of regenerating.
    verbose: Display a status line for up to date CLI trees if True.
    warn_on_exceptions: Emits warning messages in lieu of generator exceptions.
      Used during installation.

  Returns:
    The CLI tree for command or None if command not found or there is no
      generator.
  """
  parts = shlex.split(command)
  command_executable_args, command_relative_path = parts[:-1], parts[-1]
  _, command_name = os.path.split(command_relative_path)

  # Handle package time command names.
  if command_name.endswith('_lite'):
    command_name = command_name[:-5]

  if command_name.endswith('.py'):
    command_name = command_name[:-3]

  # Don't repeat failed attempts.
  if CliTreeGenerator.AlreadyFailed(command_name):
    if verbose:
      log.status.Print(
          'Skipping CLI tree generation for [{}] due to previous '
          'failure.'.format(command_name))
    return None

  def _LoadOrGenerate(command_path, command_name):
    """Helper."""

    # Instantiate the appropriate generator.
    if command_name in GENERATORS:
      command_args = command_executable_args + [command_path]
      try:
        generator = GENERATORS[command_name](
            command_name, root_command_args=command_args)
      except CommandInvocationError as e:
        if verbose:
          log.status.Print('Command [{}] could not be invoked:\n{}'.format(
              command, e))
        return None
    else:
      generator = ManPageCliTreeGenerator(command_name)

    # Load or (re)generate the CLI tree if possible.
    try:
      return generator.LoadOrGenerate(
          directories=directories,
          force=force,
          generate=generate,
          ignore_out_of_date=ignore_out_of_date,
          verbose=verbose,
          warn_on_exceptions=warn_on_exceptions)
    except NoManPageTextForCommandError:
      pass

    return None

  if command_name == cli_tree.DEFAULT_CLI_NAME:
    tree = cli_tree.Load()
  elif tarball:
    with files.TemporaryDirectory() as tmp:
      tar = tarfile.open(tarball)
      tar.extractall(tmp)
      tree = _LoadOrGenerate(
          os.path.join(tmp, command_relative_path),
          command_name)
  else:
    tree = _LoadOrGenerate(command_relative_path, command_name)
  if not tree:
    CliTreeGenerator.AddFailure(command_name)
  return tree


def UpdateCliTrees(
    cli=None,
    commands=None,
    directory=None,
    tarball=None,
    force=False,
    verbose=False,
    warn_on_exceptions=False,
    skip_completions=False,
):
  """(re)generates the CLI trees in directory if non-existent or out of date.

  This function uses the progress tracker because some of the updates can
  take ~minutes.

  Args:
    cli: The default CLI. If not None then the default CLI is also updated.
    commands: Update only the commands in this list.
    directory: The directory containing the CLI tree JSON files. If None then
      the default installation directories are used.
    tarball: For packaging CLI trees. --commands specifies one command that is a
      relative path in this tarball. The tarball is extracted to a temporary
      directory and the command path is adjusted to point to the temporary
      directory.
    force: Update all exitsing trees by forcing them to be out of date if True.
    verbose: Display a status line for up to date CLI trees if True.
    warn_on_exceptions: Emits warning messages in lieu of exceptions. Used
      during installation.
    skip_completions: Skip updating the static completion CLI tree.

  Raises:
    CliTreeGenerationError: CLI tree generation failed for a command in
      commands.
  """
  directories = _GetDirectories(
      directory=directory, warn_on_exceptions=warn_on_exceptions)
  if not commands:
    commands = set([cli_tree.DEFAULT_CLI_NAME] + list(GENERATORS.keys()))

  failed = []
  for command in sorted(commands):
    if command != cli_tree.DEFAULT_CLI_NAME:
      tree = LoadOrGenerate(command,
                            directories=directories,
                            tarball=tarball,
                            force=force,
                            verbose=verbose,
                            warn_on_exceptions=warn_on_exceptions)
      if not tree:
        failed.append(command)
    elif cli:

      def _Mtime(path):
        try:
          return os.path.getmtime(path)
        except OSError:
          return 0

      # Update the CLI tree.
      cli_tree_path = cli_tree.CliTreeConfigPath(directory=directories[-1])
      cli_tree.Load(cli=cli, path=cli_tree_path, force=force, verbose=verbose)

      # Update the static completion CLI tree if older than the CLI tree. To
      # keep static completion startup lightweight we don't track the release
      # in the tree data. Using the modify time is a minimal sanity check.
      if not skip_completions:
        completion_tree_path = lookup.CompletionCliTreePath(
            directory=directories[0])
        cli_tree_mtime = _Mtime(cli_tree_path)
        completion_tree_mtime = _Mtime(completion_tree_path)
        if (force or not completion_tree_mtime or
            completion_tree_mtime < cli_tree_mtime):
          files.MakeDir(os.path.dirname(completion_tree_path))
          with files.FileWriter(completion_tree_path) as f:
            generate_static.ListCompletionTree(cli, out=f)
        elif verbose:
          log.status.Print(
              '[{}] static completion CLI tree is up to date.'.format(command))

  if failed:
    message = 'CLI tree generation failed for [{}].'.format(
        ', '.join(sorted(failed)))
    if not warn_on_exceptions:
      raise CliTreeGenerationError(message)
    log.warning(message)


def LoadAll(directory=None, ignore_out_of_date=False, root=None,
            warn_on_exceptions=True):
  """Loads all CLI trees in directory and adds them to tree.

  Args:
    directory: The config directory containing the CLI tree modules.
    ignore_out_of_date: Ignore out of date trees instead of regenerating.
    root: dict, The CLI root to update. A new root is created if None.
    warn_on_exceptions: Warn on exceptions instead of raising if True.

  Raises:
    CliTreeVersionError: loaded tree version mismatch
    ImportModuleError: import errors

  Returns:
    The CLI tree.
  """
  # Create the root node if needed.
  if root is None:
    root = cli_tree.Node(description='The CLI tree root.')

  # Load the default CLI if available.
  if cli_tree.DEFAULT_CLI_NAME not in root[cli_tree.LOOKUP_COMMANDS]:
    try:
      root[cli_tree.LOOKUP_COMMANDS][cli_tree.DEFAULT_CLI_NAME] = (
          cli_tree.Load())
    except cli_tree.CliTreeLoadError:
      pass

  # Load extra CLIs by searching directories in order. .json files are treated
  # as CLI modules/data, where the file base name is the name of the CLI root
  # command.
  directories = _GetDirectories(
      directory=directory, warn_on_exceptions=warn_on_exceptions)

  loaded = {cli_tree.DEFAULT_CLI_NAME, '__init__'}  # Already loaded this above.
  for directory in directories:
    if not directory or not os.path.exists(directory):
      continue
    for (dirpath, _, filenames) in os.walk(six.text_type(directory)):
      for filename in sorted(filenames):  # For stability across runs.
        command, extension = os.path.splitext(filename)
        if extension != '.json':
          continue
        if command in loaded:
          # Already loaded. Earlier directory hits take precedence.
          continue
        loaded.add(command)
        if command == cli_tree.DEFAULT_CLI_NAME:
          tree = cli_tree.Load(os.path.join(dirpath, filename))
        else:
          tree = LoadOrGenerate(command,
                                directories=[dirpath],
                                ignore_out_of_date=ignore_out_of_date,
                                warn_on_exceptions=warn_on_exceptions)
        if tree:
          root[cli_tree.LOOKUP_COMMANDS][command] = tree
      # Don't search subdirectories.
      break

  return root