File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/storage/gcs_json/upload.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes that represent and execute different upload strategies for GCS."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import copy
import json

from apitools.base.py import encoding_helper
from apitools.base.py import transfer
from googlecloudsdk.api_lib.storage import errors
from googlecloudsdk.api_lib.storage import retry_util
from googlecloudsdk.api_lib.storage.gcs_json import metadata_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.storage.resources import gcs_resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.command_lib.storage.resources import s3_resource_reference
from googlecloudsdk.command_lib.storage.tasks.cp import copy_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import retry
from googlecloudsdk.core.util import scaled_integer
import six


class _Upload(six.with_metaclass(abc.ABCMeta, object)):
  """Base class shared by different upload strategies."""

  def __init__(
      self,
      gcs_api,
      http_client,
      source_stream,
      destination_resource,
      should_gzip_in_flight,
      request_config,
      posix_to_set=None,
      source_resource=None,
  ):
    """Initializes an _Upload instance.

    Args:
      gcs_api (gcs_api.GcsApi): The API used to execute the upload request.
      http_client: An httplib2.Http-like object.
      source_stream (io.IOBase): Yields bytes to upload.
      destination_resource (resource_reference.ObjectResource|UnknownResource):
        Metadata for the destination object.
      should_gzip_in_flight (bool): Should gzip encode upload in flight.
      request_config (gcs_api.GcsRequestConfig): Tracks additional request
        preferences.
      posix_to_set (PosixAttributes|None): Set as custom metadata on target.
      source_resource (FileObjectResource|ObjectResource|None): Contains the
        source StorageUrl and source object metadata for daisy chain transfers.
        Can be None if the source is a pure stream.
    """
    self._gcs_api = gcs_api
    self._http_client = http_client
    self._source_stream = source_stream
    self._destination_resource = destination_resource
    self._should_gzip_in_flight = should_gzip_in_flight
    self._request_config = request_config

    self._posix_to_set = posix_to_set
    self._source_resource = source_resource

    self._messages = apis.GetMessagesModule('storage', 'v1')

  def _copy_acl_from_source_if_source_is_a_cloud_object_and_preserve_acl_is_true(
      self, destination_metadata
  ):
    if (
        isinstance(self._source_resource, resource_reference.ObjectResource)
        and self._request_config.resource_args.preserve_acl
        and hasattr(self._source_resource.metadata, 'acl')
    ):

      destination_metadata.acl = copy.deepcopy(
          self._source_resource.metadata.acl
      )

  def _update_object_metadata_with_contexts(self, object_metadata):
    """Update object metadata with contexts from the source resource."""
    contexts_to_set = None
    if isinstance(
        self._source_resource, gcs_resource_reference.GcsObjectResource
    ):
      contexts_to_set = (
          metadata_util.parse_custom_contexts_dict_from_resource_contexts_dict(
              self._source_resource.contexts
          )
      )
    elif isinstance(
        self._source_resource, s3_resource_reference.S3ObjectResource
    ):
      # Treat S3 tags as contexts.
      if self._source_resource.tags:
        contexts_to_set = (
            metadata_util.get_contexts_dict_from_custom_contexts_dict({
                key: (
                    metadata_util.get_context_value_dict_from_value(
                        value
                    )
                )
                for key, value in self._source_resource.tags.items()
            })
        )

    if contexts_to_set:
      object_metadata.contexts = encoding_helper.DictToMessage(
          contexts_to_set,
          self._messages.Object.ContextsValue,
      )

  def _get_validated_insert_request(self):
    """Get an insert request that includes validated object metadata."""
    if self._request_config.predefined_acl_string:
      predefined_acl = getattr(
          self._messages.StorageObjectsInsertRequest
          .PredefinedAclValueValuesEnum,
          self._request_config.predefined_acl_string)
    else:
      predefined_acl = None

    object_metadata = self._messages.Object(
        name=self._destination_resource.storage_url.resource_name,
        bucket=self._destination_resource.storage_url.bucket_name)

    if (isinstance(self._source_resource, resource_reference.ObjectResource) and
        self._source_resource.custom_fields):
      object_metadata.metadata = (
          encoding_helper.DictToAdditionalPropertyMessage(
              self._source_resource.custom_fields,
              self._messages.Object.MetadataValue,
          )
      )

    self._update_object_metadata_with_contexts(object_metadata)
    self._copy_acl_from_source_if_source_is_a_cloud_object_and_preserve_acl_is_true(
        object_metadata
    )

    metadata_util.update_object_metadata_from_request_config(
        object_metadata,
        self._request_config,
        attributes_resource=self._source_resource,
        posix_to_set=self._posix_to_set,
        method_type=metadata_util.MethodType.OBJECT_INSERT,
    )

    return self._messages.StorageObjectsInsertRequest(
        bucket=object_metadata.bucket,
        object=object_metadata,
        ifGenerationMatch=copy_util.get_generation_match_value(
            self._request_config),
        ifMetagenerationMatch=(
            self._request_config.precondition_metageneration_match),
        predefinedAcl=predefined_acl)

  @abc.abstractmethod
  def run(self):
    """Performs an upload and returns an Object message."""
    pass
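
# Illustrative sketch only (not part of this module): a new strategy just
# subclasses _Upload and implements run(); the validated-request helper above
# can be reused. DryRunUpload below is hypothetical and named purely for
# illustration.
#
#   class DryRunUpload(_Upload):
#     """Builds and logs the insert request without sending any bytes."""
#
#     def run(self):
#       request = self._get_validated_insert_request()
#       log.debug('Would upload to gs://{}/{}'.format(
#           request.bucket, request.object.name))
#       return request.object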


class SimpleUpload(_Upload):
  """Uploads objects with a single request."""

  def run(self):
    resource_args = self._request_config.resource_args
    apitools_upload = transfer.Upload(
        self._source_stream,
        resource_args.content_type,
        gzip_encoded=self._should_gzip_in_flight,
        total_size=resource_args.size)
    apitools_upload.bytes_http = self._http_client
    apitools_upload.strategy = transfer.SIMPLE_UPLOAD

    return self._gcs_api.client.objects.Insert(
        self._get_validated_insert_request(), upload=apitools_upload)
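
# Orientation sketch (commented out, not part of the SDK source): the bare
# apitools primitives that SimpleUpload.run wraps. `client` and `messages`
# stand in for an apitools storage v1 client and its messages module; the
# calls themselves mirror the ones used above.
#
#   upload = transfer.Upload(io.BytesIO(b'hello'), 'text/plain', total_size=5)
#   upload.strategy = transfer.SIMPLE_UPLOAD
#   request = messages.StorageObjectsInsertRequest(
#       bucket='my-bucket',
#       object=messages.Object(name='hello.txt', bucket='my-bucket'))
#   object_message = client.objects.Insert(request, upload=upload)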


class _BaseRecoverableUpload(_Upload):
  """Common logic for strategies allowing retries in-flight."""

  def _get_upload(self):
    """Returns an apitools upload class used for a new transfer."""
    resource_args = self._request_config.resource_args
    size = getattr(resource_args, 'size', None)
    max_retries = properties.VALUES.storage.max_retries.GetInt()
    apitools_upload = transfer.Upload(
        self._source_stream,
        resource_args.content_type,
        auto_transfer=False,
        chunksize=scaled_integer.ParseInteger(
            properties.VALUES.storage.upload_chunk_size.Get()),
        gzip_encoded=self._should_gzip_in_flight,
        total_size=size,
        num_retries=max_retries)
    apitools_upload.strategy = transfer.RESUMABLE_UPLOAD
    return apitools_upload

  def _initialize_upload(self):
    """Inserts a a new object at the upload destination."""
    if not self._apitools_upload.initialized:
      self._gcs_api.client.objects.Insert(
          self._get_validated_insert_request(), upload=self._apitools_upload)

  @abc.abstractmethod
  def _call_appropriate_apitools_upload_strategy(self):
    """Responsible for pushing bytes to GCS with an appropriate strategy."""
    pass

  def _should_retry_resumable_upload(
      self, exc_type, exc_value, exc_traceback, state):
    """Returns True if the failure should be retried."""
    if not isinstance(exc_value, errors.RetryableApiError):
      return False

    self._apitools_upload.RefreshResumableUploadState()
    if self._apitools_upload.progress > self._last_progress_byte:
      # Progress was made, so reset the retry counter; only consecutive
      # failures without forward progress count toward the retry limit.
      self._last_progress_byte = self._apitools_upload.progress
      state.retrial = 0

    log.debug('Retrying upload after exception: {}.'
              ' Trace: {}'.format(exc_type, exc_traceback))
    return True

  def run(self):
    """Uploads with in-flight retry logic and returns an Object message."""
    self._apitools_upload = self._get_upload()
    self._apitools_upload.bytes_http = self._http_client
    retry_util.set_retry_func(self._apitools_upload)

    self._initialize_upload()

    self._last_progress_byte = self._apitools_upload.progress
    try:
      http_response = retry_util.retryer(
          target=self._call_appropriate_apitools_upload_strategy,
          should_retry_if=self._should_retry_resumable_upload)
    except retry.MaxRetrialsException as e:
      raise errors.ResumableUploadAbortError(
          'Max retrial attempts reached. Aborting upload.'
          ' Error: {}'.format(e))

    return self._gcs_api.client.objects.ProcessHttpResponse(
        self._gcs_api.client.objects.GetMethodConfig('Insert'), http_response)


class StreamingUpload(_BaseRecoverableUpload):
  """Uploads objects from a stream with support for error recovery in-flight."""

  def _call_appropriate_apitools_upload_strategy(self):
    """Calls StreamInChunks since the final size is unknown."""
    return self._apitools_upload.StreamInChunks()


class ResumableUpload(_BaseRecoverableUpload):
  """Uploads objects with support for resuming between runs of a command."""

  # pylint: disable=g-doc-args
  def __init__(
      self,
      gcs_api,
      http_client,
      source_stream,
      destination_resource,
      should_gzip_in_flight,
      request_config,
      posix_to_set=None,
      serialization_data=None,
      source_resource=None,
      tracker_callback=None,
  ):
    """Initializes a ResumableUpload instance.

    See super class for arguments not described below.

    New Args:
      serialization_data (dict): JSON used by apitools to resume an upload.
      tracker_callback (Callable[[dict], None]|None): Function that writes a tracker
        file with serialization data.
    """
    # pylint: enable=g-doc-args
    super(ResumableUpload, self).__init__(
        gcs_api,
        http_client,
        source_stream,
        destination_resource,
        should_gzip_in_flight,
        request_config,
        posix_to_set=posix_to_set,
        source_resource=source_resource,
    )
    self._serialization_data = serialization_data
    self._tracker_callback = tracker_callback

  def _get_upload(self):
    """Creates a new transfer object, or gets one from serialization data."""
    max_retries = properties.VALUES.storage.max_retries.GetInt()
    if self._serialization_data is not None:
      # FromData implicitly sets strategy as RESUMABLE.
      return transfer.Upload.FromData(
          self._source_stream,
          json.dumps(self._serialization_data),
          self._gcs_api.client.http,
          auto_transfer=False,
          gzip_encoded=self._should_gzip_in_flight,
          num_retries=max_retries)
    else:
      return super(ResumableUpload, self)._get_upload()

  def _initialize_upload(self):
    """Inserts an object if not already inserted, and writes a tracker file."""
    if self._serialization_data is None:
      super(ResumableUpload, self)._initialize_upload()

    if self._tracker_callback is not None:
      self._tracker_callback(self._apitools_upload.serialization_data)

  def _call_appropriate_apitools_upload_strategy(self):
    """Calls StreamMedia, or StreamInChunks when the final size is unknown."""
    if self._should_gzip_in_flight:
      # We do not know the final size of the file, so we must use chunks.
      return self._apitools_upload.StreamInChunks()
    else:
      # We know the size of the file, so use a strategy that requires fewer
      # round trip API calls.
      return self._apitools_upload.StreamMedia()
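

# Strategy-selection sketch (hypothetical; gcloud's actual selection happens
# in the copy task code, not in this module). RESUMABLE_THRESHOLD is an
# assumed cutoff, not a real gcloud property.
#
#   RESUMABLE_THRESHOLD = 8 * 1024 * 1024  # assumed 8 MiB cutoff
#
#   def choose_upload_class(size):
#     if size is None:
#       return StreamingUpload  # unknown length, e.g. piped stdin
#     if size < RESUMABLE_THRESHOLD:
#       return SimpleUpload     # small objects: one-shot request
#     return ResumableUpload    # large objects: resumable across command runs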