HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/upload_stream.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for representing a part of a stream."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os

from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import hash_util
from googlecloudsdk.core.updater import installers

# Number of bytes (16 MiB) that must be read since the previous progress
# callback before UploadStream.read invokes the callback again.
_PROGRESS_CALLBACK_THRESHOLD = 16 * 1024 * 1024


class UploadStream:
  """Implements a subset of the io.IOBase API, adding functionality for uploads.

  When data is read from a stream, this class
  1. Updates hash digesters.
  2. Executes a progress callback if a byte threshold is passed.

  Instances can be used as context managers; leaving the `with` block closes
  the underlying stream.
  """

  def __init__(self,
               stream,
               length=None,
               digesters=None,
               progress_callback=None):
    """Initializes an UploadStream instance.

    Args:
      stream (io.IOBase): The underlying stream wrapped by this class.
      length (int|None): The total number of bytes in the UploadStream.
      digesters (dict[util.HashAlgorithm, hashlib hash object]|None): Values are
        updated with data as it's read.
      progress_callback (func[int]|None): Accepts an amount of processed bytes
        and submits progress information for aggregation.
    """
    self._stream = stream
    self._length = length
    self._digesters = digesters if digesters is not None else {}
    self._progress_callback = progress_callback

    self._bytes_read_since_last_progress_callback = 0
    self._progress_updated_with_end_byte = False
    # Snapshot of the digesters, and the absolute position at which it was
    # taken, used to avoid re-hashing from the start on backward seeks.
    self._checkpoint_digesters = None
    self._checkpoint_absolute_index = 0

    # Absolute position hashing restarts from when digesters are reset.
    self._start_byte = 0

  def _get_absolute_position(self):
    """Returns absolute position in the stream.

    Hashing and progress reporting logic relies on absolute positions. Since
    child classes overwrite `tell` to make it return relative positions, we need
    to write hashing and progress reporting in a way that does not reference
    `self.tell`, which this function makes possible.
    """
    return self._stream.tell()

  def _update_absolute_position(self, offset):
    """Seeks to a position in the underlying stream.

    Catching up digesters sometimes requires seeking to a specific position in
    self._stream. Child classes wrap streams which are not seekable, and have
    different strategies to make it appear that a seek has occurred, which can
    be supported by overriding this method.

    Args:
      offset (int): the position to seek to.

    Returns:
      the new position in the stream.
    """
    return self._stream.seek(offset)

  def _get_data(self, size=-1):
    """Reads bytes from the underlying stream.

    Child classes do not always read directly from the stream. Progress
    reporting and hashing logic can be reused by overriding only this method.

    Args:
      size (int): the number of bytes to read. If less than 0, all bytes are
          returned.

    Returns:
      bytes from self._stream.
    """
    return self._stream.read(size)

  def _save_digesters_checkpoint(self):
    """Updates checkpoint that holds old hashes to optimize backwards seeks."""
    if not self._digesters:
      return
    self._checkpoint_absolute_index = self._get_absolute_position()
    self._checkpoint_digesters = hash_util.copy_digesters(self._digesters)

  def _catch_up_digesters(self, new_absolute_index):
    """Digests data between last and current stream position.

    Args:
      new_absolute_index (int): The absolute position the stream is about to be
        moved to. Digesters are brought to the state they would have after
        hashing every byte before this position.
    """
    if not self._digesters:
      return
    if new_absolute_index < self._checkpoint_absolute_index:
      # Case 1: New position < Checkpoint position < Old position.
      # The checkpoint is past the target, so hashing restarts from scratch.
      self._update_absolute_position(self._start_byte)
      hash_util.reset_digesters(self._digesters)
    elif new_absolute_index < self._get_absolute_position():
      # Case 2: Checkpoint position < New position < Old position.
      self._update_absolute_position(self._checkpoint_absolute_index)

      # The instantiator of this class expects the digesters dictionary it
      # passes to the initializer to contain updated digests. To handle backward
      # seeks we replace the current digester in that dictionary with their
      # values at the last checkpoint.
      self._digesters.update(self._checkpoint_digesters)
      self._checkpoint_digesters = hash_util.copy_digesters(
          self._checkpoint_digesters)
    elif new_absolute_index == self._get_absolute_position():
      # Case 3: Old position == New position.
      return
    # Case 4: Old position < New position.
    # Below digester updates are sufficient.

    self._save_digesters_checkpoint()
    while True:
      # Read in bounded chunks; the requested size reaches 0 once the target
      # position is hit, which yields no data and ends the loop.
      data = self._get_data(
          min(new_absolute_index - self._get_absolute_position(),
              installers.WRITE_BUFFER_SIZE))
      if not data:
        break
      hash_util.update_digesters(self._digesters, data)

  def tell(self):
    """Returns the current position in the stream."""
    return self._get_absolute_position()

  def read(self, size=-1):
    """Returns `size` bytes from the underlying stream.

    Digesters are updated with the data read, and the progress callback is
    invoked once enough bytes have accumulated since its previous invocation.

    Args:
      size (int): The number of bytes to read. If less than 0, all remaining
        bytes are returned.

    Returns:
      The data read from the stream.
    """
    self._save_digesters_checkpoint()
    data = self._get_data(size)
    if data:
      hash_util.update_digesters(self._digesters, data)
      if self._progress_callback:
        self._bytes_read_since_last_progress_callback += len(data)
        if (self._bytes_read_since_last_progress_callback >=
            _PROGRESS_CALLBACK_THRESHOLD):
          self._bytes_read_since_last_progress_callback = 0
          self._progress_callback(self._get_absolute_position())
          # Remember whether the final byte was already reported so `close`
          # does not report it a second time.
          self._progress_updated_with_end_byte = self.tell() == self._length

    return data

  def seek(self, offset, whence=os.SEEK_SET):
    """Goes to a specific point in the stream.

    Digesters are caught up to the new position before the stream is moved.

    Args:
      offset (int): The number of bytes to move, interpreted per `whence`.
      whence: Specifies the position offset is added to.
        os.SEEK_SET (or any other unrecognized value): offset is used as the
          absolute position.
        os.SEEK_CUR: offset is added to the current absolute position.
        os.SEEK_END: offset is added to the stream length. Only supported if
          the length of the stream is known.

    Returns:
      The new position in the stream (int).

    Raises:
      errors.Error: SEEK_END was requested but the stream length is unknown.
    """
    if whence == os.SEEK_END:
      # Compare against None rather than truthiness: a known length of 0 is
      # still a known length.
      if self._length is not None:
        new_absolute_index = offset + self._length
      else:
        raise errors.Error(
            'SEEK_END is not supported if the length of the stream is unknown.')
    elif whence == os.SEEK_CUR:
      new_absolute_index = self._get_absolute_position() + offset
    else:
      new_absolute_index = offset

    self._catch_up_digesters(new_absolute_index)
    # Above may perform seek, but repeating is harmless.
    return self._update_absolute_position(new_absolute_index)

  def close(self, caught_error=False):
    """Closes the underlying stream.

    If a progress callback was provided and the end byte has not been reported
    yet, the current absolute position is reported before closing, unless an
    error was caught.

    Args:
      caught_error (bool): If True, the final progress callback is skipped.

    Returns:
      The result of closing the underlying stream.
    """
    if (self._progress_callback and not self._progress_updated_with_end_byte):
      if not caught_error:
        self._progress_callback(self._get_absolute_position())
      self._progress_updated_with_end_byte = True
    return self._stream.close()

  def __enter__(self):
    """Returns this instance for use in a `with` statement."""
    return self

  def __exit__(self, error_type, *unused_args):
    """Closes the stream, skipping the final progress report on error."""
    self.close(caught_error=bool(error_type))