HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/storage/gcs_grpc/upload.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Upload workflow using gRPC API client."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import copy
import functools
import os

from googlecloudsdk.api_lib.storage import retry_util as storage_retry_util
from googlecloudsdk.api_lib.storage.gcs_grpc import grpc_util
from googlecloudsdk.api_lib.storage.gcs_grpc import metadata_util
from googlecloudsdk.api_lib.storage.gcs_grpc import retry_util
from googlecloudsdk.command_lib.storage import hash_util
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.command_lib.storage.tasks.cp import copy_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import scaled_integer
import six


class _Upload(six.with_metaclass(abc.ABCMeta, object)):
  """Base class shared by different upload strategies."""

  def __init__(
      self,
      client,
      source_stream,
      destination_resource,
      request_config,
      source_resource=None,
      start_offset=0,
  ):
    """Stores the upload collaborators and resets the progress bookkeeping.

    Args:
      client (StorageClient): The GAPIC client.
      source_stream (io.IOBase): Yields bytes to upload.
      destination_resource (resource_reference.ObjectResource|UnknownResource):
        Metadata for the destination object.
      request_config (gcs_api.GcsRequestConfig): Tracks additional request
        preferences.
      source_resource (FileObjectResource|ObjectResource|None): Contains the
        source StorageUrl and source object metadata for daisy chain transfers.
        Can be None if source is pure stream.
      start_offset (int): The offset from the beginning of the object at
        which the data should be written.
    """
    # Collaborators and the two ends of the transfer.
    self._client = client
    self._request_config = request_config
    self._source_stream = source_stream
    self._source_resource = source_resource
    self._destination_resource = destination_resource

    # Upload state, useful for resumable and streaming uploads. Both offsets
    # begin at the requested start offset.
    self._start_offset = self._uploaded_so_far = start_offset
    self._source_stream_finished = False

    # Size used to split a stream of data into chunks; None disables the
    # splitting.
    self._chunk_size = None

  def _get_md5_hash_if_given(self):
    """Decodes the MD5 hash supplied through the resource args, if any.

    Returns:
      bytes|None: The MD5 digest decoded from its base64 string, or None when
      the request config has no resource args or no MD5 hash.
    """
    resource_args = self._request_config.resource_args
    if resource_args is None:
      return None

    encoded_md5 = resource_args.md5_hash
    if encoded_md5 is None:
      return None

    return hash_util.get_bytes_from_base64_string(encoded_md5)

  def _initialize_generator(self):
    """Rewinds the upload state to the current start offset.

    The request generator may be started several times (for example once per
    retry), so the progress counter, the stream position and the finished
    flag are all reset here.
    """
    restart_offset = self._start_offset
    # Forget whatever a previous run of the generator had already sent.
    self._uploaded_so_far = restart_offset
    # Reposition the stream so the next read starts at the start offset.
    self._source_stream.seek(restart_offset, os.SEEK_SET)
    self._source_stream_finished = False

  def _upload_write_object_request_generator(self, first_message):
    """Yields the WriteObjectRequest for each chunk of the source stream.

    The amount_of_data_sent_so_far is equal to the number of bytes read
    from the source stream since the generator was (re)initialized.

    If _chunk_size is not None, this function will yield WriteObjectRequest
    objects until the amount_of_data_sent_so_far is equal to or greater than
    the value of _chunk_size and the length of data sent in the last
    WriteObjectRequest is equal to MAX_WRITE_CHUNK_BYTES, or until there is no
    data left in the source stream.

    MAX_WRITE_CHUNK_BYTES is a multiple of 256 KiB.

    Clients must only send data that is a multiple of 256 KiB per message,
    unless the object is being finished with ``finish_write`` set to ``true``.

    This means that if amount_of_data_sent_so_far >= _chunk_size,
    it must also be ensured before stopping yielding
    requests (WriteObjectRequest) that all requests have sent
    data that is a multiple of 256 KiB, in other words
    length of data % 256 KiB is 0.

    The source stream data is read in chunks of MAX_WRITE_CHUNK_BYTES, which
    means that each request yielded will send data of size
    MAX_WRITE_CHUNK_BYTES, except possibly the last request before the final
    request (``finish_write`` set to ``true``), whose data length may be less
    than MAX_WRITE_CHUNK_BYTES. Therefore, if the length of data in the last
    request yielded is equal to MAX_WRITE_CHUNK_BYTES, all requests sent before
    it have also sent data of size MAX_WRITE_CHUNK_BYTES, so all requests have
    sent data that is a multiple of 256 KiB, thus satisfying the condition
    stated before. If the length of data in the last request yielded is not
    equal to MAX_WRITE_CHUNK_BYTES, then stop only when there is no data left
    in the source stream (the final request is sent).

    Otherwise, if _chunk_size is None, it will yield all WriteObjectRequest
    objects until there is no data left in the source stream.

    Side effects: resets the upload state through _initialize_generator,
    advances _uploaded_so_far by the length of each chunk once the consumer
    resumes the generator after that chunk's request, and sets
    _source_stream_finished to True after the final request was consumed.

    Args:
      first_message (WriteObjectSpec|str): WriteObjectSpec for Simple uploads,
        str that is the upload id for Resumable and Streaming uploads.

    Yields:
      (googlecloudsdk.generated_clients.gapic_clients.storage_v2.types.WriteObjectRequest)
      WriteObjectRequest instance.
    """
    first_request_done = False

    # Exactly one of write_object_spec / upload_id is populated, depending on
    # the type of first_message.
    if isinstance(first_message, self._client.types.WriteObjectSpec):
      write_object_spec = first_message
      upload_id = None
    else:
      write_object_spec = None
      upload_id = first_message

    # Seek the stream to the start offset and reset the progress counters.
    self._initialize_generator()

    while True:
      data = self._source_stream.read(
          self._client.types.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES
      )

      # Only the first request carries the spec or the upload id; both are
      # cleared for every request after it.
      if not first_request_done:
        first_request_done = True
      else:
        write_object_spec = None
        upload_id = None

      if data:
        object_checksums = None
        finish_write = False
      else:
        # Final request case: the read returned no data, so this request
        # carries empty content, the object checksums and finish_write.
        object_checksums = self._client.types.ObjectChecksums(
            md5_hash=self._get_md5_hash_if_given()
        )
        finish_write = True

      yield self._client.types.WriteObjectRequest(
          write_object_spec=write_object_spec,
          upload_id=upload_id,
          write_offset=self._uploaded_so_far,
          checksummed_data=self._client.types.ChecksummedData(content=data),
          object_checksums=object_checksums,
          finish_write=finish_write,
      )
      # Runs only once the consumer asks for the next request, so the offset
      # above always reflects the bytes yielded before this chunk.
      self._uploaded_so_far += len(data)

      if finish_write:
        self._source_stream_finished = True
        break

      # Without a chunk size the whole stream is sent through one generator.
      if self._chunk_size is None:
        continue

      # Can break when data sent in last request is multiple of 256 KiB.
      # For more context see docstring.
      is_length_of_data_equal_to_max_write_chunk_bytes = (
          len(data)
          == self._client.types.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES
      )
      amount_of_data_sent_so_far = self._uploaded_so_far - self._start_offset
      if (
          is_length_of_data_equal_to_max_write_chunk_bytes
          and amount_of_data_sent_so_far >= self._chunk_size
      ):
        # Chunk boundary reached without finishing the write; the generator
        # ends with _source_stream_finished still False.
        break

  def _set_metadata_if_source_is_object_resource(
      self, object_metadata):
    """Carries the source object's custom fields over to object_metadata.

    Only applies when _source_resource is an ObjectResource with non-empty
    custom fields, which is the case for daisy chain copies. Otherwise
    object_metadata is left untouched.

    Args:
      object_metadata (gapic_clients.storage_v2.types.storage.Object): Existing
        object metadata.
    """
    source = self._source_resource
    if (isinstance(source, resource_reference.ObjectResource)
        and source.custom_fields):
      # Deep copy so the destination metadata never aliases the source's.
      object_metadata.metadata = copy.deepcopy(source.custom_fields)

  def _get_write_object_spec(self, size=None):
    """Builds the WriteObjectSpec describing the destination object.

    Args:
      size (int|None): Expected object size in bytes.

    Returns:
      (gapic_clients.storage_v2.types.storage.WriteObjectSpec) The
      WriteObjectSpec wrapping the destination object and the generation and
      metageneration preconditions taken from the request config.
    """
    message_types = self._client.types
    destination_url = self._destination_resource.storage_url

    resource = message_types.Object(
        name=destination_url.resource_name,
        bucket=grpc_util.get_full_bucket_name(destination_url.bucket_name),
        size=size,
    )

    # Metadata copied from the source object goes in first; the request
    # config is applied afterwards.
    self._set_metadata_if_source_is_object_resource(resource)
    metadata_util.update_object_metadata_from_request_config(
        resource, self._request_config, self._source_resource
    )

    generation_match = copy_util.get_generation_match_value(
        self._request_config
    )
    metageneration_match = (
        self._request_config.precondition_metageneration_match
    )
    return message_types.WriteObjectSpec(
        resource=resource,
        if_generation_match=generation_match,
        if_metageneration_match=metageneration_match,
        object_size=size,
    )

  def _call_write_object(self, first_message):
    """Sends the generated write requests to the write object API method.

    The call carries the routing header built from the destination bucket.

    Args:
      first_message (WriteObjectSpec|str): WriteObjectSpec for Simple uploads,
        str that is the upload id for Resumable and Streaming uploads.

    Returns:
      (gapic_clients.storage_v2.types.WriteObjectResponse) Request response.
    """
    request_stream = self._upload_write_object_request_generator(
        first_message=first_message
    )
    full_bucket_name = grpc_util.get_full_bucket_name(
        self._destination_resource.storage_url.bucket_name
    )
    routing_header = metadata_util.get_bucket_name_routing_header(
        full_bucket_name
    )
    return self._client.storage.write_object(
        requests=request_stream,
        metadata=routing_header,
    )

  @abc.abstractmethod
  def run(self):
    """Performs an upload and returns the resulting response message.

    Abstract: every concrete upload strategy must override this method. The
    base implementation only raises NotImplementedError.

    NOTE(review): the overrides visible in this module document a
    WriteObjectResponse as the return value - confirm against callers.
    """
    raise NotImplementedError


class SimpleUpload(_Upload):
  """Uploads an object with a single request."""

  @retry_util.grpc_default_retryer
  def run(self):
    """Uploads the object in non-resumable mode.

    The expected object size is read from the request config's resource args.
    When the request config carries no resource args, no size is declared in
    the WriteObjectSpec instead of failing with an AttributeError; this
    mirrors how _get_md5_hash_if_given treats missing resource args.

    Returns:
      (gapic_clients.storage_v2.types.WriteObjectResponse) A WriteObjectResponse
      instance.
    """
    resource_args = self._request_config.resource_args
    # resource_args is optional on the request config, so guard the lookup.
    size = None if resource_args is None else resource_args.size
    return self._call_write_object(self._get_write_object_spec(size))


class RecoverableUpload(_Upload):
  """Shared logic for upload strategies that can be retried in-flight."""

  def _initialize_upload(self):
    """Starts a resumable write session and returns its upload id.

    As a side effect the start offset is reset to 0.

    Returns:
      (str) Session URI for resumable upload operation.
    """
    # TODO(b/267555253): Add size to the spec once the fix is in prod.
    start_request = self._client.types.StartResumableWriteRequest(
        write_object_spec=self._get_write_object_spec()
    )
    start_response = self._client.storage.start_resumable_write(
        request=start_request
    )
    upload_id = start_response.upload_id

    self._start_offset = 0
    return upload_id

  def _get_write_offset(self, upload_id):
    """Queries how much data the server has persisted for the session.

    Args:
      upload_id (str): Session URI for resumable upload operation.

    Returns:
      (int) The total number of bytes that have been persisted for an object
      on the server. This value can be used as the write_offset.
    """
    status_request = self._client.types.QueryWriteStatusRequest(
        upload_id=upload_id
    )
    write_status = self._client.storage.query_write_status(
        request=status_request,
    )
    return write_status.persisted_size

  def _should_retry(self, upload_id, exc_type=None, exc_value=None,
                    exc_traceback=None, state=None):
    """Decides whether a failed upload attempt should be retried.

    For a retriable error the persisted size is queried from the server and,
    if it is beyond the current start offset, the start offset is moved
    forward so the next attempt continues from there.

    Args:
      upload_id (str): Session URI for resumable upload operation.
      exc_type: Forwarded unchanged to retry_util.is_retriable.
      exc_value: Forwarded unchanged to retry_util.is_retriable.
      exc_traceback: Forwarded unchanged to retry_util.is_retriable.
      state: Forwarded unchanged to retry_util.is_retriable.

    Returns:
      (bool) True if the error is retriable, False otherwise.
    """
    retriable = retry_util.is_retriable(
        exc_type, exc_value, exc_traceback, state
    )
    if not retriable:
      return False

    server_offset = self._get_write_offset(upload_id)
    # Only ever move the start offset forward.
    if server_offset > self._start_offset:
      self._start_offset = server_offset

    return True

  def _perform_upload(self, upload_id):
    """Runs one upload attempt for the given session.

    Args:
      upload_id (str): Session URI for resumable upload operation.

    Returns:
      The response returned by _call_write_object.
    """
    return self._call_write_object(upload_id)

  def run(self):
    """Initializes the upload session and performs the upload with retries.

    Returns:
      Whatever storage_retry_util.retryer returns for the _perform_upload
      target.
    """
    upload_id = self._initialize_upload()

    return storage_retry_util.retryer(
        target=self._perform_upload,
        # The retry predicate needs the upload id to query the server state.
        should_retry_if=functools.partial(self._should_retry, upload_id),
        target_args=[upload_id],
    )


class ResumableUpload(RecoverableUpload):
  """Uploads objects with support for resuming between runs of a command."""

  def __init__(
      self,
      client,
      source_stream,
      destination_resource,
      request_config,
      serialization_data=None,
      source_resource=None,
      tracker_callback=None,
  ):
    """Initializes ResumableUpload.

    Args:
      client: Passed through to the parent initializer.
      source_stream: Passed through to the parent initializer.
      destination_resource: Passed through to the parent initializer.
      request_config: Passed through to the parent initializer.
      serialization_data (dict|None): When not None, its 'upload_id' entry
        identifies the upload session to resume.
      source_resource: Passed through to the parent initializer.
      tracker_callback (Callable|None): When not None, called with a dict
        holding the upload id once the session is known.
    """
    super(ResumableUpload, self).__init__(
        client,
        source_stream,
        destination_resource,
        request_config,
        source_resource=source_resource,
    )
    self._tracker_callback = tracker_callback
    self._serialization_data = serialization_data

  def _initialize_upload(self):
    """Sets up the upload session and returns the upload id.

    Without serialization data a new session is started by the parent class.
    With serialization data the stored session is resumed:
    1. The persisted size is fetched from the backend.
    2. The start offset is set to that persisted size.

    In both cases the tracker callback, if any, is called with the upload id.

    Returns:
      The upload session ID.
    """
    if self._serialization_data is None:
      upload_id = super(ResumableUpload, self)._initialize_upload()
    else:
      upload_id = self._serialization_data['upload_id']

      persisted_size = self._get_write_offset(upload_id)
      self._start_offset = persisted_size
      log.debug('Write offset after resuming: %s', persisted_size)

    callback = self._tracker_callback
    if callback is not None:
      callback({'upload_id': upload_id})

    return upload_id


class StreamingUpload(RecoverableUpload):
  """Uploads objects from a stream with support for error recovery in-flight.

  Stream is split into chunks of size set by property upload_chunk_size,
  rounded down to be a multiple of MAX_WRITE_CHUNK_BYTES.

  For example if upload_chunk_size is 7 MiB and MAX_WRITE_CHUNK_BYTES is
  2 MiB, it will be rounded down to 6 MiB. If upload_chunk_size is less than
  MAX_WRITE_CHUNK_BYTES, it will be equal to MAX_WRITE_CHUNK_BYTES.
  """

  def __init__(
      self,
      client,
      source_stream,
      destination_resource,
      request_config,
      source_resource=None,
  ):
    """Initializes StreamingUpload and computes the effective chunk size.

    Args:
      client: Passed through to the parent initializer.
      source_stream: Passed through to the parent initializer.
      destination_resource: Passed through to the parent initializer.
      request_config: Passed through to the parent initializer.
      source_resource: Passed through to the parent initializer.
    """
    super(StreamingUpload, self).__init__(
        client,
        source_stream,
        destination_resource,
        request_config,
        source_resource
    )
    # Must be set before _get_chunk_size, which may flip it to True.
    self._log_chunk_warning = False
    self._chunk_size = self._get_chunk_size()

  def _get_chunk_size(self):
    """Returns the chunk size corrected to be multiple of MAX_WRITE_CHUNK_BYTES.

    It also sets the attribute _log_chunk_warning to True when the returned
    value differs from the configured one, so that a warning can be logged.

    Look at the docstring on StreamingUpload class.

    Returns:
      (int) The chunksize value corrected.
    """
    max_write_chunk_bytes = (
        self._client.types.ServiceConstants.Values.MAX_WRITE_CHUNK_BYTES
    )
    configured_chunk_size = scaled_integer.ParseInteger(
        properties.VALUES.storage.upload_chunk_size.Get())

    if configured_chunk_size < max_write_chunk_bytes:
      # Too small: raise it to the smallest allowed chunk.
      self._log_chunk_warning = True
      return max_write_chunk_bytes

    remainder = configured_chunk_size % max_write_chunk_bytes
    if remainder > 0:
      # Not aligned: round down to a multiple of MAX_WRITE_CHUNK_BYTES.
      self._log_chunk_warning = True

    return configured_chunk_size - remainder

  def _log_message(self):
    """Warns, at most once, that the configured chunk size was adjusted.

    Does nothing when no adjustment was needed. After the warning is logged
    the flag is cleared, so later upload attempts (for example retries) do
    not repeat the same message.
    """
    if not self._log_chunk_warning:
      return

    initial_chunk_size = scaled_integer.ParseInteger(
        properties.VALUES.storage.upload_chunk_size.Get())

    log.warning(
        (
            'Data will be sent in chunks of {} instead of {},'
            ' as configured in the storage/upload_chunk_size config value.'
        ).format(scaled_integer.FormatBinaryNumber(self._chunk_size),
                 scaled_integer.FormatBinaryNumber(initial_chunk_size))
    )
    # The message was delivered; do not emit it again for this upload.
    self._log_chunk_warning = False

  def _perform_upload(self, upload_id):
    """Uploads the stream chunk by chunk until the source is exhausted.

    Each iteration sends one chunk through _call_write_object. While the
    source stream is not finished, the start offset is moved to the size the
    server reports as persisted, so the next chunk continues from there.

    Args:
      upload_id (str): Session URI for resumable upload operation.

    Returns:
      The response of the write call that finished the source stream.
    """
    self._log_message()
    while True:
      response = self._call_write_object(upload_id)
      if self._source_stream_finished:
        return response
      self._start_offset = response.persisted_size