HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/core/console/progress_tracker.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Progress Tracker for Cloud SDK."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import collections
import os
import signal
import sys
import threading
import time

import enum
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.console import multiline
from googlecloudsdk.core.console.style import parser

import six

# The abstract container base classes (Mapping, ...) are taken from
# `collections.abc` on newer interpreters and from the top-level `collections`
# module otherwise.
collections_abc = (
    collections.abc if sys.version_info > (3, 8) else collections)


def ProgressTracker(
    message=None,
    autotick=True,
    detail_message_callback=None,
    done_message_callback=None,
    tick_delay=0.2,
    interruptable=True,
    screen_reader=False,
    aborted_message=console_io.OperationCancelledError.DEFAULT_MESSAGE,
    no_spacing=False):
  """Creates the progress tracker context manager that fits the environment.

  The core/interactive_ux_style property is consulted first: OFF yields a
  tracker that prints nothing and TESTING yields a stub tracker. Otherwise a
  spinner tracker is built for interactive consoles and a plain one for
  non-interactive output.

  Args:
    message: str, The text displayed next to the spinner.
    autotick: bool, True to let the spinner advance by itself. When False the
      caller has to invoke Tick() to move the spinner.
    detail_message_callback: func, A function taking no arguments whose result
      is appended to message every time the message is printed.
    done_message_callback: func, A function taking no arguments whose result
      is appended to message when the tracker exits successfully.
    tick_delay: float, Seconds to wait between two ticks.
    interruptable: boolean, True if the user may ctrl-c the operation, in which
      case it stops and is reported as aborted. If False, a message saying that
      the operation cannot be cancelled is displayed instead.
    screen_reader: boolean, Override for the screen reader accessibility
      property. When either is set, the spinner is replaced by the word
      'working' and the tick delay becomes one second.
    aborted_message: str, Custom message for the exception raised when the
      user cancels the operation.
    no_spacing: boolean, Drops ellipses and other spacing between text.

  Returns:
    The progress tracker.
  """
  current_style = properties.VALUES.core.interactive_ux_style.Get()
  ux_styles = properties.VALUES.core.InteractiveUXStyles
  if current_style == ux_styles.OFF.name:
    return NoOpProgressTracker(interruptable, aborted_message)
  if current_style == ux_styles.TESTING.name:
    return _StubProgressTracker(message, interruptable, aborted_message)

  if console_io.IsInteractive(error=True):
    tracker_cls = _NormalProgressTracker
  else:
    tracker_cls = _NonInteractiveProgressTracker

  spinner_override_message = None
  if screen_reader or properties.VALUES.accessibility.screen_reader.GetBool():
    # Screen readers get a slowly changing word instead of a fast spinner.
    tick_delay = 1
    spinner_override_message = 'working'

  return tracker_cls(
      message,
      autotick,
      detail_message_callback,
      done_message_callback,
      tick_delay,
      interruptable,
      aborted_message,
      spinner_override_message,
      no_spacing)


class _BaseProgressTracker(six.with_metaclass(abc.ABCMeta, object)):
  """Common behavior shared by the single-message progress trackers.

  Entering the context installs a SIGINT handler, logs and prints the initial
  message and, when autoticking is on, starts a thread that calls Tick() every
  tick_delay seconds. Leaving the context prints the final status, waits for
  that thread and puts the previous SIGINT handler back. Subclasses provide
  Tick() and _Print().
  """

  def __init__(self, message, autotick, detail_message_callback,
               done_message_callback, tick_delay, interruptable,
               aborted_message, spinner_override_message, no_spacing):
    self._stream = sys.stderr
    # Without a message there is no text to show, only the spinner.
    self._spinner_only = message is None
    if self._spinner_only:
      self._message = ''
      self._prefix = ''
    else:
      self._message = message
      self._prefix = message + ('' if no_spacing else '...')
    self._detail_message_callback = detail_message_callback
    self.spinner_override_message = spinner_override_message
    self._done_message_callback = done_message_callback
    self._ticks = 0
    self._done = False
    self._lock = threading.Lock()
    self._tick_delay = tick_delay
    self._ticker = None
    reported_width = console_attr.ConsoleAttr().GetTermSize()[0]
    # A negative width can be reported on a pseudo-TTY. It is handled like a
    # zero width, which turns output off to keep the tracker from stopping
    # responding.
    usable_width = 0 if reported_width < 0 else reported_width
    self._output_enabled = log.IsUserOutputEnabled() and usable_width != 0
    # Autoticking is pointless when nothing is ever going to be printed.
    self.__autotick = autotick and self._output_enabled
    self._interruptable = interruptable
    self._aborted_message = aborted_message
    self._old_signal_handler = None
    self._symbols = console_attr.GetConsoleAttr().GetProgressTrackerSymbols()
    self._no_spacing = no_spacing
    self._is_tty = console_io.IsInteractive(error=True)

  @property
  def _autotick(self):
    """Whether a ticker thread should be started on __enter__."""
    return self.__autotick

  def _GetPrefix(self):
    """Returns the prefix, extended by the detail message on a TTY."""
    if not (self._is_tty and self._detail_message_callback):
      return self._prefix
    detail = self._detail_message_callback()
    if not detail:
      return self._prefix
    if self._no_spacing:
      return self._prefix + detail
    return self._prefix + ' ' + detail + '...'

  def _SetUpSignalHandler(self):
    """Installs the SIGINT handler and remembers the one it replaces."""
    def _CtrlCHandler(unused_signal, unused_frame):
      if not self._interruptable:
        with self._lock:
          sys.stderr.write('\n\nThis operation cannot be cancelled.\n\n')
        return
      raise console_io.OperationCancelledError(self._aborted_message)

    try:
      self._old_signal_handler = signal.signal(signal.SIGINT, _CtrlCHandler)
    except ValueError:
      # Handlers can only be installed from the main thread, and gcloud
      # interactive does not run gcloud in the main thread.
      self._restore_old_handler = False
    else:
      self._restore_old_handler = True

  def _TearDownSignalHandler(self):
    """Reinstates the SIGINT handler saved by _SetUpSignalHandler."""
    if not self._restore_old_handler:
      return
    try:
      signal.signal(signal.SIGINT, self._old_signal_handler)
    except ValueError:
      pass  # only works in main thread

  def __enter__(self):
    """Starts tracking: signal handler, first print and optional ticker."""
    self._SetUpSignalHandler()

    log.file_only_logger.info(self._GetPrefix())
    self._Print()
    if not self._autotick:
      return self

    def Ticker():
      # Sleep, tick, and repeat until Tick() reports that tracking is done.
      _SleepSecs(self._tick_delay)
      while not self.Tick():
        _SleepSecs(self._tick_delay)

    self._ticker = threading.Thread(target=Ticker)
    self._ticker.start()
    return self

  def __exit__(self, unused_ex_type, exc_value, unused_traceback):
    """Prints the final status, stops the ticker and restores SIGINT."""
    with self._lock:
      self._done = True
      if exc_value:
        # Only a short status is printed here; the details of the error are
        # left to whichever handler deals with the exception. Printing the
        # status as the suffix also keeps the tick character out of the line
        # (ex. 'message...failed.' instead of 'message.../failed.').
        cancelled = isinstance(exc_value, console_io.OperationCancelledError)
        self._Print('aborted by ctrl-c.\n' if cancelled else 'failed.\n')
      elif not self._spinner_only:
        done_callback = self._done_message_callback
        self._Print(done_callback() if done_callback else 'done.\n')
    if self._ticker:
      self._ticker.join()
    self._TearDownSignalHandler()

  @abc.abstractmethod
  def Tick(self):
    """Shows the user that some progress has been made.

    Returns:
      Whether progress has completed.
    """

  @abc.abstractmethod
  def _Print(self, message=''):
    """Writes an update that contains message to the output stream."""


class _NormalProgressTracker(_BaseProgressTracker):
  """Progress tracker for interactive consoles that animates a spinner.

  The message is registered once with a multiline.SimpleSuffixConsoleOutput;
  every tick then only replaces the suffix shown after it.
  """

  def __enter__(self):
    # The console output objects must exist before the base class prints.
    self._SetupOutput()
    return super(_NormalProgressTracker, self).__enter__()

  def _SetupOutput(self):
    """Creates the console output object and registers the prefix message."""
    def _FormattedCallback():
      callback = self._detail_message_callback
      detail = callback() if callback else None
      if not detail:
        return None
      return detail if self._no_spacing else ' ' + detail + '...'

    self._console_output = multiline.SimpleSuffixConsoleOutput(self._stream)
    self._console_message = self._console_output.AddMessage(
        self._prefix, detail_message_callback=_FormattedCallback)

  def Tick(self):
    """Advances the spinner by one step unless tracking has finished.

    The output stream is flushed on every call.

    Returns:
      Whether progress has completed.
    """
    with self._lock:
      still_running = not self._done
      if still_running:
        self._ticks += 1
        self._Print(self._GetSuffix())
    self._stream.flush()
    return self._done

  def _GetSuffix(self):
    """Returns the spinner mark, or the override text with 0-3 dots."""
    override = self.spinner_override_message
    if not override:
      marks = self._symbols.spin_marks
      return marks[self._ticks % len(marks)]
    # The number of trailing dots cycles through 0, 1, 2 and 3.
    return override + '.' * (self._ticks % 4)

  def _Print(self, message=''):
    """Shows message as the suffix of the tracked console message.

    Nothing happens for spinner-only trackers or when output is disabled.

    Args:
      message: str, suffix of message
    """
    if self._output_enabled and not self._spinner_only:
      self._console_output.UpdateMessage(self._console_message, message)
      self._console_output.UpdateConsole()


class _NonInteractiveProgressTracker(_BaseProgressTracker):
  """Progress tracker for non-interactive output: a message plus dots."""

  def Tick(self):
    """Writes one '.' to the stream unless tracking has finished.

    Returns:
      Whether progress has completed.
    """
    with self._lock:
      still_running = not self._done
      if still_running:
        self._Print('.')
    return self._done

  def _Print(self, message=''):
    """Writes message, or the prefix on a line of its own if it is empty.

    Outside of ticking this is called twice: by __enter__ without a message,
    which outputs the tracker message, and by __exit__ with the final status.
    Nothing is written for spinner-only trackers or when output is disabled.

    Args:
      message: str, text to write instead of the prefix line
    """
    if self._output_enabled and not self._spinner_only:
      prefix = self._GetPrefix()
      text = message if message else prefix + '\n'
      self._stream.write(text)


class NoOpProgressTracker(object):
  """A Progress tracker that doesn't do anything.

  Nothing is printed. The only effect is that, while the context is active,
  ctrl-c raises console_io.OperationCancelledError if the tracker is
  interruptable and is ignored otherwise.
  """

  def __init__(self, interruptable, aborted_message):
    """Stores the cancellation settings.

    Args:
      interruptable: boolean, True if ctrl-c should cancel the operation.
      aborted_message: str, message for the exception raised on cancellation.
    """
    self._interruptable = interruptable
    self._aborted_message = aborted_message
    self._done = False
    self._old_signal_handler = None
    self._restore_old_handler = False

  def __enter__(self):
    def _CtrlCHandler(unused_signal, unused_frame):
      if self._interruptable:
        raise console_io.OperationCancelledError(self._aborted_message)
    try:
      self._old_signal_handler = signal.signal(signal.SIGINT, _CtrlCHandler)
      self._restore_old_handler = True
    except ValueError:
      # Only works in the main thread. Gcloud does not run in the main thread
      # in gcloud interactive, so skip the handler there instead of failing,
      # like the other trackers in this module do.
      self._restore_old_handler = False
    return self

  def Tick(self):
    """Returns whether the tracker has been exited; prints nothing."""
    return self._done

  def __exit__(self, exc_type, exc_val, exc_tb):
    self._done = True
    # Only put the old handler back if __enter__ managed to replace it.
    if self._restore_old_handler:
      try:
        signal.signal(signal.SIGINT, self._old_signal_handler)
      except ValueError:
        pass  # only works in main thread


class _StubProgressTracker(NoOpProgressTracker):
  """A Progress tracker that only emits a deterministic end-of-run record.

  It exists so that it can be told that the tracker ran; what tracking
  actually looks like is deliberately not exposed here.
  """

  def __init__(self, message, interruptable, aborted_message):
    super(_StubProgressTracker, self).__init__(interruptable, aborted_message)
    self._message = message if message else ''
    self._stream = sys.stderr

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Writes one JSON stub line with the outcome, then exits as the base."""
    if exc_val:
      interrupted = isinstance(exc_val, console_io.OperationCancelledError)
      status = 'INTERRUPTED' if interrupted else 'FAILURE'
    else:
      status = 'SUCCESS'

    if log.IsUserOutputEnabled():
      stub_line = console_io.JsonUXStub(
          console_io.UXElementType.PROGRESS_TRACKER,
          message=self._message, status=status)
      self._stream.write(stub_line + '\n')
    return super(_StubProgressTracker, self).__exit__(exc_type, exc_val, exc_tb)


def _SleepSecs(seconds):
  """Sleep int or float seconds. For mocking sleeps in this module.

  The ticker threads in this module call this between two ticks.

  Args:
    seconds: int or float, how long to sleep; handed to time.sleep unchanged.
  """
  time.sleep(seconds)


def CompletionProgressTracker(ofile=None, timeout=4.0, tick_delay=0.1,
                              background_ttl=60.0, autotick=True):
  """Creates a context manager giving feedback during slow completions.

  A completion that runs longer than timeout is assumed to be refreshing the
  cache. When that happens the tracker displays '?', forks the cache operation
  into the background, and exits, which gives the background cache update a
  chance to finish. Once background_ttl more seconds have passed the update is
  forcibly exited (made to call exit rather than killed by a signal) so that
  updates that stopped responding do not pile up in the background.

  With the core/interactive_ux_style property set to OFF or TESTING a tracker
  that prints nothing is returned instead.

  Args:
    ofile: The stream to write to.
    timeout: float, Seconds to show the tracker before backgrounding it.
    tick_delay: float, Seconds between two ticks of the spinner.
    background_ttl: float, Seconds the completion may run in the background
      before it is killed.
    autotick: bool, True to tick the spinner automatically.

  Returns:
    The completion progress tracker.
  """
  current_style = properties.VALUES.core.interactive_ux_style.Get()
  ux_styles = properties.VALUES.core.InteractiveUXStyles
  tracking_disabled = (
      current_style == ux_styles.OFF.name or
      current_style == ux_styles.TESTING.name)
  if not tracking_disabled:
    return _NormalCompletionProgressTracker(
        ofile, timeout, tick_delay, background_ttl, autotick)
  return _NoOpCompletionProgressTracker()


class _NormalCompletionProgressTracker(object):
  """A context manager for visual feedback during long-running completions.

  A completion that exceeds the timeout is assumed to be refreshing the cache.
  At that point the progress tracker displays '?', forks the cache operation
  into the background, and exits.  This gives the background cache update a
  chance to finish.  After background_ttl more seconds the update is forcibly
  exited (forced to call exit rather than killed by signal) to prevent not
  responding updates from proliferating in the background.

  Ticking is driven by SIGALRM: __enter__ installs _Spin as the handler and
  arms a repeating real-time interval timer; __exit__ restores both.
  """

  # File descriptor that is duplicated for output when no ofile is given.
  _COMPLETION_FD = 9

  def __init__(self, ofile, timeout, tick_delay, background_ttl, autotick):
    """Initializes the tracker.

    Args:
      ofile: The stream to write to. If falsy, a stream opened on a duplicate
        of file descriptor _COMPLETION_FD is used.
      timeout: float, seconds of ticking before the completion is
        backgrounded.
      tick_delay: float, seconds between two ticks of the spinner.
      background_ttl: float, seconds the backgrounded process may run before
        it is forced to exit.
      autotick: bool, True to tick the spinner automatically.
    """
    self._ofile = ofile or self._GetStream()
    self._timeout = timeout
    self._tick_delay = tick_delay
    self.__autotick = autotick
    self._background_ttl = background_ttl
    self._ticks = 0
    self._symbols = console_attr.GetConsoleAttr().GetProgressTrackerSymbols()

  def __enter__(self):
    if self._autotick:
      # Remember the previous handler and timer so __exit__ can restore them.
      self._old_handler = signal.signal(signal.SIGALRM, self._Spin)
      self._old_itimer = signal.setitimer(
          signal.ITIMER_REAL, self._tick_delay, self._tick_delay)
    return self

  def __exit__(self, unused_type=None, unused_value=True,
               unused_traceback=None):
    if self._autotick:
      signal.setitimer(signal.ITIMER_REAL, *self._old_itimer)
      signal.signal(signal.SIGALRM, self._old_handler)
    if not self._TimedOut():
      # Overwrite the last spinner mark with a blank.
      self._WriteMark(' ')

  def _TimedOut(self):
    """True if the tracker has timed out."""
    return self._timeout < 0

  def _Spin(self, unused_sig=None, unused_frame=None):
    """Rotates the spinner one tick and checks for timeout.

    Usable as a signal handler, hence the two unused parameters. Each call
    uses up tick_delay seconds of the remaining timeout.

    Args:
      unused_sig: the signal number passed to signal handlers, ignored.
      unused_frame: the stack frame passed to signal handlers, ignored.
    """
    self._ticks += 1
    self._WriteMark(self._symbols.spin_marks[
        self._ticks % len(self._symbols.spin_marks)])
    self._timeout -= self._tick_delay
    if not self._TimedOut():
      return
    # Timed out.
    self._WriteMark('?')
    # Exit the parent process.
    if os.fork():
      os._exit(1)  # pylint: disable=protected-access
    # Allow the child to run in the background for up to self._background_ttl
    # more seconds before being forcefully exited.
    signal.signal(signal.SIGALRM, self._ExitBackground)
    signal.setitimer(
        signal.ITIMER_REAL, self._background_ttl, self._background_ttl)
    # Suppress the explicit completion status channel.  stdout and stderr have
    # already been suppressed.
    self._ofile = None

  def _WriteMark(self, mark):
    """Writes one mark to self._ofile.

    The mark is followed by a backspace so that the next mark overwrites it.
    Nothing is written once self._ofile has been cleared.

    Args:
      mark: str, the text to write.
    """
    if self._ofile:
      self._ofile.write(mark + '\b')
      self._ofile.flush()

  @staticmethod
  def _ExitBackground(unused_sig=None, unused_frame=None):
    """Unconditionally exits the background completer process after timeout.

    This is installed as the SIGALRM handler, and Python calls signal handlers
    with the signal number and the current stack frame. Without these two
    parameters the call itself would raise TypeError and the process would
    never reach os._exit.

    Args:
      unused_sig: the signal number passed to signal handlers, ignored.
      unused_frame: the stack frame passed to signal handlers, ignored.
    """
    os._exit(1)  # pylint: disable=protected-access

  @property
  def _autotick(self):
    """Whether the spinner is driven automatically by the interval timer."""
    return self.__autotick

  @staticmethod
  def _GetStream():
    """Returns the completer output stream."""
    return os.fdopen(
        os.dup(_NormalCompletionProgressTracker._COMPLETION_FD), 'w')


class _NoOpCompletionProgressTracker(object):
  """Completion tracker stand-in that never prints anything."""

  def __init__(self):
    """Takes no arguments; there is no state to set up."""

  def __enter__(self):
    """Hands back the tracker itself for use in a `with` statement."""
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Does nothing and returns None."""
    return None


def StagedProgressTracker(
    message, stages, tracker_id=None, autotick=True, tick_delay=0.1,
    interruptable=True, done_message_callback=None, success_message=None,
    warning_message=None, failure_message=None,
    aborted_message=console_io.OperationCancelledError.DEFAULT_MESSAGE,
    suppress_output=False):
  """Creates a progress tracker for an action made up of several stages.

  The returned tracker is a context manager. Inside its context, call
  StartStage to begin displaying information about a stage and UpdateStage to
  change the message of a stage. CompleteStage, CompleteStageWithWarning and
  FailStage mark the end of a stage.

  Stages do not have to be started or completed in order. In non-multiline
  output mode the displayed stage is the earliest started stage that has not
  been completed.

  Which tracker is returned depends on suppress_output, on the
  core/interactive_ux_style property, on whether the console is interactive
  and on whether it supports ANSI.

  Example Usage:
    stages = [
      Stage('Getting bread...', key='bread'),
      Stage('Getting peanut butter...', key='pb'),
      Stage('Making sandwich...', key='make')]
    with StagedProgressTracker(
        'Making sandwich...',
        stages,
        success_message='Time to eat!',
        failure_message='Time to order delivery..!',
        tracker_id='meta.make_sandwich') as tracker:
      tracker.StartStage('bread')
      # Go to pantry
      tracker.UpdateStage('bread', 'Looking for bread in the pantry')
      # Get bread
      tracker.CompleteStage('bread', 'Got some whole wheat bread!')

      tracker.StartStage('pb')
      # Look for peanut butter
      if pb_not_found:
        error = exceptions.NoPeanutButterError('So sad!')
        tracker.FailStage('pb', error)
      elif pb_not_organic:
        tracker.CompleteStageWithWarning('pb', 'The pb is not organic!')
      else:
        tracker.CompleteStage('pb', 'Got some organic pb!')

  Args:
    message: str, The text displayed next to the spinner.
    stages: list[Stage], The stages for the progress tracker to run. After
      they have been passed to a StagedProgressTracker they are owned by the
      tracker and must not be mutated by the caller.
    tracker_id: str The ID of this tracker that will be used for metrics.
    autotick: bool, True to let the spinner advance by itself. When False the
      caller has to invoke Tick() to move the spinner.
    tick_delay: float, Seconds to wait between two ticks.
    interruptable: boolean, True if the user may ctrl-c the operation, in which
      case it stops and is reported as aborted. If False, the trackers in this
      module report that the operation cannot be cancelled instead.
    done_message_callback: func, A callback to get a more detailed done
      message.
    success_message: str, Shown when all tasks succeed.
    warning_message: str, Shown when no task fails but at least one task
      completes with a warning.
    failure_message: str, Shown when a task fails.
    aborted_message: str, Custom message for the exception raised when the
      user cancels the operation.
    suppress_output: bool, True to suppress output from the tracker.

  Returns:
    The progress tracker.
  """
  style = properties.VALUES.core.interactive_ux_style.Get()
  if (suppress_output
      or style == properties.VALUES.core.InteractiveUXStyles.OFF.name):
    return NoOpStagedProgressTracker(stages, interruptable, aborted_message)
  if style == properties.VALUES.core.InteractiveUXStyles.TESTING.name:
    return _StubStagedProgressTracker(
        message, stages, interruptable, aborted_message)

  # Multiline output needs an interactive console with ANSI support.
  if not console_io.IsInteractive(error=True):
    tracker_cls = _NonInteractiveStagedProgressTracker
  elif console_attr.ConsoleAttr().SupportsAnsi():
    tracker_cls = _MultilineStagedProgressTracker
  else:
    tracker_cls = _NormalStagedProgressTracker

  return tracker_cls(
      message,
      stages,
      success_message,
      warning_message,
      failure_message,
      autotick,
      tick_delay,
      interruptable,
      aborted_message,
      tracker_id,
      done_message_callback)


class Stage(object):
  """One stage of a staged progress tracker."""

  def __init__(self, header, key=None, task_id=None):
    """Creates a stage that has not been started yet.

    Args:
      header: (str) Describes what the task is doing at a high level, for
        example 'Uploading files'.
      key: (str) Used to access/refer to this stage. Must be unique within a
        StagedProgressTracker. The header serves as the key when no key is
        given.
      task_id: (str) The ID of this task that will be used for timing metrics.
        NOTE: Metrics are currently not implemented yet.
    """
    self._header = header
    if key is None:
      # Stages without an explicit key are looked up by their header.
      key = self._header
    self._key = key
    self.message = ''
    self.task_id = task_id
    # TODO(b/109928970): Add support for progress bars.
    # TODO(b/109928025): Add support for timing metrics by task id.

    # Task attributes
    self._is_done = False
    self.status = StageCompletionStatus.NOT_STARTED

  @property
  def key(self):
    """The key this stage is referred to by."""
    return self._key

  @property
  def header(self):
    """The header text given at construction."""
    return self._header

  @property
  def is_done(self):
    """Whether the stage has been marked as done."""
    return self._is_done


class StageCompletionStatus(enum.Enum):
  """Indicates the completion status of a stage.

  The member values are human-readable strings;
  _BaseStagedProgressTracker._GetStagedCompletedSuffix returns them as is.
  """
  # Initial status assigned by Stage.__init__.
  NOT_STARTED = 'not started'
  # Assigned by StartStage.
  RUNNING = 'still running'
  # Assigned by CompleteStage.
  SUCCESS = 'done'
  # Assigned by FailStage.
  FAILED = 'failed'
  # Assigned by _NormalStagedProgressTracker._FailStage to the other queued
  # running stages when a stage fails.
  INTERRUPTED = 'interrupted'
  # Assigned by CompleteStageWithWarnings.
  WARNING = 'warning'


class _BaseStagedProgressTracker(collections_abc.Mapping):
  """Base class for staged progress trackers.

  The tracker is a read-only mapping from stage key to Stage, in the order in
  which the stages were passed to the constructor.

  Entering the context sets up the output, installs a SIGINT handler, prints
  the initial state and, when autoticking is on, starts a daemon thread that
  calls Tick() every tick_delay seconds. The public stage methods (StartStage,
  UpdateStage, CompleteStage, CompleteStageWithWarnings, FailStage) update the
  stage under the tracker's lock, call the matching underscore-prefixed hook
  and then Tick(). Subclasses implement _SetupOutput and Tick and override the
  hooks they need.
  """

  def __init__(self, message, stages, success_message, warning_message,
               failure_message, autotick, tick_delay, interruptable,
               aborted_message, tracker_id, done_message_callback,
               console=None):
    """Initializes the tracker state.

    Args:
      message: str, the header message of the tracker.
      stages: iterable of Stage, the stages owned by this tracker.
      success_message: str, message for the case that everything succeeded.
      warning_message: str, message for completion with warnings.
      failure_message: str, message for the case that a stage failed.
      autotick: bool, True to tick from a background thread.
      tick_delay: float, seconds between two automatic ticks.
      interruptable: bool, True if ctrl-c cancels the operation.
      aborted_message: str, message of the exception raised on cancellation.
      tracker_id: str, ID of this tracker.
      done_message_callback: func, callback producing a done message.
      console: console attributes object; console_attr.GetConsoleAttr() is
        used when None.

    Raises:
      ValueError: if two stages share the same key.
    """
    self._stages = collections.OrderedDict()
    for stage in stages:
      if stage.key in self._stages:
        raise ValueError('Duplicate stage key: {}'.format(stage.key))
      self._stages[stage.key] = stage
    self._stream = sys.stderr
    self._message = message
    self._success_message = success_message
    self._warning_message = warning_message
    self._failure_message = failure_message
    self._aborted_message = aborted_message
    self._done_message_callback = done_message_callback
    self._tracker_id = tracker_id
    if console is None:
      console = console_attr.GetConsoleAttr()
    console_width = console.GetTermSize()[0]
    if not isinstance(console_width, int) or console_width < 0:
      # This can happen if we're on a pseudo-TTY. Set it to 0 and also
      # turn off output to prevent it from stopping responding.
      console_width = 0
    self._output_enabled = log.IsUserOutputEnabled() and console_width != 0
    # Don't bother autoticking if we aren't going to print anything.
    self.__autotick = autotick and self._output_enabled
    self._interruptable = interruptable
    self._tick_delay = tick_delay

    self._symbols = console.GetProgressTrackerSymbols()
    self._done = False
    # Cleared by FailStage once it has printed the failure output itself, so
    # that __exit__ does not print it a second time.
    self._exception_is_uncaught = True
    self._ticks = 0
    self._ticker = None
    self._running_stages = set()
    self._completed_stages = []
    self._completed_with_warnings_stages = []
    self._exit_output_warnings = []
    self._lock = threading.Lock()

  def __getitem__(self, key):
    return self._stages[key]

  def __iter__(self):
    return iter(self._stages)

  def __len__(self):
    return len(self._stages)

  @property
  def _autotick(self):
    """Whether a ticker thread should be started on __enter__."""
    return self.__autotick

  def IsComplete(self, stage):
    """Returns True if the stage is complete."""
    return not (self.IsRunning(stage) or self.IsWaiting(stage))

  def IsRunning(self, stage):
    """Returns True if the stage is running."""
    stage = self._ValidateStage(stage, allow_complete=True)
    return stage.status == StageCompletionStatus.RUNNING

  def HasWarning(self):
    """Returns True if this tracker has encountered at least one warning."""
    return bool(self._exit_output_warnings)

  def IsWaiting(self, stage):
    """Returns True if the stage is not yet started."""
    stage = self._ValidateStage(stage, allow_complete=True)
    return stage.status == StageCompletionStatus.NOT_STARTED

  def _SetUpSignalHandler(self):
    """Sets up a signal handler for handling SIGINT."""
    def _CtrlCHandler(unused_signal, unused_frame):
      if self._interruptable:
        raise console_io.OperationCancelledError(self._aborted_message)
      else:
        self._NotifyUninterruptableError()
    try:
      self._old_signal_handler = signal.signal(signal.SIGINT, _CtrlCHandler)
      self._restore_old_handler = True
    except ValueError:
      # Only works in the main thread. Gcloud does not run in the main thread
      # in gcloud interactive.
      self._restore_old_handler = False

  def _NotifyUninterruptableError(self):
    """Tells the user on stderr that the operation cannot be cancelled."""
    with self._lock:
      sys.stderr.write('\n\nThis operation cannot be cancelled.\n\n')

  def _TearDownSignalHandler(self):
    """Restores the SIGINT handler that _SetUpSignalHandler replaced."""
    if self._restore_old_handler:
      try:
        signal.signal(signal.SIGINT, self._old_signal_handler)
      except ValueError:
        pass  # only works in main thread

  def __enter__(self):
    self._SetupOutput()
    # Setup signal handlers
    self._SetUpSignalHandler()

    log.file_only_logger.info(self._message)
    self._Print()
    if self._autotick:
      def Ticker():
        while True:
          _SleepSecs(self._tick_delay)
          if self.Tick():
            return
      self._ticker = threading.Thread(target=Ticker)
      self._ticker.daemon = True
      self._ticker.start()
    return self

  def __exit__(self, unused_ex_type, exc_value, unused_traceback):
    with self._lock:
      self._done = True
      # If an exception was raised during progress tracking, only the exit
      # output is printed here (unless FailStage already printed it); the
      # appropriate exception handler tells the user what happened.
      if exc_value:
        if self._exception_is_uncaught:
          self._HandleUncaughtException(exc_value)
      else:
        self._PrintExitOutput(warned=self.HasWarning())
    if self._ticker:
      self._ticker.join()
    self._TearDownSignalHandler()
    # Warnings collected while tracking are printed after the tracker output.
    for warning_message in self._exit_output_warnings:
      log.status.Print('  %s' % warning_message)

  def _HandleUncaughtException(self, exc_value):
    """Prints the exit output for an exception that FailStage did not raise.

    Args:
      exc_value: the exception that is leaving the tracker context.
    """
    if isinstance(exc_value, console_io.OperationCancelledError):
      self._PrintExitOutput(aborted=True)
    else:
      # This means this was an uncaught exception. This ideally
      # should be handled by the implementer
      self._PrintExitOutput(failed=True)

  @abc.abstractmethod
  def _SetupOutput(self):
    """Sets up the output for the tracker. Gets called during __enter__."""
    pass

  def UpdateHeaderMessage(self, message):
    """Updates the header message if supported."""
    pass

  @abc.abstractmethod
  def Tick(self):
    """Give a visual indication to the user that some progress has been made.

    Output is sent to sys.stderr. Nothing is shown if output is not a TTY.

    Returns:
      Whether progress has completed.
    """
    pass

  def _GetTickMark(self, ticks):
    """Returns the spinner mark for the given tick count.

    Args:
      ticks: int, the tick count to get the mark for. The marks repeat
        cyclically.

    Returns:
      The element of the spin marks that corresponds to ticks.
    """
    # Index with the argument rather than self._ticks so that the result
    # matches the tick count the caller asked for.
    return self._symbols.spin_marks[ticks % len(self._symbols.spin_marks)]

  def _GetStagedCompletedSuffix(self, status):
    """Returns the text shown for a StageCompletionStatus (its value)."""
    return status.value

  def _ValidateStage(self, key, allow_complete=False):
    """Validates the stage belongs to the tracker.

    Args:
      key: the key of the stage to validate.
      allow_complete: if False, a stage that has already completed is an
        error.

    Returns:
      The validated Stage object.

    Raises:
      ValueError: if the key is not part of this tracker, or if the stage has
        already completed and allow_complete is False.
    """
    if key not in self:
      raise ValueError('This stage does not belong to this progress tracker.')
    stage = self.get(key)
    if not allow_complete and stage.status not in {
        StageCompletionStatus.NOT_STARTED, StageCompletionStatus.RUNNING}:
      raise ValueError('This stage has already completed.')
    return stage

  def StartStage(self, key):
    """Informs the progress tracker that this stage has started."""
    stage = self._ValidateStage(key)
    with self._lock:
      self._running_stages.add(key)
      stage.status = StageCompletionStatus.RUNNING
      self._StartStage(stage)
    self.Tick()

  def _StartStage(self, stage):
    """Override to customize behavior on starting a stage."""
    return

  def _FailStage(self, stage, failure_exception, message):
    """Override to customize behavior on failing a stage."""
    pass

  def _PrintExitOutput(self, aborted=False, warned=False, failed=False):
    """Override to customize behavior on printing exit output."""
    pass

  def UpdateStage(self, key, message):
    """Updates a stage in the progress tracker."""
    # TODO(b/109928970): Add support for progress bars.
    stage = self._ValidateStage(key)
    with self._lock:
      stage.message = message
    self.Tick()

  def CompleteStage(self, key, message=None):
    """Informs the progress tracker that this stage has completed.

    Args:
      key: str, key for the stage to complete.
      message: str, replaces the stage message unless it is None.
    """
    stage = self._ValidateStage(key)
    with self._lock:
      stage.status = StageCompletionStatus.SUCCESS
      stage._is_done = True  # pylint: disable=protected-access
      self._running_stages.discard(key)
      if message is not None:
        stage.message = message
      self._CompleteStage(stage)
    self.Tick()  # This ensures output is properly flushed out.

  def _CompleteStage(self, stage):
    """Override to customize behavior on completing a stage."""
    return

  def CompleteStageWithWarning(self, key, warning_message):
    """Completes the stage with a single warning message."""
    self.CompleteStageWithWarnings(key, [warning_message])

  def CompleteStageWithWarnings(self, key, warning_messages):
    """Informs the progress tracker that this stage completed with warnings.

    The warning messages are printed on __exit__.

    Args:
      key: str, key for the stage to complete.
      warning_messages: list of str, user visible warning messages.
    """
    stage = self._ValidateStage(key)
    with self._lock:
      stage.status = StageCompletionStatus.WARNING
      stage._is_done = True  # pylint: disable=protected-access
      self._running_stages.discard(key)
      self._exit_output_warnings.extend(warning_messages)
      self._completed_with_warnings_stages.append(stage.key)
      self._CompleteStageWithWarnings(stage, warning_messages)
    self.Tick()  # This ensures output is properly flushed out.

  def _CompleteStageWithWarnings(self, stage, warning_messages):
    """Override to customize behavior on completing a stage with warnings."""
    pass

  def FailStage(self, key, failure_exception, message=None):
    """Informs the progress tracker that this stage has failed.

    Args:
      key: str, key for the stage to fail.
      failure_exception: Exception, raised by this method after the failure
        output has been printed. Nothing is printed or raised here if it is
        falsy.
      message: str, user visible message for failure.

    Raises:
      failure_exception, when one is given.
    """
    stage = self._ValidateStage(key)
    with self._lock:
      stage.status = StageCompletionStatus.FAILED
      stage._is_done = True  # pylint: disable=protected-access
      self._running_stages.discard(key)
      if message is not None:
        stage.message = message
      self._FailStage(stage, failure_exception, message)
    self.Tick()  # This ensures output is properly flushed out.
    if failure_exception:
      self._PrintExitOutput(failed=True)
      # The failure output is out already; keep __exit__ from repeating it.
      self._exception_is_uncaught = False
      raise failure_exception  # pylint: disable=raising-bad-type

  def AddWarning(self, warning_message):
    """Add a warning message independent of any particular stage.

    This warning message will be printed on __exit__.

    Args:
      warning_message: str, user visible warning message.
    """
    with self._lock:
      self._exit_output_warnings.append(warning_message)


class _NormalStagedProgressTracker(_BaseStagedProgressTracker):
  """Staged progress tracker that displays one running stage at a time.

  Output is produced through a multiline.SimpleSuffixConsoleOutput. The header
  and every displayed stage is a message object registered with that console
  output, and progress is reported by updating the message that is currently
  being displayed.
  """

  def __init__(self, *args, **kwargs):
    # Started stages in start order; the head of the queue is the one shown.
    self._running_stages_queue = []
    self._stage_being_displayed = None
    super(_NormalStagedProgressTracker, self).__init__(*args, **kwargs)

  def _SetupOutput(self):
    """Creates the console output and registers the header message."""
    console_output = multiline.SimpleSuffixConsoleOutput(self._stream)
    self._console_output = console_output
    header_message = console_output.AddMessage(self._message)
    self._header_message = header_message
    # Until a stage is loaded, updates are applied to the header message.
    self._current_stage_message = header_message

  def _FailStage(self, stage, failure_exception, message=None):
    """Marks every queued stage as done; all but `stage` become interrupted."""
    for queued_stage in self._running_stages_queue:
      if stage != queued_stage:
        queued_stage.status = StageCompletionStatus.INTERRUPTED
      queued_stage._is_done = True  # pylint: disable=protected-access

  def _StartStage(self, stage):
    """Queues the started stage and displays it if nothing is displayed."""
    self._running_stages_queue.append(stage)
    if self._stage_being_displayed is not None:
      return
    self._LoadNextStageForDisplay()

  def _LoadNextStageForDisplay(self):
    """Displays the stage at the head of the queue, if there is one.

    Returns:
      True if a stage was loaded for display, None if the queue is empty.
    """
    if not self._running_stages_queue:
      return None
    next_stage = self._running_stages_queue[0]
    self._stage_being_displayed = next_stage
    self._SetUpOutputForStage(next_stage)
    return True

  def _FlushCompletedStages(self):
    """Prints the completion suffix for each finished stage at the queue head.

    After each flushed stage the next queued stage, if any, is loaded for
    display; otherwise no stage is displayed.
    """
    queue = self._running_stages_queue
    while queue and queue[0].is_done:
      finished_stage = queue.pop(0)
      self._completed_stages.append(finished_stage.key)
      suffix = self._GetStagedCompletedSuffix(
          self._stage_being_displayed.status)
      self._Print(suffix)
      if not self._LoadNextStageForDisplay():
        self._stage_being_displayed = None

  def Tick(self):
    """Advances the tracker by one tick and refreshes the output.

    While the tracker is not done this loads a stage for display when none is
    displayed, or otherwise flushes the stages that already finished, and then
    prints a tick mark for the stage being displayed.

    Returns:
      Whether progress has completed.
    """
    with self._lock:
      if not self._done:
        self._ticks += 1

        if self._stage_being_displayed is None:
          self._LoadNextStageForDisplay()
        else:
          self._FlushCompletedStages()

        if self._stage_being_displayed:
          self._Print(self._GetTickMark(self._ticks))
    return self._done

  def _PrintExitOutput(self, aborted=False, warned=False, failed=False):
    """Prints the closing line of the progress tracker.

    Args:
      aborted: bool, whether the operation was aborted. Takes precedence over
        failed and warned.
      warned: bool, whether the operation completed with warnings.
      failed: bool, whether the operation failed. Takes precedence over warned.
    """
    self._SetupExitOutput()
    if aborted:
      custom_message, fallback = self._aborted_message, 'Aborted.'
    elif failed:
      custom_message, fallback = self._failure_message, 'Failed.'
    elif warned:
      custom_message, fallback = (
          self._warning_message, 'Completed with warnings:')
    else:
      custom_message, fallback = self._success_message, 'Done.'
    msg = custom_message or fallback
    if self._done_message_callback:
      msg += ' ' + self._done_message_callback()
    self._Print(msg + '\n')

  def _SetupExitOutput(self):
    """Adds an empty message that will hold the closing line."""
    self._current_stage_message = self._console_output.AddMessage('')

  def _HandleUncaughtException(self, exc_value):
    """Prints the stage suffix and the exit output for an uncaught exception.

    Args:
      exc_value: the exception that escaped the tracked block.
    """
    # In both branches the first print closes out the stage and the second
    # one produces the exit output of the progress tracker.
    if isinstance(exc_value, console_io.OperationCancelledError):
      self._Print('aborted by ctrl-c')
      self._PrintExitOutput(aborted=True)
      return
    # Any other exception was not caught by the implementer, who ideally
    # should have handled it.
    failed_suffix = self._GetStagedCompletedSuffix(
        StageCompletionStatus.FAILED)
    self._Print(failed_suffix)
    self._PrintExitOutput(failed=True)

  def _SetUpOutputForStage(self, stage):
    """Registers a message for the stage and makes it the current message."""

    def _DetailSuffix():
      # Evaluated through the callback so it reflects the stage's message at
      # the time the callback runs.
      return (' ' + stage.message + '...') if stage.message else None

    self._current_stage_message = self._console_output.AddMessage(
        stage.header,
        indentation_level=1,
        detail_message_callback=_DetailSuffix)

  def _Print(self, message=''):
    """Prints an update containing message to the output stream.

    Nothing is printed when output is disabled or there is no current message.

    Args:
      message: str, suffix of message
    """
    if self._output_enabled and self._current_stage_message:
      self._console_output.UpdateMessage(self._current_stage_message, message)
      self._console_output.UpdateConsole()


class _NonInteractiveStagedProgressTracker(_NormalStagedProgressTracker):
  """Staged progress tracker that writes plain text directly to the stream."""

  def _SetupExitOutput(self):
    """No-op; the non-interactive implementation needs no exit setup."""
    return None

  def _SetupOutput(self):
    """Prints the tracker message on a line of its own."""
    header_line = self._message + '\n'
    self._Print(header_line)

  def _GetTickMark(self, ticks):
    """Returns the next tick mark, which is always a single dot."""
    del ticks  # Unused: every tick looks the same here.
    return '.'

  def _GetStagedCompletedSuffix(self, status):
    """Returns the value of the status followed by a newline."""
    return status.value + '\n'

  def _SetUpOutputForStage(self, stage):
    """Prints the stage header, followed by the stage message if it has one."""
    text = stage.header
    if stage.message:
      text = text + (' ' + stage.message + '...')
    self._Print(text)

  def _Print(self, message=''):
    """Writes message to the output stream unless output is disabled.

    Args:
      message: str, text to write
    """
    if self._output_enabled:
      self._stream.write(message)


class _MultilineStagedProgressTracker(_BaseStagedProgressTracker):
  """Staged progress tracker that shows the header and all stages at once.

  Output is produced through a multiline.MultilineConsoleOutput. The header
  and each stage is a message object registered with that console output, and
  every message is prefixed with a symbol or tick mark reflecting its status.
  """

  def __init__(self, *args, **kwargs):
    self._parser = parser.GetTypedTextParser()
    super(_MultilineStagedProgressTracker, self).__init__(*args, **kwargs)

  def UpdateHeaderMessage(self, message):
    """Stores the header detail message; it is rendered on the next print."""
    # Only record the message here. Ticking from this method to update the
    # output would deadlock when called from _NotifyUninterruptableError.
    self._header_stage.message = message

  def _UpdateHeaderMessage(self, prefix):
    """Rebuilds the header line from prefix, message and detail message."""
    header_text = prefix + self._message
    if self._header_stage.message:
      header_text += ' ' + self._header_stage.message
    self._UpdateMessage(self._header_message, header_text)

  def _UpdateStageTickMark(self, stage, tick_mark=''):
    """Rebuilds the line of a stage using the prefix for its current status."""
    stage_prefix = self._GenerateStagePrefix(stage.status, tick_mark)
    stage_text = stage.header
    if stage.message:
      stage_text += ' ' + stage.message
    self._UpdateMessage(self._stage_messages[stage], stage_prefix + stage_text)

  def _UpdateMessage(self, stage, message):
    """Parses message with the typed text parser and updates a console message.

    Args:
      stage: the console message object to update.
      message: the text to parse and display.
    """
    rendered = self._parser.ParseTypedTextToString(message)
    self._console_output.UpdateMessage(stage, rendered)

  def _AddMessage(self, message, indentation_level=0):
    """Parses message with the typed text parser and adds it to the console.

    Args:
      message: the text to parse and display.
      indentation_level: int, indentation level passed to the console output.

    Returns:
      The message object returned by the console output.
    """
    rendered = self._parser.ParseTypedTextToString(message)
    return self._console_output.AddMessage(
        rendered, indentation_level=indentation_level)

  def _NotifyUninterruptableError(self):
    """Tells the user, through the header, that cancelling is not possible."""
    with self._lock:
      self.UpdateHeaderMessage('This operation cannot be cancelled.')
    self.Tick()

  def _SetupExitOutput(self):
    """Adds and returns an empty message that will hold the closing line."""
    return self._console_output.AddMessage('')

  def _PrintExitOutput(self, aborted=False, warned=False, failed=False):
    """Prints the closing line and sets the final status of the header.

    Args:
      aborted: bool, whether the operation was aborted. Takes precedence over
        failed and warned.
      warned: bool, whether the operation completed with warnings.
      failed: bool, whether the operation failed. Takes precedence over warned.
    """
    output_message = self._SetupExitOutput()
    if aborted:
      # Aborted is the same as overall failed progress.
      msg = self._aborted_message or 'Aborted.'
      header_status = StageCompletionStatus.FAILED
    elif failed:
      msg = self._failure_message or 'Failed.'
      header_status = StageCompletionStatus.FAILED
    elif warned:
      msg = self._warning_message or 'Completed with warnings:'
      header_status = StageCompletionStatus.FAILED
    else:
      msg = self._success_message or 'Done.'
      header_status = StageCompletionStatus.SUCCESS
    self._header_stage.status = header_status
    if self._done_message_callback:
      msg += ' ' + self._done_message_callback()
    self._UpdateMessage(output_message, msg)
    # If for some reason some stage did not complete, mark it as interrupted.
    self._Print(self._symbols.interrupted)

  def _SetupOutput(self):
    """Creates the console output with one message per stage plus a header."""
    self._maintain_queue = False
    self._console_output = multiline.MultilineConsoleOutput(self._stream)
    self._header_message = self._AddMessage(self._message)
    # A Stage object is used to hold the state of the header.
    self._header_stage = Stage('')
    self._header_stage.status = StageCompletionStatus.RUNNING
    self._stage_messages = {}
    for stage in self._stages.values():
      stage_message = self._AddMessage(stage.header, indentation_level=1)
      self._stage_messages[stage] = stage_message
      self._UpdateStageTickMark(stage)
    self._console_output.UpdateConsole()

  def _GenerateStagePrefix(self, stage_status, tick_mark):
    """Returns the status prefix, padded with spaces to the prefix length.

    Args:
      stage_status: status deciding which symbol is used.
      tick_mark: str, used as is when the status has no dedicated symbol.

    Returns:
      str, the symbol or tick mark followed by padding spaces.
    """
    symbols = self._symbols
    status_symbol_names = (
        (StageCompletionStatus.NOT_STARTED, 'not_started'),
        (StageCompletionStatus.SUCCESS, 'success'),
        (StageCompletionStatus.FAILED, 'failed'),
        (StageCompletionStatus.INTERRUPTED, 'interrupted'),
    )
    for status, symbol_name in status_symbol_names:
      if stage_status == status:
        tick_mark = getattr(symbols, symbol_name)
        break
    padding = ' ' * (symbols.prefix_length - len(tick_mark))
    return tick_mark + padding

  def _FailStage(self, stage, exception, message=None):
    """Redraws the failed stage and, given an exception, ends all stages.

    With an exception every stage is marked as done, and the other stages that
    were still running are marked as interrupted.
    """
    self._UpdateStageTickMark(stage)
    if not exception:
      return
    for other_stage in self._stages.values():
      if (other_stage != stage and
          other_stage.status == StageCompletionStatus.RUNNING):
        other_stage.status = StageCompletionStatus.INTERRUPTED
      other_stage._is_done = True  # pylint: disable=protected-access

  def _CompleteStage(self, stage):
    """Redraws the line of the completed stage."""
    self._UpdateStageTickMark(stage)

  def _CompleteStageWithWarnings(self, stage, warning_messages):
    """Redraws the line of the stage that completed with warnings."""
    self._UpdateStageTickMark(stage)

  def Tick(self):
    """Advances the tracker by one tick and reprints the output.

    Nothing happens once the tracker is done.

    Returns:
      Whether progress has completed.
    """
    with self._lock:
      if not self._done:
        self._ticks += 1
        self._Print(self._GetTickMark(self._ticks))
    return self._done

  def _Print(self, tick_mark=''):
    """Redraws the header and the running stages with the given tick mark.

    Args:
      tick_mark: str, mark shown for entries whose status has no dedicated
        symbol
    """
    if not self._output_enabled:
      return
    self._UpdateHeaderMessage(
        self._GenerateStagePrefix(self._header_stage.status, tick_mark))
    for running_key in self._running_stages:
      self._UpdateStageTickMark(self[running_key], tick_mark)
    self._console_output.UpdateConsole()


class NoOpStagedProgressTracker(_BaseStagedProgressTracker):
  """A staged progress tracker that doesn't do anything.

  Nothing is printed and ticking does no work. Entering the context still
  installs a SIGINT handler, which raises OperationCancelledError when the
  tracker is interruptable and ignores the signal otherwise; the previous
  handler is put back on exit.
  """

  def __init__(self, stages, interruptable=False, aborted_message=''):
    """Initializes the tracker with output suppressed.

    Args:
      stages: the stages to track, passed through to the base class.
      interruptable: bool, whether ctrl-c raises OperationCancelledError.
      aborted_message: str, message carried by that OperationCancelledError.
    """
    super(NoOpStagedProgressTracker, self).__init__(
        message='',
        stages=stages,
        success_message='',
        warning_message='',
        failure_message='',
        autotick=False,
        tick_delay=0,
        interruptable=interruptable,
        aborted_message=aborted_message,
        tracker_id='',
        done_message_callback=None,
        console=console_attr.ConsoleAttr(
            encoding='ascii', suppress_output=True))
    self._aborted_message = aborted_message
    self._done = False
    # True only while a SIGINT handler installed by __enter__ is in place and
    # must be undone by __exit__.
    self._sigint_handler_installed = False

  def __enter__(self):
    """Installs the SIGINT handler when possible and returns the tracker."""
    def _CtrlCHandler(unused_signal, unused_frame):
      if self._interruptable:
        raise console_io.OperationCancelledError(self._aborted_message)
    try:
      self._old_signal_handler = signal.signal(signal.SIGINT, _CtrlCHandler)
      self._sigint_handler_installed = True
    except ValueError:
      # signal.signal() raises ValueError when it is not called from the main
      # thread. A tracker that does nothing should not fail because of that,
      # so carry on without ctrl-c handling.
      self._sigint_handler_installed = False
    return self

  def _Print(self, message=''):
    # Non-interactive progress tracker should not print anything.
    return

  def Tick(self):
    """Does no work; returns whether the tracker has been exited."""
    return self._done

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Marks the tracker as done and restores the previous SIGINT handler."""
    self._done = True
    # Only restore what __enter__ actually replaced.
    if self._sigint_handler_installed:
      signal.signal(signal.SIGINT, self._old_signal_handler)
      self._sigint_handler_installed = False

  def _SetupOutput(self):
    """No-op; this tracker has no output to set up."""
    pass

  def UpdateHeaderMessage(self, message):
    """No-op; this tracker has no header to update."""
    pass


class _StubStagedProgressTracker(NoOpStagedProgressTracker):
  """Staged tracker that only prints deterministic start and end points.

  No UX about tracking should be exposed here. This is strictly for being able
  to tell that the tracker ran, not what it actually looks like.
  """

  def __init__(self, message, stages, interruptable, aborted_message):
    """Initializes the stub tracker.

    Args:
      message: str, message reported in the JSON stub written on exit.
      stages: the stages to track, passed through to the parent class.
      interruptable: bool, passed through to the parent class.
      aborted_message: str, passed through to the parent class.
    """
    super(_StubStagedProgressTracker, self).__init__(
        stages, interruptable, aborted_message)
    self._message = message
    # Headers of the stages completed so far, in completion order.
    self._succeeded_stages = []
    # Header of the stage that failed, if any.
    self._failed_stage = None
    self._stream = sys.stderr

  def _CompleteStage(self, stage):
    """Records the header of the completed stage."""
    self._succeeded_stages.append(stage.header)

  def _FailStage(self, stage, exception, message=None):
    """Records the header of the failed stage and raises the exception, if any.

    Args:
      stage: the stage that failed.
      exception: Exception to raise, or a falsy value to only record the
        failure.
      message: str, unused here.

    Raises:
      exception: if a truthy one was provided.
    """
    self._failed_stage = stage.header
    # FailStage() treats the exception as optional and only raises it when it
    # is truthy. Raising unconditionally here would turn a missing exception
    # into a TypeError from `raise None`.
    if exception:
      raise exception

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Writes the JSON stub describing the outcome, then exits the parent.

    The reported status is INTERRUPTED for an OperationCancelledError, FAILURE
    for any other exception, WARNING when the tracker has warnings and SUCCESS
    otherwise. The stub is only written when user output is enabled.

    Args:
      exc_type: type of the exception that left the with block, if any.
      exc_val: the exception that left the with block, if any.
      exc_tb: traceback of that exception, if any.

    Returns:
      Whatever the parent class __exit__ returns.
    """
    if exc_val and isinstance(exc_val, console_io.OperationCancelledError):
      status_message = 'INTERRUPTED'
    elif exc_val:
      status_message = 'FAILURE'
    elif self.HasWarning():
      status_message = 'WARNING'
    else:
      status_message = 'SUCCESS'

    if log.IsUserOutputEnabled():
      self._stream.write(console_io.JsonUXStub(
          console_io.UXElementType.STAGED_PROGRESS_TRACKER,
          message=self._message,
          status=status_message,
          succeeded_stages=self._succeeded_stages,
          failed_stage=self._failed_stage) + '\n')
    return super(
        _StubStagedProgressTracker, self).__exit__(exc_type, exc_val, exc_tb)