# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for interacting with Google Cloud Storage.

This makes use of both the Cloud Storage API as well as the gsutil command-line
tool. We use the command-line tool for syncing the contents of buckets as well
as listing the contents. We use the API for checking ACLs.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import io
import mimetypes
import os

from apitools.base.py import exceptions as api_exceptions
from apitools.base.py import list_pager
from apitools.base.py import transfer

from googlecloudsdk.api_lib.storage import storage_util
from googlecloudsdk.api_lib.util import exceptions as http_exc
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.iam import iam_util
from googlecloudsdk.core import exceptions as core_exc
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.credentials import transports
from googlecloudsdk.core.util import scaled_integer

import six


class Error(core_exc.Error):
  """Base exception for storage API module."""


class BucketNotFoundError(Error):
  """Error raised when the bucket specified does not exist."""


class ListBucketError(Error):
  """Error raised when there are problems listing the contents of a bucket."""


class UploadError(Error):
  """Error raised when there are problems uploading files."""


class BucketInWrongProjectError(Error):
  """Error raised when a bucket exists in a project the user doesn't own.

  Specifically, this applies when a command creates a bucket if it doesn't
  exist, or returns the existing bucket otherwise. If the bucket exists but is
  owned by a different project, it could belong to a malicious user squatting on
  the bucket name.
  """


def _GetMimetype(local_path):
  mime_type, _ = mimetypes.guess_type(local_path)
  return mime_type or 'application/octet-stream'


def _GetFileSize(local_path):
  try:
    return os.path.getsize(six.ensure_str(local_path))
  except os.error:
    raise exceptions.BadFileException('[{0}] not found or not accessible'
                                      .format(local_path))


class StorageClient(object):
  """Client for Google Cloud Storage API."""

  def __init__(self, client=None, messages=None):
    self.client = client or storage_util.GetClient()
    self.messages = messages or storage_util.GetMessages()

  def _GetChunkSize(self):
    """Returns the property defined chunksize corrected for server granularity.

    Chunk size for GCS must be a multiple of 256 KiB. This functions rounds up
    the property defined chunk size to the nearest chunk size interval.
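
    For example, a configured chunk size of 300 KiB (307,200 B) is rounded up
    to 512 KiB (524,288 B), the next multiple of 262,144 B.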
    """
    gcs_chunk_granularity = 256 * 1024  # 256 KiB
    chunksize = scaled_integer.ParseInteger(
        properties.VALUES.storage.upload_chunk_size.Get())
    if chunksize == 0:
      chunksize = None  # Use apitools default (1048576 B)
    elif chunksize % gcs_chunk_granularity != 0:
      chunksize += gcs_chunk_granularity - (chunksize % gcs_chunk_granularity)
    return chunksize

  def ListBuckets(self, project):
    """List the buckets associated with the given project."""
    request = self.messages.StorageBucketsListRequest(project=project)
    for b in list_pager.YieldFromList(self.client.buckets,
                                      request, batch_size=None):
      yield b

  def Copy(self, src, dst):
    """Copy one GCS object to another.

    Args:
      src: Resource, the storage object resource to be copied from.
      dst: Resource, the storage object resource to be copied to.

    Returns:
      Object, the storage object that was copied to.
    """
    return self.client.objects.Copy(
        self.messages.StorageObjectsCopyRequest(
            sourceBucket=src.bucket,
            sourceObject=src.object,
            destinationBucket=dst.bucket,
            destinationObject=dst.object,
        ))

  def Rewrite(self, src, dst):
    """Rewrite one GCS object to another.

    This method has the same result as the Copy method, but it can handle
    large objects that might cause a Copy request to time out.

    Args:
      src: Resource, the storage object resource to be copied from.
      dst: Resource, the storage object resource to be copied to.

    Returns:
      Object, the storage object that was copied to.
    """
    rewrite_token = None
    while True:
      resp = self.client.objects.Rewrite(
          self.messages.StorageObjectsRewriteRequest(
              sourceBucket=src.bucket,
              sourceObject=src.object,
              destinationBucket=dst.bucket,
              destinationObject=dst.object,
              rewriteToken=rewrite_token,
          ))
      if resp.done:
        return resp.resource
      rewrite_token = resp.rewriteToken

  def GetObject(self, object_ref):
    """Gets an object from the given Cloud Storage bucket.

    Args:
      object_ref: storage_util.ObjectReference, The object to get details for.

    Returns:
      Object: a StorageV1 Object message with details about the object.
    """
    return self.client.objects.Get(self.messages.StorageObjectsGetRequest(
        bucket=object_ref.bucket,
        object=object_ref.object))

  def CopyFileToGCS(self, local_path, target_obj_ref):
    """Upload a file to the GCS results bucket using the storage API.

    Args:
      local_path: str, the path of the file to upload. File must be on the local
        filesystem.
      target_obj_ref: storage_util.ObjectReference, the path of the file on GCS.

    Returns:
      Object, the storage object that was copied to.

    Raises:
      BucketNotFoundError if the user-specified bucket does not exist.
      UploadError if the file upload is not successful.
      exceptions.BadFileException if the uploaded file size does not match the
          size of the local file.
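
    Example (illustrative only; 'client' is a StorageClient instance and the
    bucket, object, and local path are hypothetical placeholders):
      ref = storage_util.ObjectReference.FromUrl('gs://my-bucket/report.csv')
      client.CopyFileToGCS('/tmp/report.csv', ref)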
    """
    file_size = _GetFileSize(local_path)
    src_obj = self.messages.Object(size=file_size)
    mime_type = _GetMimetype(local_path)

    chunksize = self._GetChunkSize()
    upload = transfer.Upload.FromFile(
        six.ensure_str(local_path), mime_type=mime_type, chunksize=chunksize)
    insert_req = self.messages.StorageObjectsInsertRequest(
        bucket=target_obj_ref.bucket,
        name=target_obj_ref.object,
        object=src_obj)

    gcs_path = '{bucket}/{target_path}'.format(
        bucket=target_obj_ref.bucket, target_path=target_obj_ref.object,
    )

    log.info('Uploading [{local_file}] to [{gcs}]'.format(local_file=local_path,
                                                          gcs=gcs_path))
    try:
      response = self.client.objects.Insert(insert_req, upload=upload)
    except api_exceptions.HttpNotFoundError:
      raise BucketNotFoundError(
          'Could not upload file: [{bucket}] bucket does not exist.'
          .format(bucket=target_obj_ref.bucket))
    except api_exceptions.HttpError as err:
      log.debug('Could not upload file [{local_file}] to [{gcs}]: {e}'.format(
          local_file=local_path, gcs=gcs_path,
          e=http_exc.HttpException(err)))
      raise UploadError(
          '{code} Could not upload file [{local_file}] to [{gcs}]: {message}'
          .format(code=err.status_code, local_file=local_path, gcs=gcs_path,
                  message=http_exc.HttpException(
                      err, error_format='{status_message}')))
    finally:
      # If the upload fails with an error, apitools (for whatever reason)
      # doesn't close the file object, so we have to call this ourselves to
      # force it to happen.
      upload.stream.close()

    if response.size != file_size:
      log.debug('Response size: {0} bytes, but local file is {1} bytes.'.format(
          response.size, file_size))
      raise exceptions.BadFileException(
          'Cloud storage upload failure. Uploaded file does not match local '
          'file: {0}. Please retry.'.format(local_path))
    return response

  def CopyFileFromGCS(self, source_obj_ref, local_path, overwrite=False):
    """Download a file from the given Cloud Storage bucket.

    Args:
      source_obj_ref: storage_util.ObjectReference, the path of the file on GCS
        to download.
      local_path: str, the path of the file to download to. Path must be on the
        local filesystem.
      overwrite: bool, whether or not to overwrite local_path if it already
        exists.

    Raises:
      BadFileException if the file download is not successful.
    """
    chunksize = self._GetChunkSize()
    download = transfer.Download.FromFile(
        local_path, chunksize=chunksize, overwrite=overwrite)
    download.bytes_http = transports.GetApitoolsTransport(
        response_encoding=None)
    get_req = self.messages.StorageObjectsGetRequest(
        bucket=source_obj_ref.bucket,
        object=source_obj_ref.object)

    gcs_path = '{bucket}/{object_path}'.format(
        bucket=source_obj_ref.bucket, object_path=source_obj_ref.object,
    )

    log.info('Downloading [{gcs}] to [{local_file}]'.format(
        local_file=local_path, gcs=gcs_path))
    try:
      self.client.objects.Get(get_req, download=download)
      # When there's a download, Get() returns None so we Get() again to check
      # the file size.
      response = self.client.objects.Get(get_req)
    except api_exceptions.HttpError as err:
      raise exceptions.BadFileException(
          'Could not copy [{gcs}] to [{local_file}]. Please retry: {err}'
          .format(local_file=local_path, gcs=gcs_path,
                  err=http_exc.HttpException(err)))
    finally:
      # Close the stream to release the file handle so the downloaded file's
      # size can be checked below.
      download.stream.close()

    file_size = _GetFileSize(local_path)
    if response.size != file_size:
      log.debug('Download size: {0} bytes, but expected size is {1} '
                'bytes.'.format(file_size, response.size))
      raise exceptions.BadFileException(
          'Cloud Storage download failure. Downloaded file [{0}] does not '
          'match Cloud Storage object. Please retry.'.format(local_path))

  def ReadObject(self, object_ref):
    """Read a file from the given Cloud Storage bucket.

    Args:
      object_ref: storage_util.ObjectReference, The object to read from.

    Raises:
      BadFileException if the file read is not successful.

    Returns:
      file-like object containing the data read.
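
    Example (illustrative only; 'client' is a StorageClient instance and
    decoding assumes the object holds UTF-8 text):
      contents = client.ReadObject(object_ref).read().decode('utf-8')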
    """
    data = io.BytesIO()
    chunksize = self._GetChunkSize()
    download = transfer.Download.FromStream(data, chunksize=chunksize)
    download.bytes_http = transports.GetApitoolsTransport(
        response_encoding=None)
    get_req = self.messages.StorageObjectsGetRequest(
        bucket=object_ref.bucket,
        object=object_ref.object)

    log.info('Reading [%s]', object_ref)
    try:
      self.client.objects.Get(get_req, download=download)
    except api_exceptions.HttpError as err:
      raise exceptions.BadFileException(
          'Could not read [{object_}]. Please retry: {err}'.format(
              object_=object_ref, err=http_exc.HttpException(err)))

    data.seek(0)
    return data

  def GetBucket(self, bucket, projection=None):
    """Gets a bucket from GCS, if it exists.

    Args:
      bucket: str, The bucket name.
      projection: int, The fields to get as part of this request. This is
        optional and defaults to whatever the server provides.

    Returns:
      Bucket: a StorageV1 Bucket message with details about the bucket.

    Raises:
      BucketNotFoundError if the given bucket does not exist.
    """
    try:
      return self.client.buckets.Get(self.messages.StorageBucketsGetRequest(
          bucket=bucket,
          projection=projection
      ))
    except api_exceptions.HttpNotFoundError:
      # Surface a friendlier error than the raw 404 from the API.
      raise BucketNotFoundError('Bucket [{}] does not exist.'.format(bucket))

  def CreateBucketIfNotExists(
      self,
      bucket,
      project=None,
      location=None,
      check_ownership=True,
      enable_uniform_level_access=None,
      enable_public_access_prevention=None,
      soft_delete_duration=None,
      cors=None,
  ):
    """Create a bucket if it does not already exist.

    If it already exists and is accessible by the current user, this method
    returns.

    Args:
      bucket: str, The storage bucket to be created.
      project: str, The project to use for the API request. If None, current
        Cloud SDK project is used.
      location: str, The bucket location/region.
      check_ownership: bool, Whether to check that the resulting bucket belongs
        to the given project. DO NOT SET THIS TO FALSE if the bucket name can be
        guessed and claimed ahead of time by another user as it enables a name
        squatting exploit.
      enable_uniform_level_access: bool, to enable uniform bucket level access.
        If None, the uniformBucketLevelAccess field will be set to None in the
        bucket creation request, which means that it will use the default
        values.
      enable_public_access_prevention: bool, to enable public access prevention.
        If None, the publicAccessPrevention field will be set to None in the
        bucket creation request, which means that it will use the default
        values.
      soft_delete_duration: int, the soft delete duration in seconds.
      cors: list, A list of CorsValueListEntry objects. The bucket's
        Cross-Origin Resource Sharing (CORS) configuration. If None, no CORS
        configuration will be set.

    Raises:
      api_exceptions.HttpError: If the bucket is not able to be created or is
        not accessible due to permissions.
      BucketInWrongProjectError: If the bucket already exists in a different
        project. This could belong to a malicious user squatting on the bucket
        name.
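
    Example (illustrative only; 'client' is a StorageClient instance and the
    bucket name and location are hypothetical placeholders):
      client.CreateBucketIfNotExists(
          'my-bucket', location='us-central1',
          enable_uniform_level_access=True)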
    """
    project = project or properties.VALUES.core.project.Get(required=True)

    # Previous iterations of this code always attempted to Insert the bucket
    # and interpreted conflict errors to mean the bucket already existed; this
    # avoids a race condition, but meant that checking bucket existence was
    # subject to a lower-QPS rate limit for bucket creation/deletion. Instead,
    # we do a racy Get-then-Insert which is subject to a higher rate limit, and
    # still have to handle conflict errors in case of a race.
    try:
      self.client.buckets.Get(self.messages.StorageBucketsGetRequest(
          bucket=bucket,
      ))
    except api_exceptions.HttpNotFoundError:
      # Bucket doesn't exist, we'll try to create it.
      storage_buckets_insert_request = (
          self.messages.StorageBucketsInsertRequest(
              project=project,
              bucket=self.messages.Bucket(name=bucket,
                                          location=location)))
      iam_configuration = self.messages.Bucket.IamConfigurationValue()
      if enable_uniform_level_access is not None:
        iam_configuration.uniformBucketLevelAccess = self.messages.Bucket.IamConfigurationValue.UniformBucketLevelAccessValue(
            enabled=enable_uniform_level_access
        )
      if enable_public_access_prevention is not None:
        if enable_public_access_prevention:
          iam_configuration.publicAccessPrevention = 'enforced'
        else:
          iam_configuration.publicAccessPrevention = 'inherited'
      # Only set iam_configuration on the new bucket if it has some data in it.
      if iam_configuration != self.messages.Bucket.IamConfigurationValue():
        storage_buckets_insert_request.bucket.iamConfiguration = (
            iam_configuration
        )
      if cors is not None:
        storage_buckets_insert_request.bucket.cors = cors
      if soft_delete_duration is not None:
        storage_buckets_insert_request.bucket.softDeletePolicy = (
            self.messages.Bucket.SoftDeletePolicyValue(
                retentionDurationSeconds=soft_delete_duration
            )
        )
      try:
        self.client.buckets.Insert(storage_buckets_insert_request)
      except api_exceptions.HttpConflictError:
        # We lost a race with another process creating the bucket. At least we
        # know the bucket exists. But we must check again whether the
        # newly-created bucket is accessible to us, so we Get it again.
        self.client.buckets.Get(self.messages.StorageBucketsGetRequest(
            bucket=bucket,
        ))
      else:
        # We just created the bucket ourselves; no need to check for ownership.
        return

    if not check_ownership:
      return
    # Check that the bucket is in the user's project to prevent bucket squatting
    # exploit (see b/33046325, b/169171543).
    bucket_list_req = self.messages.StorageBucketsListRequest(
        project=project, prefix=bucket)
    bucket_list = self.client.buckets.List(bucket_list_req)
    if not any(b.id == bucket for b in bucket_list.items):
      raise BucketInWrongProjectError(
          'Unable to create bucket [{}] as it already exists in another '
          'project.'.format(bucket))

  def GetBucketLocationForFile(self, object_path):
    """Returns the location of the bucket for a file.

    Args:
      object_path: str, the path of the file in GCS.

    Returns:
      str, bucket location (region) for given object in GCS.

    Raises:
      BucketNotFoundError if bucket from the object path is not found.
    """

    object_reference = storage_util.ObjectReference.FromUrl(object_path)
    bucket_name = object_reference.bucket
    get_bucket_req = self.messages.StorageBucketsGetRequest(
        bucket=bucket_name)

    try:
      source_bucket = self.client.buckets.Get(get_bucket_req)
      return source_bucket.location
    except api_exceptions.HttpNotFoundError:
      raise BucketNotFoundError(
          'Could not get location for file: [{bucket}] bucket does not exist.'
          .format(bucket=bucket_name))

  def ListBucket(self, bucket_ref, prefix=None):
    """Lists the contents of a cloud storage bucket.

    Args:
      bucket_ref: The reference to the bucket.
      prefix: str, Filter results to those whose names begin with this prefix.

    Yields:
      Object messages.

    Raises:
      BucketNotFoundError if the user-specified bucket does not exist.
      ListBucketError if there was an error listing the bucket.
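
    Example (illustrative only; the prefix is a hypothetical placeholder):
      for obj in client.ListBucket(bucket_ref, prefix='reports/'):
        log.info(obj.name)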
    """
    request = self.messages.StorageObjectsListRequest(
        bucket=bucket_ref.bucket, prefix=prefix)

    try:
      # batch_size=None gives us the API default
      for obj in list_pager.YieldFromList(self.client.objects,
                                          request, batch_size=None):
        yield obj
    except api_exceptions.HttpNotFoundError:
      raise BucketNotFoundError(
          'Could not list bucket: [{bucket}] bucket does not exist.'
          .format(bucket=bucket_ref.bucket))
    except api_exceptions.HttpError as e:
      log.debug('Could not list bucket [{bucket}]: {e}'.format(
          bucket=bucket_ref.bucket, e=http_exc.HttpException(e)))
      raise ListBucketError(
          '{code} Could not list bucket [{bucket}]: {message}'
          .format(code=e.status_code, bucket=bucket_ref.bucket,
                  message=http_exc.HttpException(
                      e, error_format='{status_message}')))

  def DeleteObject(self, object_ref):
    """Delete the specified object.

    Args:
      object_ref: storage_util.ObjectReference, The object to delete.
    """
    self.client.objects.Delete(self.messages.StorageObjectsDeleteRequest(
        bucket=object_ref.bucket,
        object=object_ref.object))

  def DeleteBucket(self, bucket_ref):
    """Delete the specified bucket.

    Args:
      bucket_ref: storage_util.BucketReference to the bucket to delete.
    """
    self.client.buckets.Delete(
        self.messages.StorageBucketsDeleteRequest(bucket=bucket_ref.bucket))

  def GetIamPolicy(self, bucket_ref):
    """Fetch the IAM Policy attached to the specified bucket.

    Args:
      bucket_ref: storage_util.BucketReference to the bucket with the policy.

    Returns:
      The bucket's IAM Policy.
    """
    return self.client.buckets.GetIamPolicy(
        self.messages.StorageBucketsGetIamPolicyRequest(
            bucket=bucket_ref.bucket,
            optionsRequestedPolicyVersion=iam_util
            .MAX_LIBRARY_IAM_SUPPORTED_VERSION))

  def SetIamPolicy(self, bucket_ref, policy):
    """Set the IAM Policy attached to the specified bucket to the given policy.

    If 'policy' has no etag specified, this will BLINDLY OVERWRITE the IAM
    policy!

    Args:
      bucket_ref: storage_util.BucketReference to the bucket.
      policy: The new IAM Policy.

    Returns:
      The new IAM Policy.
    """
    return self.client.buckets.SetIamPolicy(
        self.messages.StorageBucketsSetIamPolicyRequest(
            bucket=bucket_ref.bucket, policy=policy))

  def AddIamPolicyBinding(self, bucket_ref, member, role):
    """Add an IAM policy binding on the specified bucket.

    Does an atomic Read-Modify-Write, adding the member to the role.

    Args:
      bucket_ref: storage_util.BucketReference to the bucket with the policy.
      member: Principal to add to the policy binding.
      role: Role to add to the policy binding.

    Returns:
      The new IAM Policy.
    """
    return self.AddIamPolicyBindings(bucket_ref, [(member, role)])

  def AddIamPolicyBindings(self, bucket_ref, member_roles):
    """Add IAM policy bindings on the specified bucket.

    Does an atomic Read-Modify-Write, adding each member to its role.

    Args:
      bucket_ref: storage_util.BucketReference to the bucket with the policy.
      member_roles: List of 2-tuples in the form [(member, role), ...].

    Returns:
      The new IAM Policy.
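
    Example (illustrative only; the principal and role are hypothetical
    placeholders):
      client.AddIamPolicyBindings(
          bucket_ref,
          [('user:alice@example.com', 'roles/storage.objectViewer')])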
    """
    policy = self.GetIamPolicy(bucket_ref)
    policy.version = iam_util.MAX_LIBRARY_IAM_SUPPORTED_VERSION

    policy_was_updated = False
    for member, role in member_roles:
      if iam_util.AddBindingToIamPolicy(
          self.messages.Policy.BindingsValueListEntry, policy, member, role):
        policy_was_updated = True

    if policy_was_updated:
      return self.SetIamPolicy(bucket_ref, policy)

    return policy
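

# A minimal usage sketch (illustrative only, not part of the original module).
# 'my-project', 'my-bucket', and the local path are hypothetical placeholders,
# and the bucket_ref property on ObjectReference is assumed to be provided by
# storage_util; this module is normally consumed by other gcloud libraries
# rather than invoked directly.
#
#   client = StorageClient()
#   client.CreateBucketIfNotExists('my-bucket', project='my-project')
#   target = storage_util.ObjectReference.FromUrl('gs://my-bucket/report.csv')
#   client.CopyFileToGCS('/tmp/report.csv', target)
#   for obj in client.ListBucket(target.bucket_ref):
#     log.info('Found object: %s', obj.name)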