HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/storage/tasks/cp/upload_util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions for performing upload operation."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import mimetypes
import os
import subprocess
import threading

from googlecloudsdk.api_lib.storage import api_factory
from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.api_lib.storage import request_config_factory
from googlecloudsdk.command_lib.storage import buffered_upload_stream
from googlecloudsdk.command_lib.storage import component_stream
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import fast_crc32c_util
from googlecloudsdk.command_lib.storage import hash_util
from googlecloudsdk.command_lib.storage import progress_callbacks
from googlecloudsdk.command_lib.storage import upload_stream
from googlecloudsdk.command_lib.storage.tasks import task_status
from googlecloudsdk.command_lib.storage.tasks.rm import delete_task
from googlecloudsdk.command_lib.util import crc32c
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import hashing
from googlecloudsdk.core.util import platforms
from googlecloudsdk.core.util import scaled_integer


# Hard-coded content types for common file extensions. get_content_type checks
# these first (matching on the end of the path) because, per the comment there,
# neither the mimetypes library nor the "file" command recognizes them.
# Maps extension (including the leading dot) -> MIME type.
COMMON_EXTENSION_RULES = {
    '.md': 'text/markdown',  # b/169088193
    '.tgz': 'application/gzip',  # b/179176339
}


def get_upload_strategy(api, object_length):
  """Chooses between a resumable and a simple upload.

  A resumable upload is selected only when the object is at least as large as
  the storage/resumable_threshold property and the API lists the
  RESUMABLE_UPLOAD capability.

  Args:
    api (CloudApi): An api instance whose capabilities are checked for
      resumable upload support.
    object_length (int): Length of the data to be uploaded.

  Returns:
    cloud_api.UploadStrategy: RESUMABLE if both conditions above hold,
    otherwise SIMPLE.
  """
  threshold = scaled_integer.ParseInteger(
      properties.VALUES.storage.resumable_threshold.Get())
  # The capability lookup is only reached when the size check passes.
  use_resumable = (
      object_length >= threshold
      and cloud_api.Capability.RESUMABLE_UPLOAD in api.capabilities
  )
  return (
      cloud_api.UploadStrategy.RESUMABLE
      if use_resumable
      else cloud_api.UploadStrategy.SIMPLE
  )


def get_content_type(source_path, is_stream):
  """Determines the MIME type to use for a file being uploaded.

  Resolution order:
    1. Streams always get request_config_factory.DEFAULT_CONTENT_TYPE.
    2. Paths ending in an extension listed in COMMON_EXTENSION_RULES get the
       hard-coded type for that extension.
    3. On non-Windows systems with the storage/use_magicfile property enabled,
       the stripped output of `file -b --mime <path>` is used.
    4. Otherwise the type is guessed with mimetypes.guess_type.

  Args:
    source_path (str): Path to file. May differ from file_resource.storage_url
      if using a temporary file (e.g. for gzipping).
    is_stream (bool): If the source file is a pipe (typically FIFO or stdin).

  Returns:
    A MIME type (str). If steps 3 or 4 produce an empty result,
    request_config_factory.DEFAULT_CONTENT_TYPE is returned.

  Raises:
    subprocess.CalledProcessError: If the `file` command is used and exits
      with a non-zero status (it is run with check=True).
  """
  if is_stream:
    return request_config_factory.DEFAULT_CONTENT_TYPE

  # Some common extensions are not recognized by the mimetypes library and
  # "file" command, so they are resolved from a hard-coded table first.
  for known_suffix, known_type in COMMON_EXTENSION_RULES.items():
    if source_path.endswith(known_suffix):
      return known_type

  use_file_command = (
      not platforms.OperatingSystem.IsWindows()
      and properties.VALUES.storage.use_magicfile.GetBool()
  )
  if not use_file_command:
    guessed_type = mimetypes.guess_type(source_path)[0]
  else:
    completed = subprocess.run(
        ['file', '-b', '--mime', source_path],
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    guessed_type = completed.stdout.strip()

  # An empty string or None both fall back to the default type.
  return guessed_type or request_config_factory.DEFAULT_CONTENT_TYPE


def get_digesters(source_resource, destination_resource):
  """Selects the hash objects used to validate an upload.

  For APIs with the APPENDABLE_UPLOAD capability only CRC32C is considered.
  Otherwise MD5 is used unless the source already has an MD5 hash, the API
  lists CLIENT_SIDE_HASH_VALIDATION, or the check_hashes property is NEVER.

  Args:
    source_resource (resource_reference.FileObjectResource): The upload source.
    destination_resource (resource_reference.ObjectResource): The upload
      destination.

  Returns:
    A dict[hash_util.HashAlgorithm, hash object], the values of which should be
    updated with uploaded bytes. The dict is empty when no hashing is needed.
  """
  destination_url = destination_resource.storage_url
  provider = destination_url.scheme
  # The bucket name is only passed to the capability lookup when bidi
  # streaming for zonal buckets is enabled.
  if properties.VALUES.storage.enable_zonal_buckets_bidi_streaming.GetBool():
    bucket_name = destination_url.bucket_name
  else:
    bucket_name = None
  capabilities = api_factory.get_capabilities(provider, bucket_name)
  hash_policy = properties.CheckHashes(
      properties.VALUES.storage.check_hashes.Get())

  # If the API supports appendable uploads, we should use CRC32C only.
  if cloud_api.Capability.APPENDABLE_UPLOAD in capabilities:
    fast_crc32c_util.log_or_raise_crc32c_issues()
    wants_crc32c = (
        hash_policy == properties.CheckHashes.ALWAYS
        or fast_crc32c_util.check_if_will_use_fast_crc32c()
    )
    if not wants_crc32c:
      return {}
    return {hash_util.HashAlgorithm.CRC32C: fast_crc32c_util.get_crc32c()}

  md5_not_needed = (
      source_resource.md5_hash
      or cloud_api.Capability.CLIENT_SIDE_HASH_VALIDATION in capabilities
      or hash_policy == properties.CheckHashes.NEVER
  )
  if md5_not_needed:
    return {}
  return {hash_util.HashAlgorithm.MD5: hashing.get_md5()}


def get_stream(source_resource,
               length=None,
               offset=None,
               digesters=None,
               task_status_queue=None,
               destination_resource=None,
               component_number=None,
               total_components=None):
  """Opens the upload source and wraps it in the appropriate stream class.

  Args:
    source_resource (resource_reference.FileObjectResource): Contains a path to
      the source file.
    length (int|None): The total number of bytes to be uploaded.
    offset (int|None): The position of the first byte to be uploaded.
    digesters (dict[hash_util.HashAlgorithm, hash object]|None): Hash objects to
      be populated as bytes are read.
    task_status_queue (multiprocessing.Queue|None): Used for sending progress
      messages. If None, no messages will be generated or sent.
    destination_resource (resource_reference.ObjectResource|None): The upload
      destination. Used for progress reports, and should be specified if
      task_status_queue is.
    component_number (int|None): Identifies a component in composite uploads.
    total_components (int|None): The total number of components used in a
      composite upload.

  Returns:
    A stream wrapping the source specified by source_resource:
    a BufferedUploadStream if the source URL is a stream, an UploadStream if
    offset is None, and a ComponentStream otherwise.
  """
  # Build the progress reporter first; it is only needed when there is a queue
  # to report to.
  progress_callback = None
  if task_status_queue:
    progress_callback = progress_callbacks.FilesAndBytesProgressCallback(
        status_queue=task_status_queue,
        offset=offset or 0,
        length=length,
        source_url=source_resource.storage_url,
        destination_url=destination_resource.storage_url,
        component_number=component_number,
        total_components=total_components,
        operation_name=task_status.OperationName.UPLOADING,
        process_id=os.getpid(),
        thread_id=threading.get_ident(),
    )

  # Standard input is read through file descriptor 0; anything else is opened
  # by path as a binary file.
  source_stream = (
      os.fdopen(0, 'rb')
      if source_resource.storage_url.is_stdio
      else files.BinaryFileReader(source_resource.storage_url.resource_name)
  )

  if source_resource.storage_url.is_stream:
    chunk_size = scaled_integer.ParseBinaryInteger(
        properties.VALUES.storage.upload_chunk_size.Get())
    return buffered_upload_stream.BufferedUploadStream(
        source_stream,
        max_buffer_size=chunk_size,
        digesters=digesters,
        progress_callback=progress_callback)

  if offset is None:
    return upload_stream.UploadStream(
        source_stream,
        length=length,
        digesters=digesters,
        progress_callback=progress_callback)

  return component_stream.ComponentStream(
      source_stream,
      offset=offset,
      length=length,
      digesters=digesters,
      progress_callback=progress_callback)


def validate_uploaded_object(digesters, uploaded_resource, task_status_queue):
  """Raises error if hashes for uploaded_resource and digesters do not match.

  MD5 is preferred when present in digesters; otherwise CRC32C is used. On a
  mismatch the uploaded object is deleted before the error is re-raised.

  Args:
    digesters (dict[hash_util.HashAlgorithm, hash object]|None): Hash objects
      that were updated with the uploaded bytes. If empty or None, no
      validation is performed.
    uploaded_resource (resource_reference.ObjectResource): The uploaded object;
      its md5_hash or crc32c_hash is compared with the calculated digest.
    task_status_queue (multiprocessing.Queue|None): Passed to the delete task
      that removes the object if validation fails.

  Raises:
    errors.Error: If digesters contains neither an MD5 nor a CRC32C entry.
    errors.HashMismatchError: Re-raised from
      hash_util.validate_object_hashes_match after the uploaded object has
      been deleted.
  """
  if not digesters:
    return
  if hash_util.HashAlgorithm.MD5 in digesters:
    calculated_digest = hash_util.get_base64_hash_digest_string(
        digesters[hash_util.HashAlgorithm.MD5])
    destination_hash = uploaded_resource.md5_hash
  elif hash_util.HashAlgorithm.CRC32C in digesters:
    calculated_digest = crc32c.get_hash(
        digesters[hash_util.HashAlgorithm.CRC32C]
    )
    destination_hash = uploaded_resource.crc32c_hash
  else:
    # Keys are expected to be hash_util.HashAlgorithm members rather than
    # strings, so convert each one explicitly; str.join raises TypeError on
    # non-string items, which would mask the intended error.
    raise errors.Error(
        'Unsupported hash algorithm(s) found in digesters: {}'.format(
            ', '.join(str(algorithm) for algorithm in digesters)
        )
    )
  try:
    hash_util.validate_object_hashes_match(
        uploaded_resource.storage_url.url_string, calculated_digest,
        destination_hash)
  except errors.HashMismatchError:
    # Do not leave a corrupt object behind at the destination.
    delete_task.DeleteObjectTask(uploaded_resource.storage_url).execute(
        task_status_queue=task_status_queue
    )
    raise