HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/storage/tasks/task_status.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for monitoring and reporting task statuses."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import collections
import datetime
import enum
import threading

from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import manifest_util
from googlecloudsdk.command_lib.storage import metrics_util
from googlecloudsdk.command_lib.storage import thread_messages
from googlecloudsdk.core import log
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.util import scaled_integer
import six


# Recalculate throughput everytime last message time - window_start_time
# is greater than this time threshold.
_THROUGHPUT_WINDOW_THRESHOLD_SECONDS = 3


class OperationName(enum.Enum):
  DOWNLOADING = 'Downloading'
  INTRA_CLOUD_COPYING = 'Intra-Cloud Copying'
  DAISY_CHAIN_COPYING = 'Daisy Chain Copying'
  UPLOADING = 'Uploading'


class IncrementType(enum.Enum):
  INTEGER = 'INTEGER'
  FILES_AND_BYTES = 'FILES_AND_BYTES'


ProgressManagerArgs = collections.namedtuple(
    'ProgressManagerArgs', ['increment_type', 'manifest_path'])


class FileProgress:
  """Holds progress information for file being copied.

  Attributes:
    component_progress (dict<int,int>): Records bytes copied per component. If
      not multi-component copy (e.g. "sliced download"), there will only be one
      component.
    start_time (datetime|None): Needed if writing file copy results to manifest.
    total_bytes_copied (int|None): Sum of bytes copied for each component.
      Needed because components are popped when completed, but we don't want to
      lose info on them if writing to the manifest.
    error_occurred (bool): Whether an error occurred during the operation.
  """

  def __init__(
      self,
      component_count,
      start_time=None,
      total_bytes_copied=None,
      error_occurred=False,
  ):
    self.component_progress = {i: 0 for i in range(component_count)}
    self.start_time = start_time
    self.total_bytes_copied = total_bytes_copied
    self.error_occurred = error_occurred


def _get_formatted_throughput(bytes_processed, time_delta):
  throughput_bytes = max(bytes_processed / time_delta, 0)
  return scaled_integer.FormatBinaryNumber(
      throughput_bytes, decimal_places=1) + '/s'


class _StatusTracker(six.with_metaclass(abc.ABCMeta, object)):
  """Abstract class for tracking and displaying operation progress."""

  @abc.abstractmethod
  def _get_status_string(self):
    """Generates string to illustrate progress to the user."""
    pass

  def _get_done_string(self):
    """Generates string for when StatusTracker exits."""
    return '\n'

  @abc.abstractmethod
  def add_message(self, status_message):
    """Processes task status message for printing and aggregation.

    Args:
      status_message (thread_messages.*): Message to process.
    """
    pass

  def start(self):
    self._progress_tracker = progress_tracker.ProgressTracker(
        message='  ',
        detail_message_callback=self._get_status_string,
        done_message_callback=self._get_done_string,
        no_spacing=True)
    self._progress_tracker.__enter__()
    return self

  def stop(self, exc_type, exc_val, exc_tb):
    if self._progress_tracker:
      self._progress_tracker.__exit__(exc_type, exc_val, exc_tb)


class _IntegerStatusTracker(_StatusTracker):
  """See super class. Tracks both file count and byte amount."""

  def __init__(self):
    super(_IntegerStatusTracker, self).__init__()
    self._completed = 0
    self._total_estimation = 0

  def _get_status_string(self):
    """See super class."""
    if self._total_estimation:
      file_progress_string = '{}/{}'.format(self._completed,
                                            self._total_estimation)
    else:
      file_progress_string = self._completed

    return 'Completed {}\r'.format(file_progress_string)

  def add_message(self, status_message):
    """See super class."""
    if isinstance(status_message, thread_messages.WorkloadEstimatorMessage):
      self._total_estimation += status_message.item_count
    elif isinstance(status_message, thread_messages.IncrementProgressMessage):
      self._completed += 1


class _FilesAndBytesStatusTracker(_StatusTracker, metrics_util.MetricsReporter):
  """See super class. Tracks both file count and byte amount."""

  def __init__(self, manifest_path=None):
    super(_FilesAndBytesStatusTracker, self).__init__()
    # For displaying progress.
    self._completed_files = 0
    self._processed_bytes = 0
    self._total_files_estimation = 0
    self._total_bytes_estimation = 0

    # For calculating average throughput.
    self._first_operation_time = None
    self._last_operation_time = None
    self._total_processed_bytes = 0

    # For calculating window throughput.
    self._window_start_time = None
    self._window_processed_bytes = 0
    # String for on-the-fly display.
    self._window_throughput = None

    # For keeping track of progress on different files.
    self._tracked_file_progress = {}

    if manifest_path:
      self._manifest_manager = manifest_util.ManifestManager(manifest_path)
    else:
      self._manifest_manager = None

  def _get_status_string(self):
    """See super class."""
    scaled_processed_bytes = scaled_integer.FormatBinaryNumber(
        self._processed_bytes, decimal_places=1)
    if self._total_files_estimation:
      file_progress_string = '{}/{}'.format(self._completed_files,
                                            self._total_files_estimation)
    else:
      file_progress_string = self._completed_files
    if self._total_bytes_estimation:
      scaled_total_bytes_estimation = scaled_integer.FormatBinaryNumber(
          self._total_bytes_estimation, decimal_places=1)
      bytes_progress_string = '{}/{}'.format(scaled_processed_bytes,
                                             scaled_total_bytes_estimation)
    else:
      bytes_progress_string = scaled_processed_bytes

    if self._window_throughput:
      throughput_addendum_string = ' | ' + self._window_throughput
    else:
      throughput_addendum_string = ''

    return 'Completed files {} | {}{}\r'.format(file_progress_string,
                                                bytes_progress_string,
                                                throughput_addendum_string)

  def _update_throughput(self, status_message, processed_bytes):
    """Updates stats and recalculates throughput if past threshold."""
    if self._first_operation_time is None:
      self._first_operation_time = status_message.time
      self._window_start_time = status_message.time
    else:
      self._last_operation_time = status_message.time

    self._window_processed_bytes += processed_bytes

    time_delta = status_message.time - self._window_start_time
    if time_delta > _THROUGHPUT_WINDOW_THRESHOLD_SECONDS:
      self._window_throughput = _get_formatted_throughput(
          self._window_processed_bytes, time_delta)
      self._window_start_time = status_message.time
      self._window_processed_bytes = 0

  def _add_to_workload_estimation(self, status_message):
    """Adds WorloadEstimatorMessage info to total workload estimation."""
    self._total_files_estimation += status_message.item_count
    self._total_bytes_estimation += status_message.size

  def _add_progress(self, status_message):
    """Track progress of a multipart file operation."""
    file_url_string = status_message.source_url.url_string
    if file_url_string not in self._tracked_file_progress:
      if status_message.total_components:
        self._tracked_file_progress[file_url_string] = FileProgress(
            component_count=status_message.total_components)
      else:
        self._tracked_file_progress[file_url_string] = FileProgress(
            component_count=1)
      if self._manifest_manager:
        self._tracked_file_progress[file_url_string].start_time = (
            datetime.datetime.fromtimestamp(status_message.time,
                                            datetime.timezone.utc))
        self._tracked_file_progress[file_url_string].total_bytes_copied = 0

    component_tracker = self._tracked_file_progress[
        file_url_string].component_progress

    if status_message.component_number:
      component_number = status_message.component_number
    else:
      component_number = 0

    processed_component_bytes = (
        status_message.current_byte - status_message.offset)
    # status_message.current_byte includes bytes from past messages.
    newly_processed_bytes = (
        processed_component_bytes - component_tracker.get(component_number, 0))
    self._processed_bytes += newly_processed_bytes
    self._update_throughput(status_message, newly_processed_bytes)

    if self._manifest_manager:
      # Keep track of total bytes per file for writing to manifest.
      self._tracked_file_progress[
          file_url_string].total_bytes_copied += newly_processed_bytes

    if status_message.error_occurred:
      # If an error occurred, mark the file as failed.
      self._tracked_file_progress[file_url_string].error_occurred = True

    if processed_component_bytes == status_message.length:
      # Operation complete.
      component_tracker.pop(component_number, None)
      if not component_tracker:
        if not self._tracked_file_progress[file_url_string].error_occurred:
          # Count as completed, if no error occurred.
          self._completed_files += 1
        if not self._manifest_manager:
          # If managing manifest, _add_to_manifest clears items from tracking.
          del self._tracked_file_progress[file_url_string]
    else:
      component_tracker[component_number] = processed_component_bytes

  def _add_to_manifest(self, status_message):
    """Updates manifest file and pops file from tracking if needed."""
    if not self._manifest_manager:
      raise errors.Error(
          'Received ManifestMessage but StatusTracker was not initialized with'
          ' manifest path.'
      )
    file_progress = self._tracked_file_progress.pop(
        status_message.source_url.url_string, None)
    self._manifest_manager.write_row(status_message, file_progress)

  def add_message(self, status_message):
    """See super class."""
    if isinstance(status_message, thread_messages.WorkloadEstimatorMessage):
      self._add_to_workload_estimation(status_message)
    elif isinstance(status_message, thread_messages.DetailedProgressMessage):
      self._set_source_and_destination_schemes(status_message)
      # If files start getting counted twice, see b/225182075.
      self._add_progress(status_message)
    elif isinstance(status_message, thread_messages.IncrementProgressMessage):
      self._completed_files += 1
    elif isinstance(status_message, thread_messages.ManifestMessage):
      self._add_to_manifest(status_message)

  def stop(self, exc_type, exc_val, exc_tb):
    super(_FilesAndBytesStatusTracker, self).stop(exc_type, exc_val, exc_tb)

    if (self._first_operation_time is not None and
        self._last_operation_time is not None and
        self._first_operation_time != self._last_operation_time):
      time_delta = self._last_operation_time - self._first_operation_time
      # Don't use get_done_string because it may cause line wrapping.
      log.status.Print('\nAverage throughput: {}'.format(
          _get_formatted_throughput(self._processed_bytes, time_delta)))
      # Report event for analytics tracking, if enabled.
      self._report_metrics(self._processed_bytes, time_delta,
                           self._completed_files)


def status_message_handler(task_status_queue, status_tracker):
  """Thread method for submiting items from queue to tracker for processing."""
  unhandled_message_exists = False

  while True:
    status_message = task_status_queue.get()
    if status_message == '_SHUTDOWN':
      break
    if status_tracker:
      status_tracker.add_message(status_message)
    else:
      unhandled_message_exists = True

  if unhandled_message_exists:
    log.warning('Status message submitted to task_status_queue without a'
                ' manager to print it.')


def progress_manager(task_status_queue=None, progress_manager_args=None):
  """Factory function that returns a ProgressManager instance.

  Args:
    task_status_queue (multiprocessing.Queue|None): Tasks can submit their
      progress messages here.
    progress_manager_args (ProgressManagerArgs|None): Determines what type of
      progress indicator to display.

  Returns:
    An instance of _ProgressManager or _NoOpProgressManager.
  """
  if task_status_queue is not None:
    return _ProgressManager(task_status_queue, progress_manager_args)
  else:
    return _NoOpProgressManager()


class _ProgressManager:
  """Context manager for processing and displaying progress completing command.

  Ensure that this class is instantiated after all the child
  processes (if any) are started to prevent deadlock.
  """

  def __init__(self, task_status_queue, progress_manager_args=None):
    """Initializes context manager.

    Args:
      task_status_queue (multiprocessing.Queue): Tasks can submit their progress
        messages here.
      progress_manager_args (ProgressManagerArgs|None): Determines what type of
        progress indicator to display.
    """
    self._progress_manager_args = progress_manager_args
    self._status_message_handler_thread = None
    self._status_tracker = None
    self._task_status_queue = task_status_queue

  def __enter__(self):
    if self._progress_manager_args:
      if self._progress_manager_args.increment_type is IncrementType.INTEGER:
        self._status_tracker = _IntegerStatusTracker()
      elif (self._progress_manager_args.increment_type is
            IncrementType.FILES_AND_BYTES):
        self._status_tracker = _FilesAndBytesStatusTracker(
            self._progress_manager_args.manifest_path)

    self._status_message_handler_thread = threading.Thread(
        target=status_message_handler,
        args=(self._task_status_queue, self._status_tracker))
    self._status_message_handler_thread.start()

    if self._status_tracker:
      self._status_tracker.start()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self._task_status_queue.put('_SHUTDOWN')
    self._status_message_handler_thread.join()

    if self._status_tracker:
      self._status_tracker.stop(exc_type, exc_val, exc_tb)


class _NoOpProgressManager:
  """Progress Manager that does not do anything.

  Similar to contextlib.nullcontext, but it is available only for Python3.7+.
  """

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    del  exc_type, exc_val, exc_tb  # Unused.
    pass