HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/core/console/console_io.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""General console printing utilities used by the Cloud SDK."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import contextlib
import enum
import getpass
import io
import json
import os
import re
import subprocess
import sys
import textwrap
import threading

from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_pager
from googlecloudsdk.core.console import prompt_completer
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms

import six
from six.moves import input  # pylint: disable=redefined-builtin
from six.moves import map  # pylint: disable=redefined-builtin
from six.moves import range  # pylint: disable=redefined-builtin


class Error(exceptions.Error):
  """Root of the exception hierarchy raised by this module."""


class RequiredPromptError(Error):
  """Raised when a prompt cannot be silenced with the --quiet flag."""

  def __init__(self):
    explanation = (
        'This prompt could not be answered because you are not in an '
        'interactive session.  Please re-run the command without the --quiet '
        'flag to respond to the prompts.')
    super(RequiredPromptError, self).__init__(explanation)


class UnattendedPromptError(Error):
  """Raised when a prompt gets no answer in a non-interactive session."""

  def __init__(self):
    explanation = (
        'This prompt could not be answered because you are not in an '
        'interactive session.  You can re-run the command with the --quiet '
        'flag to accept default answers for all prompts.')
    super(UnattendedPromptError, self).__init__(explanation)


class OperationCancelledError(Error):
  """Raised to abort an operation that the user cancelled at a prompt."""

  # Message used when the caller does not supply one.
  DEFAULT_MESSAGE = 'Aborted by user.'

  def __init__(self, message=None):
    """Creates the error.

    Args:
      message: str, The error text. DEFAULT_MESSAGE is used when this is empty
        or None.
    """
    text = message if message else self.DEFAULT_MESSAGE
    super(OperationCancelledError, self).__init__(text)


# One module-wide TextWrapper performs all wrapping. Code that needs a
# different width should go through the _NarrowWrap context manager, below,
# instead of building another wrapper.
_CONSOLE_WIDTH = console_attr.GetConsoleAttr().GetTermSize()[0]
TEXTWRAP = textwrap.TextWrapper(
    # Fall back to 80 columns when no positive terminal width is reported.
    width=80 if _CONSOLE_WIDTH <= 0 else _CONSOLE_WIDTH,
    break_on_hyphens=False,
    drop_whitespace=False,
    replace_whitespace=False)


def _DoWrap(message):
  """Wraps a message with the global wrapper, one input line at a time.

  Each line of the message is filled separately and the results are joined
  with newlines, so line breaks already present in the message are kept.

  Args:
    message: str, The text to wrap. It may contain embedded newlines.

  Returns:
    str, The wrapped text.
  """
  wrapped_lines = []
  for line in message.splitlines():
    wrapped_lines.append(TEXTWRAP.fill(line))
  return '\n'.join(wrapped_lines)


@contextlib.contextmanager
def _NarrowWrap(narrow_by):
  """Temporarily narrows the global wrapper.

  Args:
    narrow_by: int, The number of columns to take off the width of the global
      TEXTWRAP wrapper for the duration of the with block.

  Yields:
    The global TEXTWRAP wrapper, with its width reduced by narrow_by.
  """
  TEXTWRAP.width -= narrow_by
  try:
    yield TEXTWRAP
  finally:
    # Restore the width even when the with block raises; otherwise every
    # later use of the shared wrapper would stay narrowed.
    TEXTWRAP.width += narrow_by


def _GetInput():
  """Reads one line via input(), returning None if EOF is hit instead."""
  try:
    line = input()
  except EOFError:
    line = None
  return line


def ReadFromFileOrStdin(path, binary):
  """Returns the contents of the given file, or of stdin when path is '-'.

  Args:
    path: str, The path of the file to read, or '-' to read stdin.
    binary: bool, True to read bytes, False to read text.

  Returns:
    The contents that were read.
  """
  if path == '-':
    return ReadStdin(binary=binary)
  reader = files.ReadBinaryFileContents if binary else files.ReadFileContents
  return reader(path)


def ReadStdin(binary=False):
  """Reads all of stdin, taking care of text decoding.

  Anything that needs to read sys.stdin must go through this method.

  Args:
    binary: bool, True to read raw bytes, False to read text.

  Returns:
    A byte string if binary is True, otherwise a text string.
  """
  if binary:
    return files.ReadStdinBytes()
  contents = sys.stdin.read()
  # On Python 2, stdin comes in as a byte string, so convert it to text.
  return console_attr.Decode(contents) if six.PY2 else contents


def IsInteractive(output=False, error=False, heuristic=False):
  """Reports whether the current terminal session is interactive.

  sys.stdin must always be a terminal input stream.

  Args:
    output: If True then sys.stdout must be a terminal as well.
    error: If True then sys.stderr must be a terminal as well.
    heuristic: If True then the home directory environment variables are also
      inspected to guess whether this is an interactive context.

  Returns:
    True if the current terminal session is interactive.
  """
  try:
    required_terminals = [sys.stdin]
    if output:
      required_terminals.append(sys.stdout)
    if error:
      required_terminals.append(sys.stderr)
    for stream in required_terminals:
      if not stream.isatty():
        return False
  except AttributeError:
    # This should only occur when one of the streams is not open.
    return False

  if not heuristic:
    return True

  # Look at the home path: startup scripts, for example, are mostly run by
  # users without one. What "home" means depends on the OS, so check it all.
  # *NIX usually sets HOME, typically to '/home/user' or '/root'; a bare '/'
  # most likely means an init script.
  # Windows usually sets HOMEDRIVE and HOMEPATH; when they are missing we are
  # probably in a task scheduler context. HOMEPATH can be '\' for a user with
  # a network mapped home directory.
  # Cygwin sets both the Windows and the Linux variables.
  home = encoding.GetEncodedValue(os.environ, 'HOME')
  homepath = encoding.GetEncodedValue(os.environ, 'HOMEPATH')
  if homepath:
    return True
  return bool(home) and home != '/'


def IsRunFromShellScript():
  """Guesses whether the command was started by a shell script.

  Returns:
    bool, True if not on Windows and the parent process id equals the process
    group id, False otherwise.
  """
  current_os = platforms.OperatingSystem.Current()
  if current_os != platforms.OperatingSystem.WINDOWS:
    # Commands run from a shell script typically have getppid() == getpgrp().
    return os.getppid() == os.getpgrp()
  return False


def CanPrompt():
  """Returns true if we can prompt the user for information.

  Prompting is possible only when the session is interactive (stdin and stderr
  are both terminals) and the core/disable_prompts property is not set.

  Returns:
    bool, True if we can prompt the user for information.
  """
  if not IsInteractive(error=True):
    return False
  return not properties.VALUES.core.disable_prompts.GetBool()


def PromptContinue(message=None, prompt_string=None, default=True,
                   throw_if_unattended=False, cancel_on_no=False,
                   cancel_string=None):
  """Asks the user a yes or no question about whether to continue.

  Args:
    message: str, The prompt to print before the question.
    prompt_string: str, An alternate yes/no prompt to display.  If None, it
      defaults to 'Do you want to continue'.
    default: bool, The answer used when the user just hits enter.  True for
      yes, False for no.
    throw_if_unattended: bool, If True, this will throw if there was nothing
      to consume on stdin and stdin is not a tty.
    cancel_on_no: bool, If True and the answer is no, throw an exception to
      cancel the entire operation instead of returning False.
    cancel_string: str, An alternate error to display on No. If None, it
      defaults to 'Aborted by user.'.

  Raises:
    UnattendedPromptError: If there is no input to consume and this is not
      running in an interactive terminal.
    OperationCancelledError: If the answer is no and cancel_on_no is True.

  Returns:
    bool, False if the user said no, True if the user said yes, and the
    default if the user gave no answer or prompts are disabled.
  """
  if properties.VALUES.core.disable_prompts.GetBool():
    if cancel_on_no and not default:
      raise OperationCancelledError()
    return default

  style = properties.VALUES.core.interactive_ux_style.Get()
  if style == properties.VALUES.core.InteractiveUXStyles.TESTING.name:
    generate_prompts = _TestPromptContinuePromptGenerator
  else:
    generate_prompts = _NormalPromptContinuePromptGenerator

  prompt, reprompt, ending = generate_prompts(
      message=message, prompt_string=prompt_string, default=default,
      throw_if_unattended=throw_if_unattended, cancel_on_no=cancel_on_no,
      cancel_string=cancel_string)
  sys.stderr.write(prompt)

  def _ReadAnswer():
    """Reads lines until one of them can be interpreted as an answer."""
    while True:
      raw = _GetInput()
      if raw is None:
        # EOF: there was no input or the user closed the stream.
        if throw_if_unattended and not IsInteractive():
          raise UnattendedPromptError()
        return default
      # pylint:disable=g-explicit-bool-comparison, The empty string is
      # compared explicitly to keep this separate from the None case above.
      if raw == '':
        # The user just hit enter.
        return default
      normalized = raw.strip().lower()
      if normalized in ['y', 'yes']:
        return True
      if normalized in ['n', 'no']:
        return False
      if reprompt:
        sys.stderr.write(reprompt)

  try:
    answer = _ReadAnswer()
  finally:
    if ending:
      sys.stderr.write(ending)

  if cancel_on_no and not answer:
    raise OperationCancelledError(cancel_string)
  return answer


def _NormalPromptContinuePromptGenerator(
    message, prompt_string, default, throw_if_unattended, cancel_on_no,
    cancel_string):
  """Builds the PromptContinue texts shown under normal conditions.

  Returns:
    (str, str, str), The initial prompt, the reprompt written after an
    unrecognized answer, and the text written once prompting is over.
  """
  del throw_if_unattended, cancel_on_no, cancel_string  # Unused.

  question = prompt_string or 'Do you want to continue'
  question += ' (Y/n)?  ' if default else ' (y/N)?  '

  buf = io.StringIO()
  if message:
    buf.write(_DoWrap(message) + '\n\n')
  buf.write(_DoWrap(question))
  return (buf.getvalue(), "Please enter 'y' or 'n':  ", '\n')


def _TestPromptContinuePromptGenerator(
    message, prompt_string, default, throw_if_unattended, cancel_on_no,
    cancel_string):
  """Builds the PromptContinue texts shown under test.

  Returns:
    (str, None, None), The JSON UX stub followed by a newline as the prompt,
    with no reprompt and no ending text.
  """
  del default, throw_if_unattended, cancel_on_no  # Unused.
  stub = JsonUXStub(
      UXElementType.PROMPT_CONTINUE, message=message,
      prompt_string=prompt_string, cancel_string=cancel_string)
  return (stub + '\n', None, None)


def PromptResponse(message=None, choices=None):
  """Prompts the user for a string.

  Args:
    message: str, The prompt to print before the question. Nothing is printed
      in the normal UX style when this is None or empty.
    choices: callable or list, A callable with no arguments that returns the
      list of all choices, or the list of choices.

  Returns:
    str, The string entered by the user, or None if prompts are disabled or
    EOF was reached before any input.
  """
  if properties.VALUES.core.disable_prompts.GetBool():
    return None
  if choices and IsInteractive(error=True):
    return prompt_completer.PromptCompleter(message, choices=choices).Input()
  if (properties.VALUES.core.interactive_ux_style.Get() ==
      properties.VALUES.core.InteractiveUXStyles.TESTING.name):
    sys.stderr.write(JsonUXStub(UXElementType.PROMPT_RESPONSE, message=message))
  elif message:
    # message defaults to None, which _DoWrap cannot split into lines. An
    # empty message wraps to an empty string, so skipping it loses nothing.
    sys.stderr.write(_DoWrap(message))
  return _GetInput()


def PromptWithDefault(message=None, default=None, choices=None):
  """Prompts the user for a string, allowing a default.

  Unlike PromptResponse, this also appends a ':  ' to the prompt.  If 'default'
  is specified, the default is also written into the prompt (e.g. if message
  is "message" and default is "default", the prompt would be
  "message (default):  ").

  The default is returned if the user simply presses enter (no input) or an
  EOF is received.

  Args:
    message: str, The prompt to print before the question.
    default: str, The default value (if any).
    choices: callable or list, A callable with no arguments that returns the
      list of all choices, or the list of choices.

  Returns:
    str, The string entered by the user, or the default if no value was
    entered or prompts are disabled.
  """
  prompt = message or ''
  if not default:
    prompt += ':  '
  else:
    separator = ' ' if prompt else ''
    prompt += separator + '({default}):  '.format(default=default)
  response = PromptResponse(prompt, choices=choices)
  return response or default


def _ParseAnswer(answer, options, allow_freeform):
  """Turns a typed answer into a 1-based index into the options list.

  Args:
    answer: str, The answer input by the user to be parsed as a choice.
    options: [object], A list of objects to select.  Their str() form is what
      a freeform answer is matched against.
    allow_freeform: bool, If true, an answer that is not a number may also
      select an option by exactly matching its str() form. If false, only
      numbers are accepted.

  Returns:
    int, The 1-based position in the options list that the answer refers to,
    or None if the answer could not be interpreted. A numeric answer is
    returned as is, without checking that it is a valid index.
  """
  try:
    return int(answer)
  except ValueError:
    # Not a number, so only a freeform match can still succeed.
    if not allow_freeform:
      return None

  try:
    option_names = [str(option) for option in options]
    return option_names.index(answer) + 1
  except ValueError:
    # The answer is not an entry in the options list.
    return None


def _SuggestFreeformAnswer(suggester, answer, options):
  """Asks the suggester for an option that is close to the answer.

  Args:
    suggester: object, An object with AddChoices and GetSuggestion methods.
      It is given the str() form of every option and then asked for a
      suggestion for the answer.
    answer: str, The freeform answer input by the user as a choice.
    options: [object], A list of objects to select.  Their str() form is what
      the suggester compares the answer to.

  Returns:
    Whatever suggester.GetSuggestion returns for the answer.
  """
  option_names = map(str, options)
  suggester.AddChoices(option_names)
  return suggester.GetSuggestion(answer)


def _PrintOptions(options, write, limit=None):
  """Writes the options as a numbered list, one per line.

  Args:
    options:  [object], A list of objects to print as choices.  Their text
      form (six.text_type) is used to display them.
    write: f(x)->None, A function to call to write the data.
    limit: int, If set, only this many options, counted from the start of the
      list, are written.
  """
  shown = options[:limit] if limit is not None else options
  for number, option in enumerate(shown, 1):
    line = ' [{index}] {option}\n'.format(
        index=number, option=six.text_type(option))
    write(line)


# The largest number of options that PromptChoice lists in full up front.
# With more options than this, only the first PROMPT_OPTIONS_OVERFLOW are
# printed and the user has to enter 'list' to see all of them, because very
# long listings cease to be useful.
PROMPT_OPTIONS_OVERFLOW = 50


def PromptChoice(options, default=None, message=None,
                 prompt_string=None, allow_freeform=False,
                 freeform_suggester=None, cancel_option=False):
  """Prompt the user to select a choice from a list of items.

  Args:
    options:  [object], A list of objects to print as choices.  Their text
      form (six.text_type) is used to display them.
    default: int, The 0-based index to return if prompting is disabled or if
      no choice is entered.
    message: str, An optional message to print before the choices are displayed.
    prompt_string: str, A string to print when prompting the user to enter a
      choice.  If not given, a default prompt is used.
    allow_freeform: bool, If true, the choice may also be entered as text that
      exactly matches an option, not just as a number.
    freeform_suggester: object, An object with AddChoices and GetSuggestion
      methods, used to suggest a likely intended option when a freeform
      answer matches nothing.
    cancel_option: bool, If true, a 'cancel' choice is appended to the list of
      choices; selecting it cancels the operation.

  Raises:
    ValueError: If no options are given or if the default is not in the range of
      available options.
    OperationCancelledError: If the `cancel` option is selected by the user.

  Returns:
    The 0-based index of the item in the list that was chosen, or the default
    if prompts are disabled or no choice was entered.
  """
  if not options:
    raise ValueError('You must provide at least one option.')
  if cancel_option:
    options = options + ['cancel']
  option_count = len(options)
  if default is not None and not 0 <= default < option_count:
    raise ValueError(
        'Default option [{default}] is not a valid index for the options list '
        '[{maximum} options given]'.format(
            default=default, maximum=option_count))
  if properties.VALUES.core.disable_prompts.GetBool():
    return default

  style = properties.VALUES.core.interactive_ux_style.Get()
  if style == properties.VALUES.core.InteractiveUXStyles.TESTING.name:
    # Under test only the JSON stub is shown; all other output is dropped.
    def write(unused_text):
      return None
    sys.stderr.write(JsonUXStub(
        UXElementType.PROMPT_CHOICE, message=message,
        prompt_string=prompt_string,
        choices=[six.text_type(o) for o in options]) + '\n')
  else:
    write = sys.stderr.write

  if message:
    write(_DoWrap(message) + '\n')

  if option_count <= PROMPT_OPTIONS_OVERFLOW:
    _PrintOptions(options, write)
  else:
    _PrintOptions(options, write, limit=PROMPT_OPTIONS_OVERFLOW)
    write('Did not print [{truncated}] options.\n'.format(
        truncated=option_count - PROMPT_OPTIONS_OVERFLOW))
    write('Too many options [{maximum}]. Enter "list" at prompt to print '
          'choices fully.\n'.format(maximum=option_count))

  if not prompt_string:
    if allow_freeform:
      prompt_string = ('Please enter numeric choice or text value (must exactly'
                       ' match list item)')
    else:
      prompt_string = 'Please enter your numeric choice'
  if default is None:
    suffix = ':  '
  else:
    # The default is shown 1-based, like the printed option numbers.
    suffix = ' ({default}):  '.format(default=default + 1)

  def _ShowPrompt():
    write(_DoWrap(prompt_string + suffix))

  _ShowPrompt()
  while True:
    answer = _GetInput()
    use_default = answer is None or (not answer and default is not None)
    if use_default:
      # Either stdin could not be read, or the user hit enter and there is a
      # default. An empty answer without a default is prompted for again.
      write('\n')
      if cancel_option and default == option_count - 1:
        # The default is the cancel option.
        raise OperationCancelledError()
      return default
    if answer == 'list':
      _PrintOptions(options, write)
      _ShowPrompt()
      continue

    choice = _ParseAnswer(answer, options, allow_freeform)
    if cancel_option and choice == option_count:
      raise OperationCancelledError()
    if choice is not None and 1 <= choice <= option_count:
      write('\n')
      return choice - 1

    # Nothing matched the answer, so offer a suggestion if there is one.
    if allow_freeform and freeform_suggester:
      suggestion = _SuggestFreeformAnswer(
          freeform_suggester, answer, options)
      if suggestion is not None:
        write('[{answer}] not in list. Did you mean [{suggestion}]?'.format(
            answer=answer, suggestion=suggestion))
        write('\n')

    if allow_freeform:
      write('Please enter a value between 1 and {maximum}, or a value present '
            'in the list:  '.format(maximum=option_count))
    else:
      write('Please enter a value between 1 and {maximum}:  '.format(
          maximum=option_count))


def PromptWithValidator(validator, error_message, prompt_string,
                        message=None, allow_invalid=False):
  """Prompts the user for a string that must pass a validator.

  The prompt is repeated until the validator accepts the answer or, when
  allow_invalid is set, the user confirms an answer that was rejected.

  Args:
    validator: function, A validation function that accepts a string and returns
      a boolean value indicating whether or not the user input is valid.
    error_message: str, Error message to display when the user input does not
      pass validation.
    prompt_string: str, A string to print when prompting the user to enter a
      value.
    message: str, An optional message to print before prompting.
    allow_invalid: bool, Allow returning the answer if validation fails.

  Returns:
    str, The string entered by the user, or None if prompts are disabled.
  """
  if properties.VALUES.core.disable_prompts.GetBool():
    return None

  style = properties.VALUES.core.interactive_ux_style.Get()
  if style == properties.VALUES.core.InteractiveUXStyles.TESTING.name:
    # When running tests, show the prompt as detailed JSON and drop all other
    # output.
    def write(unused_text):
      return None
    sys.stderr.write(JsonUXStub(
        UXElementType.PROMPT_WITH_VALIDATOR, error_message=error_message,
        prompt_string=prompt_string, message=message,
        allow_invalid=allow_invalid) + '\n')
  else:
    write = sys.stderr.write

  if message:
    write(_DoWrap(message) + '\n')

  while True:
    write(_DoWrap(prompt_string))
    answer = _GetInput()
    if validator(answer):
      return answer
    write(_DoWrap(error_message) + '\n')
    if allow_invalid and PromptContinue(default=False):
      return answer


def LazyFormat(s, **kwargs):
  """Expands {key} => value for key, value in kwargs.

  Details:
    * {{<identifier>}} expands to {<identifier>}
    * {<unknown-key>} expands to {<unknown-key>}
    * a callable value is called and its result is used as the value
    * {key} values are recursively expanded before substitution into the result

  Args:
    s: str, The string to format.
    **kwargs: {str:str}, A dict of strings for named parameters.

  Returns:
    str, The lazily-formatted string.
  """

  def _Expand(match):
    """Returns the replacement text for one brace-delimited name."""
    opening, key, closing = match.groups()
    # Every brace beyond the first on each side is kept as literal text.
    extra_opening = opening[1:]
    extra_closing = closing[1:]
    if extra_opening and extra_closing:
      # {{name}} => {name}
      return extra_opening + key + extra_closing
    replacement = kwargs.get(key)
    if replacement is None:
      # {unknown} => {unknown}
      return match.group(0)
    if callable(replacement):
      replacement = replacement()
    # The substituted value is expanded too.
    return extra_opening + LazyFormat(replacement, **kwargs) + extra_closing

  return re.sub(r'(\{+)(\w+)(\}+)', _Expand, s)


def FormatRequiredUserAction(s):
  """Formats an action a user must initiate to complete a command.

  Some actions can't be prompted or initiated by gcloud itself, but they must
  be completed to accomplish the task requested of gcloud; the canonical example
  is that after installation or update, the user must restart their shell for
  all aspects of the update to take effect. Unlike most console output, such
  instructions need to be highlighted in some way. Using this function ensures
  that all such instances are highlighted the *same* way.

  Args:
    s: str, The message to format. It shouldn't begin or end with newlines.

  Returns:
    str, The formatted message. This should be printed starting on its own
      line, and followed by a newline.
  """
  # Narrow by 4 columns while wrapping; every line gets a marker prepended.
  with _NarrowWrap(4) as wrapper:
    if properties.VALUES.accessibility.screen_reader.GetBool():
      # For screen readers, leave out the '==>' marker.
      prefix, separator = '\n', '\n '
    else:
      prefix = separator = '\n==> '
    return prefix + separator.join(wrapper.wrap(s)) + '\n'


def ProgressBar(label, stream=log.status, total_ticks=60, first=True,
                last=True, screen_reader=False):
  """Creates a progress bar suited to the current interactive UX style.

  The normal progress bar works without having to use any control characters.
  It prints the action that is being done, and then fills a progress bar below
  it. You should not print anything else on the output stream during this time
  as it will cause the progress bar to break on lines.

  Progress bars can be stacked into a group. first=True marks the first bar in
  the group and last=True marks the last bar in the group. The default assumes
  a singleton bar with first=True and last=True.

  The returned object can also be used as a context manager.

  Args:
    label: str, The action that is being performed.
    stream: The output stream to write to, log.status by default.
    total_ticks: int, The number of ticks wide to make the progress bar.
    first: bool, True if this is the first bar in a stacked group.
    last: bool, True if this is the last bar in a stacked group.
    screen_reader: bool, override for screen reader accessibility property
      toggle.

  Returns:
    The progress bar.
  """
  ux_style = properties.VALUES.core.interactive_ux_style.Get()
  if ux_style == properties.VALUES.core.InteractiveUXStyles.OFF.name:
    return NoOpProgressBar()
  if ux_style == properties.VALUES.core.InteractiveUXStyles.TESTING.name:
    return _StubProgressBar(label, stream)
  if screen_reader or properties.VALUES.accessibility.screen_reader.GetBool():
    return _TextPercentageProgressBar(label, stream)
  return _NormalProgressBar(label, stream, total_ticks, first, last)


def SplitProgressBar(original_callback, weights):
  """Splits a progress bar into logical sections.

  Wraps the original callback so that each of the subsections can use the full
  range of 0 to 1 to indicate its progress.  The overall progress bar will
  display total progress based on the weights of the tasks.

  Args:
    original_callback: f(float), The original callback for the progress bar.
      If it is None or DefaultProgressBarCallback, every returned callback is
      DefaultProgressBarCallback.
    weights: [float], The weights of the tasks to create.  These can be any
      numbers you want and the split will be based on their proportions to
      each other.

  Returns:
    (f(float), ), A tuple of callback functions, in order, for the subtasks.
  """
  if (original_callback is None or
      original_callback == DefaultProgressBarCallback):
    return (DefaultProgressBarCallback,) * len(weights)

  def _MakeSectionCallback(offset, span):
    """Maps a section's own 0 to 1 progress onto its slice of the total."""
    def _Report(done_fraction):
      original_callback(offset + (done_fraction * span))
    return _Report

  total_weight = sum(weights)
  section_callbacks = []
  completed = 0
  for weight in weights:
    share = weight / total_weight
    section_callbacks.append(_MakeSectionCallback(completed, share))
    completed += share
  return tuple(section_callbacks)


def DefaultProgressBarCallback(progress_factor):
  """A progress callback that ignores the progress it is given."""
  del progress_factor  # Unused.


class _NormalProgressBar(object):
  """A simple progress bar for tracking completion of an action.

  This progress bar works without having to use any control characters.  It
  prints the action that is being done, and then fills a progress bar below it.
  You should not print anything else on the output stream during this time as it
  will cause the progress bar to break on lines.

  Progress bars can be stacked into a group. first=True marks the first bar in
  the group and last=True marks the last bar in the group. The default assumes
  a singleton bar with first=True and last=True.

  This class can also be used in a context manager.
  """

  def __init__(self, label, stream, total_ticks, first, last):
    """Creates a progress bar for the given action.

    Args:
      label: str, The action that is being performed.
      stream: The output stream to write to.
      total_ticks: int, The number of ticks wide to make the progress bar.
      first: bool, True if this is the first bar in a stacked group.
      last: bool, True if this is the last bar in a stacked group.
    """
    self._raw_label = label
    self._stream = stream
    self._ticks_written = 0
    self._total_ticks = total_ticks
    self._first = first
    self._last = last
    attr = console_attr.ConsoleAttr()
    self._box = attr.GetBoxLineCharacters()
    self._redraw = (self._box.d_dr != self._box.d_vr or
                    self._box.d_dl != self._box.d_vl)
    # If not interactive, we can't use carriage returns, so treat every progress
    # bar like it is independent, do not try to merge and redraw them. We only
    # need to do this if it was going to attempt to redraw them in the first
    # place (there is no redraw if the character set does not support different
    # characters for the corners).
    if self._redraw and not IsInteractive(error=True):
      self._first = True
      self._last = True

    # Pad or truncate the label so the label line is exactly as wide as the
    # rule: 2 border characters and a space on either side of the label.
    max_label_width = self._total_ticks - 4
    if len(label) > max_label_width:
      label = label[:max_label_width - 3] + '...'
    elif len(label) < max_label_width:
      diff = max_label_width - len(label)
      label += ' ' * diff
    left = self._box.d_vr + self._box.d_h
    right = self._box.d_h + self._box.d_vl
    self._label = '{left} {label} {right}'.format(
        left=left, label=label, right=right)

  def Start(self):
    """Starts the progress bar by writing the top rule and label."""
    if self._first or self._redraw:
      left = self._box.d_dr if self._first else self._box.d_vr
      right = self._box.d_dl if self._first else self._box.d_vl
      rule = '{left}{middle}{right}\n'.format(
          left=left, middle=self._box.d_h * self._total_ticks, right=right)
      self._Write(rule)
    self._Write(self._label + '\n')
    self._Write(self._box.d_ur)
    self._ticks_written = 0

  def SetProgress(self, progress_factor):
    """Sets the current progress of the task.

    This method has no effect if the progress bar has already progressed past
    the progress you call it with (since the progress bar cannot back up).

    Args:
      progress_factor: float, The current progress as a float between 0 and 1.
        Larger values are treated as 1.
    """
    # Clamp to the bar width so that overshooting 100% still counts as
    # completion and closes the bar, instead of leaving it open forever.
    expected_ticks = min(int(self._total_ticks * progress_factor),
                         self._total_ticks)
    new_ticks = expected_ticks - self._ticks_written

    if new_ticks > 0:
      self._Write(self._box.d_h * new_ticks)
      self._ticks_written += new_ticks
      if expected_ticks == self._total_ticks:
        # A bar that will be redrawn by the next one in the group ends with a
        # carriage return instead of a newline.
        end = '\n' if self._last or not self._redraw else '\r'
        self._Write(self._box.d_ul + end)
      self._stream.flush()

  def Finish(self):
    """Mark the progress as done."""
    self.SetProgress(1)

  def _Write(self, msg):
    """Writes msg to the output stream."""
    self._stream.write(msg)

  def __enter__(self):
    self.Start()
    return self

  def __exit__(self, *args):
    self.Finish()


class _TextPercentageProgressBar(object):
  """A progress bar that reports progress as plain-text percentage lines.

  The label is written once on start. After that a line such as '45%' is
  written whenever progress has advanced by more than the configured
  increment since the last line written, and every time progress reaches 100%.
  """

  def __init__(self, label, stream, percentage_display_increments=5.0):
    """Creates a progress bar for the given action.

    Args:
      label: str, The action that is being performed.
      stream: The output stream to write to.
      percentage_display_increments: Minimum change in percentage (0-100
        scale) before a new progress line is displayed.
    """
    self._label = label
    self._stream = stream
    # Both values below are kept as fractions in [0, 1], not percentages.
    self._last_percentage = 0
    self._percentage_display_increments = percentage_display_increments / 100.0

  def Start(self):
    """Writes the label on its own line."""
    self._Write(self._label)

  def SetProgress(self, progress_factor):
    """Writes a percentage line if progress advanced enough or is complete.

    Args:
      progress_factor: float, The current progress; values above 1 are
        treated as 1.
    """
    capped = min(progress_factor, 1.0)
    threshold = self._last_percentage + self._percentage_display_increments
    advanced_enough = capped > threshold
    if not advanced_enough and capped != 1.0:
      return
    self._Write('{0:.0f}%'.format(capped * 100.0))
    self._last_percentage = capped

  def Finish(self):
    """Mark the progress as done."""
    self.SetProgress(1)

  def _Write(self, msg):
    """Writes msg followed by a newline to the stream."""
    self._stream.write(msg + '\n')

  def __enter__(self):
    self.Start()
    return self

  def __exit__(self, *args):
    self.Finish()


class NoOpProgressBar(object):
  """A silent stand-in offering the progress bar methods with no output."""

  def __init__(self):
    """Creates the no-op bar; there is no state to set up."""
    pass

  def Start(self):
    """Does nothing."""
    pass

  def SetProgress(self, progress_factor):
    """Ignores the given progress.

    Args:
      progress_factor: float, The current progress; unused.
    """
    pass

  def Finish(self):
    """Mark the progress as done."""
    self.SetProgress(1)

  def __enter__(self):
    """Calls Start() and returns this bar for use in a `with` block."""
    self.Start()
    return self

  def __exit__(self, *args):
    """Calls Finish(); exceptions from the `with` body are not suppressed."""
    self.Finish()


class _StubProgressBar(object):
  """A progress bar that only prints deterministic start and end points.

  No UX about progress should be exposed here. This is strictly for being able
  to tell that the progress bar was invoked, not what it actually looks like.
  """

  def __init__(self, label, stream):
    """Stores the raw label and the stream the stub output is written to.

    Args:
      label: str, The action that is being performed.
      stream: The output stream to write to.
    """
    self._stream = stream
    self._raw_label = label

  def Start(self):
    """Writes the JSON stub for a progress bar carrying the label."""
    stub = JsonUXStub(UXElementType.PROGRESS_BAR, message=self._raw_label)
    self._stream.write(stub)

  def SetProgress(self, progress_factor):
    """Ignores intermediate progress; nothing is written.

    Args:
      progress_factor: float, The current progress; unused.
    """
    pass

  def Finish(self):
    """Mark the progress as done."""
    self.SetProgress(1)
    # Terminate the stub line written by Start().
    self._stream.write('\n')

  def __enter__(self):
    self.Start()
    return self

  def __exit__(self, *args):
    self.Finish()


def More(contents, out=None, prompt=None, check_pager=True):
  """Run a user specified pager or fall back to the internal pager.

  When output is not interactive the contents are written straight to `out`
  without paging. Otherwise an external pager is tried first (if check_pager
  is set) and the internal pager is used when none is available.

  Args:
    contents: The entire contents of the text lines to page.
    out: The output stream, log.out (effectively) if None.
    prompt: The page break prompt.
    check_pager: Checks the PAGER env var and uses it if True.
  """
  if not IsInteractive(output=True):
    if not out:
      out = log.out
    out.write(contents)
    return
  if not out:
    # Rendered help to the log file.
    log.file_only_logger.info(contents)
    # Paging shenanigans to stdout.
    out = sys.stdout
  if check_pager:
    pager = encoding.GetEncodedValue(os.environ, 'PAGER', None)
    if pager == '-':
      # Use the fallback Pager.
      pager = None
    elif not pager:
      # Search for a pager that handles ANSI escapes.
      for command in ('less', 'pager'):
        if files.FindExecutableOnPath(command):
          pager = command
          break
    if pager:
      # If the pager is less(1) then instruct it to display raw ANSI escape
      # sequences to enable colors and font embellishments.
      less_orig = encoding.GetEncodedValue(os.environ, 'LESS', None)
      less = '-R' + (less_orig or '')
      encoding.SetEncodedValue(os.environ, 'LESS', less)
      try:
        # PAGER is the user's own command line, so it is run through the shell.
        p = subprocess.Popen(pager, stdin=subprocess.PIPE, shell=True)
        enc = console_attr.GetConsoleAttr().GetEncoding()
        p.communicate(input=contents.encode(enc))
        p.wait()
      finally:
        # Always put LESS back the way it was, even if the pager failed to
        # launch. A previously set value is restored too, so '-R' prefixes
        # do not pile up across calls; a None original is passed through
        # unchanged, as before.
        encoding.SetEncodedValue(os.environ, 'LESS', less_orig)
      return
  # Fall back to the internal pager.
  console_pager.Pager(contents, out, prompt).Run()


class TickableProgressBar(object):
  """A thread safe progress bar with a discrete number of tasks.

  Wraps a ProgressBar and advances it by one task per Tick() call; ticks are
  serialized with a lock.
  """

  def __init__(self, total, *args, **kwargs):
    """Creates the wrapper.

    Args:
      total: int, The number of tasks that make up 100% progress.
      *args: Positional arguments forwarded to ProgressBar.
      **kwargs: Keyword arguments forwarded to ProgressBar.
    """
    self.completed = 0
    self.total = total
    self._progress_bar = ProgressBar(*args, **kwargs)
    self._lock = threading.Lock()

  def __enter__(self):
    """Enters the wrapped progress bar and returns this wrapper."""
    self._progress_bar.__enter__()
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    """Exits the wrapped progress bar, forwarding the exception details."""
    self._progress_bar.__exit__(exc_type, exc_value, traceback)

  def Tick(self):
    """Records one more completed task and updates the wrapped bar."""
    with self._lock:
      self.completed += 1
      fraction_done = self.completed / self.total
      self._progress_bar.SetProgress(fraction_done)


def JsonUXStub(ux_type, **kwargs):
  """Generates a stub message for UX console output.

  Args:
    ux_type: The UX element type; must provide `name` and `GetDataFields()`.
    **kwargs: Values for the element's data fields, keyed by field name.

  Returns:
    str, A JSON object with the key 'ux' (the element name) followed by each
    data field that was given a truthy value, in the element's field order.

  Raises:
    ValueError: If kwargs contains names that are not data fields of ux_type.
  """
  data_fields = ux_type.GetDataFields()
  # Sorted so that the error message is deterministic when several unknown
  # arguments are passed (set iteration order is not stable across runs).
  extra_args = sorted(set(kwargs) - set(data_fields))
  if extra_args:
    raise ValueError('Extraneous args for Ux Element {}: {}'.format(
        ux_type.name, extra_args))
  output = collections.OrderedDict()
  output['ux'] = ux_type.name
  for field in data_fields:
    val = kwargs.get(field, None)
    # Falsy values (None, '', 0, False, empty containers) are left out.
    if val:
      output[field] = val
  return json.dumps(output)


class UXElementType(enum.Enum):
  """Enumerates the kinds of UX element and the data fields each one carries.

  Every member value is a tuple: a unique ordinal followed by the names of the
  data fields that element accepts.
  """
  PROGRESS_BAR = (0, 'message')
  PROGRESS_TRACKER = (1, 'message', 'aborted_message', 'status')
  STAGED_PROGRESS_TRACKER = (
      2, 'message', 'status', 'succeeded_stages', 'failed_stage')
  PROMPT_CONTINUE = (3, 'message', 'prompt_string', 'cancel_string')
  PROMPT_RESPONSE = (4, 'message')
  PROMPT_CHOICE = (5, 'message', 'prompt_string', 'choices')
  PROMPT_WITH_VALIDATOR = (
      6, 'error_message', 'prompt_string', 'message', 'allow_invalid')

  def __init__(self, ordinal, *data_fields):
    # The ordinal only keeps member values distinct: Enum treats members with
    # equal values as aliases of one another, and some elements share the same
    # field list. It is not stored on the member.
    self._data_fields = data_fields
    del ordinal

  def GetDataFields(self):
    """Returns the ordered tuple of data field names for this UX element."""
    return self._data_fields


def PromptPassword(prompt,
                   error_message='Invalid Password',
                   validation_callable=None,
                   encoder_callable=None):
  """Prompt user for password with optional validation.

  Args:
    prompt: The prompt text shown when asking for the password.
    error_message: Message written to stderr each time validation fails.
    validation_callable: Optional callable taking the entered password; the
      user is re-prompted until it returns a truthy value. Ignored if not
      callable.
    encoder_callable: Optional callable applied to the accepted password to
      produce the return value; six.ensure_str is used if it is not callable.

  Returns:
    The encoded password, or None if prompts are disabled.
  """
  if properties.VALUES.core.disable_prompts.GetBool():
    return None

  prompt_text = six.ensure_str(prompt)
  entered = getpass.getpass(prompt_text)
  if callable(encoder_callable):
    encode = encoder_callable
  else:
    encode = six.ensure_str
  if callable(validation_callable):
    # Keep asking until the validator accepts what was typed.
    while not validation_callable(entered):
      sys.stderr.write(_DoWrap(error_message) + '\n')
      entered = getpass.getpass(prompt_text)

  return encode(entered)