HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/storage_parallel.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for parallelizing Cloud Storage file operations.

Example usage (for simplicity, use absolute *nix-style paths; in practice you'll
want to use os.path.join and friends):

>>> upload_tasks = [
...     FileUploadTask('/tmp/file1.txt', remote1_ref),
...     FileUploadTask('/tmp/file2.txt', remote2_ref)
... ]
>>> UploadFiles(upload_tasks, num_threads=16)

This will block until all files are uploaded, using 16 threads (but just the
current process). Here remote1_ref and remote2_ref are
storage_util.ObjectReference instances naming the destination objects, e.g.
'gs://my-bucket/path/to/remote1.txt' and 'gs://my-bucket/remote2.txt'.

>>> delete_tasks = [
...     ObjectDeleteTask(remote1_ref),
...     ObjectDeleteTask(remote2_ref)
... ]
>>> DeleteObjects(delete_tasks, num_threads=16)

This removes the objects uploaded in the last code snippet.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc

from googlecloudsdk.api_lib.storage import storage_api
from googlecloudsdk.core import log
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import parallel
from googlecloudsdk.core.util import retry
from googlecloudsdk.core.util import text

import six


# Default number of worker threads used by ExecuteTasks and its wrappers.
# This default value has been chosen after lots of experimentation.
DEFAULT_NUM_THREADS = 16


class Task(six.with_metaclass(abc.ABCMeta)):
  """Base class for storage tasks that can be parallelized."""

  @abc.abstractmethod
  def Execute(self, callback=None):
    """Performs the task; calls `callback` (no arguments) once on success."""
    pass


class FileUploadTask(Task):
  """Self-contained representation of a file to upload and its destination.

  Attributes:
    source_local_path: str, The local filesystem path of the source file to
      upload.
    dest_obj_ref: storage_util.ObjectReference, The object the file will be
      copied to.
  """

  def __init__(self, source_local_path, dest_obj_ref):
    self.source_local_path = source_local_path
    self.dest_obj_ref = dest_obj_ref

  def __str__(self):
    return 'Upload: {} --> {}'.format(
        self.source_local_path, self.dest_obj_ref.ToUrl())

  def __repr__(self):
    return (
        'FileUploadTask(source_path={source_path}, dest_path={dest_path})'
        .format(source_path=self.source_local_path,
                dest_path=self.dest_obj_ref.ToUrl()))

  def __eq__(self, other):
    # Value-based equality so that equal-valued tasks deduplicate in sets and
    # dicts; without this, __hash__ below pairs with default identity
    # equality and hashing never achieves deduplication.
    if not isinstance(other, FileUploadTask):
      return NotImplemented
    return (self.source_local_path == other.source_local_path and
            self.dest_obj_ref == other.dest_obj_ref)

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__, so define it explicitly.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result

  def __hash__(self):
    return hash((self.source_local_path, self.dest_obj_ref))

  def Execute(self, callback=None):
    """Uploads the file to GCS, retrying up to 3 times on any exception."""
    storage_client = storage_api.StorageClient()
    retry.Retryer(max_retrials=3).RetryOnException(
        storage_client.CopyFileToGCS,
        args=(self.source_local_path, self.dest_obj_ref))
    if callback:
      callback()


class FileDownloadTask(Task):
  """Self-contained representation of a file to download and its destination.

  Attributes:
    source_obj_ref: storage_util.ObjectReference, The object reference of the
      file to download.
    dest_local_path: str, The local filesystem path to write the file to.
  """

  def __init__(self, source_obj_ref, dest_local_path):
    self.source_obj_ref = source_obj_ref
    self.dest_local_path = dest_local_path

  def __str__(self):
    return 'Download: {} --> {}'.format(
        self.source_obj_ref.ToUrl(), self.dest_local_path)

  def __repr__(self):
    return (
        'FileDownloadTask(source_path={source_path}, dest_path={dest_path})'
        .format(source_path=self.source_obj_ref.ToUrl(),
                dest_path=self.dest_local_path))

  def __eq__(self, other):
    # Value-based equality so that equal-valued tasks deduplicate in sets and
    # dicts, matching the intent of the value-based __hash__ below.
    if not isinstance(other, FileDownloadTask):
      return NotImplemented
    return (self.source_obj_ref == other.source_obj_ref and
            self.dest_local_path == other.dest_local_path)

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__, so define it explicitly.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result

  def __hash__(self):
    return hash((self.source_obj_ref, self.dest_local_path))

  def Execute(self, callback=None):
    """Downloads the object, retrying up to 3 times on any exception."""
    storage_client = storage_api.StorageClient()
    retry.Retryer(max_retrials=3).RetryOnException(
        storage_client.CopyFileFromGCS,
        args=(self.source_obj_ref, self.dest_local_path))
    if callback:
      callback()


class FileRemoteCopyTask(Task):
  """Self-contained representation of a copy between GCS objects.

  Attributes:
    source_obj_ref: storage_util.ObjectReference, The object reference of the
      object to copy.
    dest_obj_ref: storage_util.ObjectReference, The object reference to write
      the copy to.
  """

  def __init__(self, source_obj_ref, dest_obj_ref):
    self.source_obj_ref = source_obj_ref
    self.dest_obj_ref = dest_obj_ref

  def __str__(self):
    return 'Copy: {} --> {}'.format(
        self.source_obj_ref.ToUrl(), self.dest_obj_ref.ToUrl())

  def __repr__(self):
    return (
        'FileRemoteCopyTask(source_path={source_path}, dest_path={dest_path})'
        .format(source_path=self.source_obj_ref.ToUrl(),
                dest_path=self.dest_obj_ref.ToUrl()))

  def __eq__(self, other):
    # Value-based equality so that equal-valued tasks deduplicate in sets and
    # dicts, matching the intent of the value-based __hash__ below.
    if not isinstance(other, FileRemoteCopyTask):
      return NotImplemented
    return (self.source_obj_ref == other.source_obj_ref and
            self.dest_obj_ref == other.dest_obj_ref)

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__, so define it explicitly.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result

  def __hash__(self):
    return hash((self.source_obj_ref, self.dest_obj_ref))

  def Execute(self, callback=None):
    """Copies the object server-side, retrying up to 3 times on exception."""
    storage_client = storage_api.StorageClient()
    retry.Retryer(max_retrials=3).RetryOnException(
        storage_client.Copy,
        args=(self.source_obj_ref, self.dest_obj_ref))
    if callback:
      callback()


class ObjectDeleteTask(Task):
  """Self-contained representation of an object to delete.

  Attributes:
    obj_ref: storage_util.ObjectReference, The object to delete.
  """

  def __init__(self, obj_ref):
    self.obj_ref = obj_ref

  def __str__(self):
    return 'Delete: {}'.format(self.obj_ref.ToUrl())

  def __repr__(self):
    # Bug fix: the original format string was missing the closing parenthesis.
    return 'ObjectDeleteTask(object={obj})'.format(obj=self.obj_ref.ToUrl())

  def __eq__(self, other):
    # Value-based equality so that equal-valued tasks deduplicate in sets and
    # dicts, matching the intent of the value-based __hash__ below.
    if not isinstance(other, ObjectDeleteTask):
      return NotImplemented
    return self.obj_ref == other.obj_ref

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__, so define it explicitly.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result

  def __hash__(self):
    return hash(self.obj_ref)

  def Execute(self, callback=None):
    """Complete one ObjectDeleteTask (safe to run in parallel)."""
    storage_client = storage_api.StorageClient()
    retry.Retryer(max_retrials=3).RetryOnException(
        storage_client.DeleteObject, args=(self.obj_ref,))
    if callback:
      callback()


def ExecuteTasks(tasks, num_threads=DEFAULT_NUM_THREADS,
                 progress_bar_label=None):
  """Perform the given storage tasks in parallel.

  Factors out common work: logging, setting up parallelism, managing a progress
  bar (if necessary).

  Args:
    tasks: [Task], To be executed in parallel.
    num_threads: int, The number of threads to use. 0 means run the tasks
      serially in the current thread.
    progress_bar_label: str, If set, a progress bar will be shown with this
      label. Otherwise, no progress bar is displayed.
  """
  log.debug(progress_bar_label)
  log.debug('Using [%d] threads', num_threads)

  if progress_bar_label:
    progress_bar = console_io.TickableProgressBar(
        len(tasks), progress_bar_label)
    callback = progress_bar.Tick
  else:
    progress_bar = console_io.NoOpProgressBar()
    callback = None

  if num_threads == 0:
    # Serial path: run everything in the current thread.
    with progress_bar:
      for task in tasks:
        task.Execute(callback)
  else:
    # Only build the pool when it is actually used; the original constructed
    # it unconditionally, even on the serial path above.
    pool = parallel.GetPool(num_threads)
    with progress_bar, pool:
      pool.Map(lambda task: task.Execute(callback), tasks)


def UploadFiles(files_to_upload, num_threads=DEFAULT_NUM_THREADS,
                show_progress_bar=False):
  """Upload the given files to the given Cloud Storage URLs.

  Uses the appropriate parallelism (multi-process, multi-thread, both, or
  synchronous).

  Args:
    files_to_upload: list of FileUploadTask
    num_threads: int (optional), the number of threads to use.
    show_progress_bar: bool. If true, show a progress bar to the users when
      uploading files.
  """
  label = None
  if show_progress_bar:
    count = len(files_to_upload)
    label = 'Uploading {} {} to Google Cloud Storage'.format(
        count, text.Pluralize(count, 'file'))
  ExecuteTasks(files_to_upload, num_threads, label)


def DeleteObjects(objects_to_delete, num_threads=DEFAULT_NUM_THREADS,
                  show_progress_bar=False):
  """Delete the given Cloud Storage objects.

  Uses the appropriate parallelism (multi-process, multi-thread, both, or
  synchronous).

  Args:
    objects_to_delete: list of ObjectDeleteTask
    num_threads: int (optional), the number of threads to use.
    show_progress_bar: bool. If true, show a progress bar to the users when
      deleting files.
  """
  label = None
  if show_progress_bar:
    count = len(objects_to_delete)
    label = 'Deleting {} {} from Google Cloud Storage'.format(
        count, text.Pluralize(count, 'object'))
  ExecuteTasks(objects_to_delete, num_threads, label)