HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/thread_messages.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Messages parallel workers might send to the main thread."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os
import threading


class ThreadMessage(object):
  """Common base class for messages that workers send to global status.

  Defines no attributes or methods of its own; it only gives the message
  classes that subclass it a shared parent type.
  """


class IncrementProgressMessage(ThreadMessage):
  """Simple message indicating one of something has completed.

  Carries no data: the class defines no attributes or methods beyond what it
  inherits from ThreadMessage.
  """


class DetailedProgressMessage(ThreadMessage):
  """Message class for sending information about operation progress.

  This class contains specific information on the progress of operating on a
  file, cloud object, or single component.

  Attributes:
    offset (int): Start of byte range to start operation at.
    length (int): Total size of file or component in bytes.
    current_byte (int): Index of byte being operated on.
    time (float): When message was created (seconds since epoch).
    source_url (StorageUrl): Represents source of data used by operation.
    destination_url (StorageUrl|None): Represents destination of data used by
      operation. None for unary operations like hashing.
    component_number (int|None): If a multipart operation, indicates the
      component number.
    total_components (int|None): If a multipart operation, indicates the
      total number of components.
    operation_name (task_status.OperationName|None): Name of the operation
      running on target data.
    error_occurred (bool): Indicates if an error occurred during the
      operation.
    process_id (int|None): Identifies process that produced the instance of this
      message (overridable for testing). When None is passed to the
      constructor, the value of os.getpid() is stored instead.
    thread_id (int|None): Identifies thread that produced the instance of this
      message (overridable for testing). When None is passed to the
      constructor, the ident of the current thread is stored instead.
  """

  def __init__(self,
               offset,
               length,
               current_byte,
               time,
               source_url,
               destination_url=None,
               component_number=None,
               total_components=None,
               operation_name=None,
               error_occurred=False,
               process_id=None,
               thread_id=None):
    # pylint:disable=g-doc-args
    """Initializes a DetailedProgressMessage.

    See the class attributes docstring for arguments.
    """
    # pylint:enable=g-doc-args
    self.offset = offset
    self.length = length
    self.current_byte = current_byte
    self.time = time

    self.source_url = source_url
    self.destination_url = destination_url
    self.component_number = component_number
    self.total_components = total_components

    self.operation_name = operation_name
    self.error_occurred = error_occurred
    # Compare against None explicitly: a truthiness check (`x or default`)
    # would silently replace a caller-supplied ID of 0 with the real one.
    self.process_id = os.getpid() if process_id is None else process_id
    self.thread_id = (
        threading.current_thread().ident if thread_id is None else thread_id
    )

  def __eq__(self, other):
    """Compares every instance attribute, including time and the IDs.

    Returns NotImplemented when `other` is not an instance of this object's
    class, so Python can try the reflected comparison.
    """
    if not isinstance(other, self.__class__):
      return NotImplemented
    return self.__dict__ == other.__dict__

  def __repr__(self):
    """Returns a string with a valid constructor for this message."""
    # URLs and the operation name are rendered as quoted strings; a falsy
    # destination URL or operation name is rendered as a bare None.
    source_url_string = "'{}'".format(self.source_url)
    destination_url_string = (
        "'{}'".format(self.destination_url) if self.destination_url else None
    )
    operation_name_string = (
        "'{}'".format(self.operation_name.value)
        if self.operation_name
        else None
    )
    return (
        '{class_name}(time={time}, offset={offset}, length={length},'
        ' current_byte={current_byte}, source_url={source_url},'
        ' destination_url={destination_url},'
        ' component_number={component_number},'
        ' total_components={total_components}, operation_name={operation_name},'
        ' error_occurred={error_occurred}, process_id={process_id},'
        ' thread_id={thread_id})'
    ).format(
        class_name=self.__class__.__name__,
        time=self.time,
        offset=self.offset,
        length=self.length,
        current_byte=self.current_byte,
        source_url=source_url_string,
        destination_url=destination_url_string,
        component_number=self.component_number,
        total_components=self.total_components,
        operation_name=operation_name_string,
        error_occurred=self.error_occurred,
        process_id=self.process_id,
        thread_id=self.thread_id,
    )


class ManifestMessage(ThreadMessage):
  """Message carrying the outcome of one copy for the manifest file.

  Attributes:
    source_url (StorageUrl): Source URL. Used to match information recorded by
      copy progress infra (FilesAndBytesStatusTracker).
    destination_url (StorageUrl): Destination URL.
    end_time (datetime): Date and time copy completed.
    size (int): Size of file or object.
    result_status (manifest_utils.ResultStatus): End status of copy. Either
      "OK", "skip", or "error".
    md5_hash (str|None): Hash of copied file or object.
    description (str|None): Message about something that happened during a copy.
  """

  def __init__(
      self,
      source_url,
      destination_url,
      end_time,
      size,
      result_status,
      md5_hash=None,
      description=None,
  ):
    """Stores the copy result fields. Args are in the class docstring."""
    self.source_url = source_url
    self.destination_url = destination_url
    self.end_time = end_time
    self.size = size
    self.result_status = result_status
    self.md5_hash = md5_hash
    self.description = description

  def __repr__(self):
    """Returns a string with a valid constructor for this message."""

    def _quoted(value):
      # Wraps the formatted value in single quotes.
      return "'{}'".format(value)

    # Falsy optional fields (None or empty) are shown as a bare None.
    md5_text = _quoted(self.md5_hash) if self.md5_hash else 'None'
    description_text = (
        _quoted(self.description) if self.description else 'None'
    )
    template = (
        '{class_name}(source_url={source_url},'
        ' destination_url={destination_url}, end_time={end_time},'
        ' size={size}, result_status={result_status},'
        ' md5_hash={md5_hash}, description={description})'
    )
    return template.format(
        class_name=type(self).__name__,
        source_url=_quoted(self.source_url),
        destination_url=_quoted(self.destination_url),
        end_time=_quoted(self.end_time),
        size=self.size,
        result_status=self.result_status,
        md5_hash=md5_text,
        description=description_text,
    )

  def __eq__(self, other):
    """Field-by-field equality; NotImplemented for unrelated types."""
    if not isinstance(other, type(self)):
      return NotImplemented
    # Checked in this order, stopping at the first field that differs.
    compared_fields = (
        'source_url',
        'destination_url',
        'end_time',
        'size',
        'result_status',
        'md5_hash',
        'description',
    )
    return all(
        getattr(self, field) == getattr(other, field)
        for field in compared_fields
    )


class WorkloadEstimatorMessage(ThreadMessage):
  """Message used to add to the estimated total workload of an operation.

  Attributes:
    item_count (int): Number of items to add to workload estimation.
    size (int|None): Number of bytes to add to workload estimation.
  """

  def __init__(self, item_count, size=None):
    # pylint:disable=g-doc-args
    """Stores the estimate increments. Args are in the class docstring."""
    # pylint:enable=g-doc-args
    self.item_count = item_count
    self.size = size

  def __repr__(self):
    """Returns a string with a valid constructor for this message."""
    template = '{class_name}(item_count={item_count}, size={size})'
    return template.format(
        class_name=type(self).__name__,
        item_count=self.item_count,
        size=self.size,
    )

  def __eq__(self, other):
    """Equal when both hold the same attributes; NotImplemented otherwise."""
    if isinstance(other, self.__class__):
      return vars(self) == vars(other)
    return NotImplemented