HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/gsutil/gslib/tests/test_requester_pays.py
# -*- coding: utf-8 -*-
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for notification command."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import re

import gslib.tests.testcase as testcase
from gslib.project_id import PopulateProjectId
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.testcase.integration_testcase import SkipForXML
from gslib.tests.util import ObjectToURI as suri
from gslib.utils.retry_util import Retry
from gslib.utils.constants import UTF8

OBJECT_CONTENTS = b'innards'


@SkipForS3('gsutil doesn\'t support S3 Requester Pays.')
@SkipForXML('Requester Pays is not supported for the XML API.')
class TestRequesterPays(testcase.GsUtilIntegrationTestCase):
  """Integration tests for Requester Pays.

  Passing in a user project should succeed for operations on Requester Pays
  buckets, and with the GA release will also succeed for non-Requester Pays
  buckets.
  """

  _set_rp_cmd = ['requesterpays', 'set']
  _get_rp_cmd = ['requesterpays', 'get']

  def setUp(self):
    super(TestRequesterPays, self).setUp()
    self.non_requester_pays_bucket_uri = self.CreateBucket()
    self.requester_pays_bucket_uri = self.CreateBucket()
    self._set_requester_pays(self.requester_pays_bucket_uri)
    self.non_requester_pays_object_uri = self.CreateObject(
        bucket_uri=self.non_requester_pays_bucket_uri, contents=OBJECT_CONTENTS)
    self.requester_pays_object_uri = self.CreateObject(
        bucket_uri=self.requester_pays_bucket_uri, contents=OBJECT_CONTENTS)
    self.user_project_flag = ['-u', PopulateProjectId()]

  def _set_requester_pays(self, bucket_uri):
    self.RunGsUtil(['requesterpays', 'set', 'on', suri(bucket_uri)])

  def _run_requester_pays_test(self, command_list, regex=None):
    """Test a command with a user project.

    Run a command with a user project on a Requester Pays bucket. The command is
    expected to pass because the source bucket is Requester Pays. If a regex
    pattern is supplied, also assert that stdout of the command matches it.
    """
    stdout = self.RunGsUtil(self.user_project_flag + command_list,
                            return_stdout=True)
    if regex:
      if isinstance(regex, bytes):
        regex = regex.decode(UTF8)
      self.assertRegexpMatchesWithFlags(stdout, regex, flags=re.IGNORECASE)

  def _run_non_requester_pays_test(self, command_list):
    """Test a command with a user project on a non-Requester Pays bucket.

    Run a command with a user project on a non-Requester Pays bucket. The
    command will still succeed, because with GA user project is accepted for
    all requests.
    """
    stdout = self.RunGsUtil(self.user_project_flag + command_list,
                            return_stdout=True)

  def test_off_default(self):
    bucket_uri = self.CreateBucket()
    stdout = self.RunGsUtil(self._get_rp_cmd + [suri(bucket_uri)],
                            return_stdout=True)
    self.assertEqual(stdout.strip(), '%s: Disabled' % suri(bucket_uri))

  def test_turning_on(self):
    bucket_uri = self.CreateBucket()
    self.RunGsUtil(self._set_rp_cmd + ['on', suri(bucket_uri)])

    def _Check1():
      stdout = self.RunGsUtil(self._get_rp_cmd + [suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout.strip(), '%s: Enabled' % suri(bucket_uri))

    _Check1()

  def test_turning_off(self):
    bucket_uri = self.CreateBucket()
    self.RunGsUtil(self._set_rp_cmd + ['on', suri(bucket_uri)])

    def _Check1():
      stdout = self.RunGsUtil(self._get_rp_cmd + [suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout.strip(), '%s: Enabled' % suri(bucket_uri))

    _Check1()

    self.RunGsUtil(self._set_rp_cmd + ['off', suri(bucket_uri)])

    def _Check2():
      stdout = self.RunGsUtil(self._get_rp_cmd + [suri(bucket_uri)],
                              return_stdout=True)
      self.assertEqual(stdout.strip(), '%s: Disabled' % suri(bucket_uri))

    _Check2()

  def testTooFewArgumentsFails(self):
    """Ensures requesterpays commands fail with too few arguments."""
    # No arguments for set, but valid subcommand.
    stderr = self.RunGsUtil(self._set_rp_cmd,
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

    # No arguments for get, but valid subcommand.
    stderr = self.RunGsUtil(self._get_rp_cmd,
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

    # Neither arguments nor subcommand.
    stderr = self.RunGsUtil(['requesterpays'],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

  def test_acl(self):
    requester_pays_bucket_uri = self.CreateBucket()
    self._set_requester_pays(requester_pays_bucket_uri)
    self._run_requester_pays_test(
        ['acl', 'set', 'public-read',
         suri(requester_pays_bucket_uri)])
    self._run_requester_pays_test(
        ['acl', 'get', suri(requester_pays_bucket_uri)])

    non_requester_pays_bucket_uri = self.CreateBucket()
    self._run_non_requester_pays_test(
        ['acl', 'set', 'public-read',
         suri(non_requester_pays_bucket_uri)])
    self._run_non_requester_pays_test(
        ['acl', 'get', suri(non_requester_pays_bucket_uri)])

  def test_ls(self):
    self._run_requester_pays_test(['ls', suri(self.requester_pays_bucket_uri)])
    self._run_non_requester_pays_test(
        ['ls', suri(self.non_requester_pays_bucket_uri)])

  def test_rb(self):
    rp_bucket_uri = self.CreateBucket()
    self._set_requester_pays(rp_bucket_uri)
    self._run_requester_pays_test(['rb', suri(rp_bucket_uri)])

    non_rp_bucket_uri = self.CreateBucket()
    self._run_non_requester_pays_test(['rb', suri(non_rp_bucket_uri)])

  def test_copy(self):
    dest_bucket_uri = self.CreateBucket()

    self._run_requester_pays_test(
        ['cp',
         suri(self.requester_pays_object_uri),
         suri(dest_bucket_uri)])
    self._run_non_requester_pays_test(
        ['cp',
         suri(self.non_requester_pays_object_uri),
         suri(dest_bucket_uri)])

  def test_compose(self):
    data_list = [b'apple', b'orange', b'banana']

    bucket_uri = self.CreateBucket()
    components = [
        self.CreateObject(bucket_uri=bucket_uri, contents=data).uri
        for data in data_list
    ]
    composite = self.StorageUriCloneReplaceName(bucket_uri,
                                                self.MakeTempName('obj'))
    self._run_non_requester_pays_test(['compose'] + components +
                                      [composite.uri])

    rp_bucket_uri = self.CreateBucket()
    self._set_requester_pays(rp_bucket_uri)
    rp_components = [
        self.CreateObject(bucket_uri=rp_bucket_uri, contents=data).uri
        for data in data_list
    ]
    rp_composite = suri(rp_bucket_uri) + '/composite.txt'
    self._run_requester_pays_test(['compose'] + rp_components + [rp_composite])

  def test_cat(self):
    self._run_requester_pays_test(
        ['cat', suri(self.requester_pays_object_uri)], regex=OBJECT_CONTENTS)
    self._run_non_requester_pays_test(
        ['cat', suri(self.non_requester_pays_object_uri)])

  def test_du_obj(self):

    @Retry(AssertionError, tries=3, timeout_secs=1)
    # Use @Retry as hedge against bucket listing eventual consistency.
    def _check():
      self._run_requester_pays_test(
          ['du', suri(self.requester_pays_object_uri)])
      self._run_non_requester_pays_test(
          ['du', suri(self.non_requester_pays_object_uri)])

    _check()

  def test_hash(self):
    self._run_requester_pays_test(
        ['hash', '-c', suri(self.requester_pays_object_uri)],
        regex=r'Hash \(crc32c\)')
    self._run_non_requester_pays_test(
        ['hash', '-c', suri(self.non_requester_pays_object_uri)])

  def test_iam(self):
    self._run_requester_pays_test(
        ['iam', 'get', str(self.requester_pays_object_uri)])
    self._run_non_requester_pays_test(
        ['iam', 'get', str(self.non_requester_pays_object_uri)])

  def test_mv(self):
    requester_pays_bucket_uri = self.CreateBucket()
    object1_uri = self.CreateObject(bucket_uri=requester_pays_bucket_uri,
                                    contents=b'foo')
    object2_uri = self.CreateObject(bucket_uri=requester_pays_bucket_uri,
                                    contents=b'oOOo')
    self.AssertNObjectsInBucket(requester_pays_bucket_uri, 2)
    self._set_requester_pays(requester_pays_bucket_uri)
    dest_bucket_uri = self.CreateBucket()

    # Move two objects from bucket1 to Requester Pays bucket.
    for obj in [object1_uri, object2_uri]:
      self._run_requester_pays_test(['mv', suri(obj), suri(dest_bucket_uri)])
    self.AssertNObjectsInBucket(requester_pays_bucket_uri, 0)

    bucket_uri = self.CreateBucket()
    object1_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'bar')
    object2_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'baz')
    self.AssertNObjectsInBucket(bucket_uri, 2)
    for obj in [object1_uri, object2_uri]:
      self._run_non_requester_pays_test(
          ['mv', suri(obj), suri(dest_bucket_uri)])
    self.AssertNObjectsInBucket(bucket_uri, 0)

  def test_rewrite(self):
    object_uri = self.CreateObject(contents=b'bar')
    storage_class = 'nearline' if self._use_gcloud_storage else 'dra'
    self._run_non_requester_pays_test(
        ['rewrite', '-s', storage_class,
         suri(object_uri)])

    req_pays_bucket_uri = self.CreateBucket()
    self._set_requester_pays(req_pays_bucket_uri)
    req_pays_obj_uri = self.CreateObject(bucket_uri=req_pays_bucket_uri,
                                         contents=b'baz')
    self._run_requester_pays_test(
        ['rewrite', '-s', storage_class,
         suri(req_pays_obj_uri)])

  def test_rsync(self):
    req_pays_bucket_uri = self.CreateBucket(test_objects=2)
    self._set_requester_pays(req_pays_bucket_uri)
    bucket_uri = self.CreateBucket(test_objects=1)
    self._run_requester_pays_test([
        'rsync', '-d',
        suri(req_pays_bucket_uri),
        suri(self.requester_pays_bucket_uri)
    ])

    bucket_uri1 = self.CreateBucket(test_objects=2)
    bucket_uri2 = self.CreateBucket(test_objects=1)
    self._run_non_requester_pays_test(
        ['rsync', '-d', suri(bucket_uri1),
         suri(bucket_uri2)])

  def test_setmeta(self):
    req_pays_obj_uri = self.CreateObject(
        bucket_uri=self.requester_pays_bucket_uri,
        contents=b'<html><body>text</body></html>')
    self._run_requester_pays_test(
        ['setmeta', '-h', 'content-type:text/html',
         suri(req_pays_obj_uri)])

    obj_uri = self.CreateObject(bucket_uri=self.non_requester_pays_bucket_uri,
                                contents=b'<html><body>text</body></html>')
    self._run_non_requester_pays_test(
        ['setmeta', '-h', 'content-type:text/html',
         suri(obj_uri)])

  def test_stat(self):
    self._run_requester_pays_test(
        ['stat', suri(self.requester_pays_object_uri)])
    self._run_non_requester_pays_test(
        ['stat', suri(self.non_requester_pays_object_uri)])