HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/static_completion/lookup.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Methods for looking up completions from the static CLI tree."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os
import shlex
import sys
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import platforms
import six


LINE_ENV_VAR = 'COMP_LINE'
POINT_ENV_VAR = 'COMP_POINT'
IFS_ENV_VAR = '_ARGCOMPLETE_IFS'
IFS_ENV_DEFAULT = '\013'
COMPLETIONS_OUTPUT_FD = 8

FLAG_PREFIX = '--'

FLAG_BOOLEAN = 'bool'
FLAG_DYNAMIC = 'dynamic'
FLAG_VALUE = 'value'
ENV_VAR = 'env_var'

LOOKUP_COMMANDS = 'commands'
LOOKUP_FLAGS = 'flags'

_EMPTY_STRING = ''
_VALUE_SEP = '='
_SPACE = ' '


class CannotHandleCompletionError(Exception):
  """Error for when completions cannot be handled."""
  pass


def _GetCmdLineFromEnv():
  """Gets the command line from the environment.

  Returns:
    str, Command line.
  """
  cmd_line = encoding.GetEncodedValue(os.environ, LINE_ENV_VAR)
  completion_point = int(encoding.GetEncodedValue(os.environ, POINT_ENV_VAR))
  cmd_line = cmd_line[:completion_point]
  return cmd_line


def _GetCmdWordQueue(cmd_line):
  """Converts the given cmd_line to a queue of command line words.

  Args:
    cmd_line: str, full command line before parsing.

  Returns:
    [str], Queue of command line words.
  """
  cmd_words = shlex.split(cmd_line)[1:]  # First word should always be 'gcloud'

  # We need to know if last word was empty. Shlex removes trailing whitespaces.
  if cmd_line[-1] == _SPACE:
    cmd_words.append(_EMPTY_STRING)

  # Reverse so we can use as a queue
  cmd_words.reverse()
  return cmd_words


def GetEnvVarPrefix():
  # TODO(b/207384119) support powershell environment variables
  return '%' if platforms.OperatingSystem.IsWindows() else '$'


def MatchEnvVars(word, env_vars):
  """Returns environment variables beginning with `word`.

  Args:
    word: The word that is being compared to environment variables.
    env_vars: The list of environment variables.

  Returns:
    []: No completions.
    [completions]: List, all possible sorted completions.
  """
  completions = []
  prefix = word[1:]  # exclude '$' or '%' and only use the variable name
  for child in env_vars:
    if child.startswith(prefix):
      if platforms.OperatingSystem.IsWindows():
        completions.append('%' + child + '%')
      else:
        completions.append('$' + child)
  return completions


def _FindCompletions(root, cmd_line):
  """Try to perform a completion based on the static CLI tree.

  Args:
    root: The root of the tree that will be traversed to find completions.
    cmd_line: [str], original command line.

  Raises:
    CannotHandleCompletionError: If FindCompletions cannot handle completion.

  Returns:
    []: No completions.
    [completions]: List, all possible sorted completions.
  """
  words = _GetCmdWordQueue(cmd_line)
  node = root

  global_flags = node[LOOKUP_FLAGS]

  completions = []
  flag_mode = FLAG_BOOLEAN

  env_var_prefix = GetEnvVarPrefix()
  env_vars = os.environ
  while words:
    word = words.pop()

    if word.startswith(FLAG_PREFIX):
      is_flag_word = True
      child_nodes = node.get(LOOKUP_FLAGS, {})
      child_nodes.update(global_flags)
      # Add the value part back to the queue if it exists
      if _VALUE_SEP in word:
        word, flag_value = word.split(_VALUE_SEP, 1)
        # This predates the env var completion but is necessary for completing
        # environment variables that are flag values.
        words.append(flag_value)
    elif word.startswith(env_var_prefix):
      is_flag_word = False
      child_nodes = env_vars
      flag_mode = ENV_VAR
    else:
      is_flag_word = False
      child_nodes = node.get(LOOKUP_COMMANDS, {})

    # Consume word
    if words:
      if word in child_nodes:
        if is_flag_word:
          flag_mode = child_nodes[word]
        else:
          flag_mode = FLAG_BOOLEAN
          node = child_nodes[word]  # Progress to next command node
      elif flag_mode == ENV_VAR:
        continue
      elif flag_mode != FLAG_BOOLEAN:
        flag_mode = FLAG_BOOLEAN
        continue  # Just consume if we are expecting a flag value
      elif not is_flag_word and not node.get(LOOKUP_COMMANDS):
        # If we're at a leaf command node, this could be a positional arg, so
        # consume it and move on.
        flag_mode = FLAG_BOOLEAN
        continue
      else:
        return []  # Non-existing command/flag, so nothing to do

    # Complete word
    else:
      if flag_mode == FLAG_DYNAMIC:
        raise CannotHandleCompletionError(
            'Dynamic completions are not handled by this module')
      elif flag_mode == FLAG_VALUE:
        return []  # Cannot complete, so nothing to do
      elif flag_mode == ENV_VAR:
        completions += MatchEnvVars(word, child_nodes)
      elif flag_mode != FLAG_BOOLEAN:  # Must be list of choices
        for value in flag_mode:
          if value.startswith(word):
            completions.append(value)
      elif not child_nodes:
        raise CannotHandleCompletionError(
            'Positional completions are not handled by this module')
      else:  # Command/flag completion
        for child, value in six.iteritems(child_nodes):
          if not child.startswith(word):
            continue
          if is_flag_word and value != FLAG_BOOLEAN:
            child += _VALUE_SEP
          completions.append(child)
  return sorted(completions)


def _GetInstallationRootDir():
  """Returns the SDK installation root dir."""
  # Intentionally ignoring config path abstraction imports.
  return os.path.sep.join(__file__.split(os.path.sep)[:-5])


def _GetCompletionCliTreeDir():
  """Returns the SDK static completion CLI tree dir."""
  # Intentionally ignoring config path abstraction imports.
  return os.path.join(_GetInstallationRootDir(), 'data', 'cli')


def CompletionCliTreePath(directory=None):
  """Returns the SDK static completion CLI tree path."""
  # Intentionally ignoring config path abstraction imports.
  return os.path.join(
      directory or _GetCompletionCliTreeDir(), 'gcloud_completions.py')


def LoadCompletionCliTree():
  """Loads and returns the static completion CLI tree."""
  try:
    sys_path = sys.path[:]
    sys.path.append(_GetCompletionCliTreeDir())
    import gcloud_completions  # pylint: disable=g-import-not-at-top
    tree = gcloud_completions.STATIC_COMPLETION_CLI_TREE
  except ImportError:
    raise CannotHandleCompletionError(
        'Cannot find static completion CLI tree module.')
  finally:
    sys.path = sys_path
  return tree


def _OpenCompletionsOutputStream():
  """Returns the completions output stream."""
  return os.fdopen(COMPLETIONS_OUTPUT_FD, 'wb')


def _GetCompletions():
  """Returns the static completions, None if there are none."""
  root = LoadCompletionCliTree()
  cmd_line = _GetCmdLineFromEnv()
  return _FindCompletions(root, cmd_line)


def Complete():
  """Attempts completions and writes them to the completion stream."""
  completions = _GetCompletions()
  if completions:
    # The bash/zsh completion scripts set IFS_ENV_VAR to one character.
    ifs = encoding.GetEncodedValue(os.environ, IFS_ENV_VAR, IFS_ENV_DEFAULT)
    # Write completions to stream
    f = None
    try:
      f = _OpenCompletionsOutputStream()
      # the other side also uses the console encoding
      f.write(ifs.join(completions).encode())
    finally:
      if f:
        f.close()