HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/platform/gsutil/gslib/tests/test_command_runner.py
# -*- coding: utf-8 -*-
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit and integration tests for gsutil command_runner module."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import logging
import os
import time

import six
from six.moves import input
import boto
import sys

import gslib
from gslib import command_runner
from gslib.command import Command
from gslib.command_argument import CommandArgument
from gslib.command_runner import CommandRunner
from gslib.command_runner import HandleArgCoding
from gslib.command_runner import HandleHeaderCoding
from gslib.exception import CommandException
from gslib.tab_complete import CloudObjectCompleter
from gslib.tab_complete import CloudOrLocalObjectCompleter
from gslib.tab_complete import LocalObjectCompleter
from gslib.tab_complete import LocalObjectOrCannedACLCompleter
from gslib.tab_complete import NoOpCompleter
import gslib.tests.testcase as testcase
import gslib.tests.util as util
from gslib.tests.util import ARGCOMPLETE_AVAILABLE
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import unittest
from gslib.utils import system_util
from gslib.utils.constants import GSUTIL_PUB_TARBALL
from gslib.utils.text_util import InsistAscii
from gslib.utils.unit_util import SECONDS_PER_DAY

from six import add_move, MovedModule

add_move(MovedModule('mock', 'mock', 'unittest.mock'))
from six.moves import mock

SKIP_BECAUSE_RETRIES_ARE_SLOW = (
    'gs_host is set to non-default value; trying to fetch gsutil version '
    'metadata from the wrong host would fail repeatedly, taking a long time '
    'for retry backoffs before finally failing the test.')


class FakeArgparseArgument(object):
  """Fake for argparse parser argument."""
  pass


class FakeArgparseParser(object):
  """Fake for argparse parser."""

  def __init__(self, name='Default'):
    self.arguments = []
    self.subparsers = None
    self.name = name

  def add_argument(self, *unused_args, **unused_kwargs):
    argument = FakeArgparseArgument()
    self.arguments.append(argument)
    return argument

  def add_subparsers(self):
    self.subparsers = FakeArgparseSubparsers()
    return self.subparsers


class FakeArgparseSubparsers(object):
  """Container for nested parsers."""

  def __init__(self):
    self.parsers = []

  def add_parser(self, name='Default', **unused_kwargs):
    parser = FakeArgparseParser(name=name)
    self.parsers.append(parser)
    return parser


class FakeCommandWithInvalidCompleter(Command):
  """Command with an invalid completer on an argument."""

  command_spec = Command.CreateCommandSpec(
      'fake1', argparse_arguments=[CommandArgument('arg', completer='BAD')])

  help_spec = Command.HelpSpec(help_name='fake1',
                               help_name_aliases=[],
                               help_type='command_help',
                               help_one_line_summary='fake command for tests',
                               help_text='fake command for tests',
                               subcommand_help_text={})

  def __init__(self):
    pass


class FakeCommandWithNestedArguments(object):
  subcommand_name = 'event'
  subcommand_subname = 'set'
  # Matches the number of CommandArguments in argparse_arguments below.
  num_args = 2

  command_spec = Command.CreateCommandSpec('FakeCommandWithNestedArguments',
                                           argparse_arguments={
                                               subcommand_name: {
                                                   subcommand_subname: [
                                                       CommandArgument('arg1'),
                                                       CommandArgument('arg2'),
                                                   ]
                                               }
                                           })


class FakeCommandWithCompleters(Command):
  """Command with various completer types."""

  command_spec = Command.CreateCommandSpec(
      'fake2',
      argparse_arguments=[
          CommandArgument.MakeZeroOrMoreCloudURLsArgument(),
          CommandArgument.MakeZeroOrMoreFileURLsArgument(),
          CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(),
          CommandArgument.MakeFreeTextArgument(),
          CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),
          CommandArgument.MakeFileURLOrCannedACLArgument(),
      ])

  help_spec = Command.HelpSpec(help_name='fake2',
                               help_name_aliases=[],
                               help_type='command_help',
                               help_one_line_summary='fake command for tests',
                               help_text='fake command for tests',
                               subcommand_help_text={})

  def __init__(self):
    pass


class TestCommandRunnerUnitTests(testcase.unit_testcase.GsUtilUnitTestCase):
  """Unit tests for gsutil update check in command_runner module."""

  def setUp(self):
    """Sets up the command runner mock objects."""
    super(TestCommandRunnerUnitTests, self).setUp()

    # Mock out the timestamp file so we can manipulate it.
    self.timestamp_file_path = self.CreateTempFile()
    get_timestamp_file_patcher = mock.patch.object(
        command_runner.boto_util,
        'GetLastCheckedForGsutilUpdateTimestampFile',
        return_value=self.timestamp_file_path)
    self.addCleanup(get_timestamp_file_patcher.stop)
    get_timestamp_file_patcher.start()

    # Mock out the gsutil version checker.
    base_version = six.text_type(gslib.VERSION)
    while not base_version.isnumeric():
      if not base_version:
        raise CommandException('Version number (%s) is not numeric.' %
                               gslib.VERSION)
      base_version = base_version[:-1]
    self.old_look_up_gsutil_version = command_runner.LookUpGsutilVersion
    command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1

    # Mock out input to trigger yes prompt.
    command_runner.input = lambda p: 'y'

    # Mock out TTY check to pretend we're on a TTY even if we're not.
    self.running_interactively = True
    command_runner.system_util.IsRunningInteractively = (
        lambda: self.running_interactively)

    # Mock out the modified time of the VERSION file.
    self.version_mod_time = 0
    self.previous_version_mod_time = gslib.GetGsutilVersionModifiedTime
    gslib.GetGsutilVersionModifiedTime = lambda: self.version_mod_time

    # Create a fake pub tarball that will be used to check for gsutil version.
    self.pub_bucket_uri = self.CreateBucket('pub')
    self.gsutil_tarball_uri = self.CreateObject(bucket_uri=self.pub_bucket_uri,
                                                object_name='gsutil.tar.gz',
                                                contents='foo')

  def tearDown(self):
    """Tears down the command runner mock objects."""
    super(TestCommandRunnerUnitTests, self).tearDown()

    command_runner.LookUpGsutilVersion = self.old_look_up_gsutil_version
    command_runner.input = input

    gslib.GetGsutilVersionModifiedTime = self.previous_version_mod_time

    command_runner.IsRunningInteractively = (
        gslib.utils.system_util.IsRunningInteractively)

    self.gsutil_tarball_uri.delete_key()
    self.pub_bucket_uri.delete_bucket()

  def _IsPackageOrCloudSDKInstall(self):
    # Update should not trigger for package installs or Cloud SDK installs.
    return gslib.IS_PACKAGE_INSTALL or system_util.InvokedViaCloudSdk()

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_py3_not_interactive(self):
    """Tests that py3 prompt is not triggered if not running interactively."""
    with mock.patch.object(sys, 'version_info') as version_info:
      version_info.major = 2
      self.running_interactively = False
      self.assertFalse(self.command_runner.MaybePromptForPythonUpdate('ver'))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_py3_skipped_in_boto(self):
    """Tests that py3 prompt is not triggered if skipped in boto config."""
    with SetBotoConfigForTest([('GSUtil', 'skip_python_update_prompt', 'True')
                              ]):
      with mock.patch.object(sys, 'version_info') as version_info:
        version_info.major = 2
        self.assertFalse(self.command_runner.MaybePromptForPythonUpdate('ver'))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_py3_prompt_on_py2(self):
    """Tests that py3 prompt is triggered on Python 2."""
    with mock.patch.object(sys, 'version_info') as version_info:
      version_info.major = 2
      self.assertEqual(True,
                       self.command_runner.MaybePromptForPythonUpdate('ver'))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_py3_prompt_on_py3(self):
    """Tests that py3 prompt is not triggered on Python 3."""
    with mock.patch.object(sys, 'version_info') as version_info:
      version_info.major = 3
      self.assertEqual(False,
                       self.command_runner.MaybePromptForPythonUpdate('ver'))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_not_interactive(self):
    """Tests that update is not triggered if not running interactively."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '1')
                              ]):
      with open(self.timestamp_file_path, 'w') as f:
        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
      self.running_interactively = False
      self.assertEqual(
          False,
          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_no_tracker_file_version_recent(self):
    """Tests when no timestamp file exists and VERSION file is recent."""
    if os.path.exists(self.timestamp_file_path):
      os.remove(self.timestamp_file_path)
    self.assertFalse(os.path.exists(self.timestamp_file_path))
    self.version_mod_time = time.time()
    self.assertEqual(
        False, self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_no_tracker_file_version_old(self):
    """Tests when no timestamp file exists and VERSION file is old."""
    if os.path.exists(self.timestamp_file_path):
      os.remove(self.timestamp_file_path)
    self.assertFalse(os.path.exists(self.timestamp_file_path))
    self.version_mod_time = 0
    expected = not self._IsPackageOrCloudSDKInstall()
    self.assertEqual(
        expected,
        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_invalid_commands(self):
    """Tests that update is not triggered for certain commands."""
    self.assertEqual(
        False,
        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_fails_silently_when_version_check_fails(self):
    """Tests that update is not triggered for certain commands."""

    command_runner.LookUpGsutilVersion = self.old_look_up_gsutil_version
    self.assertEqual(
        False,
        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_invalid_file_contents(self):
    """Tests no update if timestamp file has invalid value."""
    with open(self.timestamp_file_path, 'w') as f:
      f.write('NaN')
    self.assertEqual(
        False, self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_update_should_trigger(self):
    """Tests update should be triggered if time is up."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '1')
                              ]):
      with open(self.timestamp_file_path, 'w') as f:
        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
      expected = not self._IsPackageOrCloudSDKInstall()
      self.assertEqual(
          expected,
          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_not_time_for_update_yet(self):
    """Tests update not triggered if not time yet."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '3')
                              ]):
      with open(self.timestamp_file_path, 'w') as f:
        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
      self.assertEqual(
          False,
          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  def test_user_says_no_to_update(self):
    """Tests no update triggered if user says no at the prompt."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '1')
                              ]):
      with open(self.timestamp_file_path, 'w') as f:
        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
      command_runner.input = lambda p: 'n'
      self.assertEqual(
          False,
          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_update_check_skipped_with_quiet_mode(self):
    """Tests that update isn't triggered when loglevel is in quiet mode."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '1')
                              ]):
      with open(self.timestamp_file_path, 'w') as f:
        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))

      expected = not self._IsPackageOrCloudSDKInstall()
      self.assertEqual(
          expected,
          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))

      prev_loglevel = logging.getLogger().getEffectiveLevel()
      try:
        logging.getLogger().setLevel(logging.ERROR)
        # With reduced loglevel, should return False.
        self.assertEqual(
            False,
            self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
      finally:
        logging.getLogger().setLevel(prev_loglevel)

  def test_command_argument_parser_setup_invalid_completer(self):

    command_map = {
        FakeCommandWithInvalidCompleter.command_spec.command_name:
            FakeCommandWithInvalidCompleter()
    }

    runner = CommandRunner(
        bucket_storage_uri_class=self.mock_bucket_storage_uri,
        gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
        command_map=command_map)

    parser = FakeArgparseParser()
    try:
      runner.ConfigureCommandArgumentParsers(parser)
    except RuntimeError as e:
      self.assertIn('Unknown completer', str(e))

  @unittest.skipUnless(ARGCOMPLETE_AVAILABLE,
                       'Tab completion requires argcomplete')
  def test_command_argument_parser_setup_nested_argparse_arguments(self):
    command_map = {
        FakeCommandWithNestedArguments.command_spec.command_name:
            FakeCommandWithNestedArguments(),
    }
    runner = CommandRunner(
        bucket_storage_uri_class=self.mock_bucket_storage_uri,
        gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
        command_map=command_map)
    parser = FakeArgparseParser()
    runner.ConfigureCommandArgumentParsers(parser)

    # Start at first level of subparsers.
    cur_subparser = parser.subparsers.parsers[0]
    self.assertEqual(cur_subparser.name,
                     FakeCommandWithNestedArguments.command_spec.command_name)

    # Go one level further, checking the current subparser's first subparser.
    cur_subparser = cur_subparser.subparsers.parsers[0]
    self.assertEqual(cur_subparser.name,
                     FakeCommandWithNestedArguments.subcommand_name)

    cur_subparser = cur_subparser.subparsers.parsers[0]
    self.assertEqual(cur_subparser.name,
                     FakeCommandWithNestedArguments.subcommand_subname)

    # FakeArgparseParser adds a FakeArgparseArgument in place of any arguments
    # passed in via add_argument(), so we can't check for anything beyond the
    # presence of the correct number of arguments here.
    self.assertEqual(len(cur_subparser.arguments),
                     FakeCommandWithNestedArguments.num_args)

  @unittest.skipUnless(ARGCOMPLETE_AVAILABLE,
                       'Tab completion requires argcomplete')
  def test_command_argument_parser_setup_completers(self):

    command_map = {
        FakeCommandWithCompleters.command_spec.command_name:
            FakeCommandWithCompleters()
    }

    runner = CommandRunner(
        bucket_storage_uri_class=self.mock_bucket_storage_uri,
        gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
        command_map=command_map)

    main_parser = FakeArgparseParser()

    runner.ConfigureCommandArgumentParsers(main_parser)

    self.assertEqual(1, len(main_parser.subparsers.parsers))
    subparser = main_parser.subparsers.parsers[0]
    self.assertEqual(6, len(subparser.arguments))
    self.assertEqual(CloudObjectCompleter,
                     type(subparser.arguments[0].completer))
    self.assertEqual(LocalObjectCompleter,
                     type(subparser.arguments[1].completer))
    self.assertEqual(CloudOrLocalObjectCompleter,
                     type(subparser.arguments[2].completer))
    self.assertEqual(NoOpCompleter, type(subparser.arguments[3].completer))
    self.assertEqual(CloudObjectCompleter,
                     type(subparser.arguments[4].completer))
    self.assertEqual(LocalObjectOrCannedACLCompleter,
                     type(subparser.arguments[5].completer))

  # pylint: disable=invalid-encoded-data
  def test_valid_arg_coding(self):
    """Tests that gsutil encodes valid args correctly."""
    # Args should be unicode; if they are not unicode, they should be utf-8
    # decoded.
    args = ['ls', '-p', 'abc:def', 'gs://bucket']
    HandleArgCoding(args)
    for a in args:
      self.assertTrue(isinstance(a, six.text_type))

  def test_valid_header_coding(self):
    headers = {
        'content-type': 'text/plain',
        'x-goog-meta-foo': 'bãr',
    }
    HandleHeaderCoding(headers)
    # Custom metadata header values should be decoded to unicode; others should
    # not be decoded, but should contain only ASCII characters.
    self.assertTrue(isinstance(headers['x-goog-meta-foo'], six.text_type))
    InsistAscii(
        headers['content-type'],
        'Value of non-custom-metadata header contained non-ASCII characters')

  def test_invalid_header_coding_fails(self):
    headers = {'content-type': 'bãr'}
    with self.assertRaisesRegex(CommandException, r'Invalid non-ASCII'):
      HandleHeaderCoding(headers)


class TestCommandRunnerIntegrationTests(testcase.GsUtilIntegrationTestCase):
  """Integration tests for gsutil update check in command_runner module."""

  def setUp(self):
    """Sets up the command runner mock objects."""
    super(TestCommandRunnerIntegrationTests, self).setUp()

    # Mock out the timestamp file so we can manipulate it.
    self.timestamp_file_path = self.CreateTempFile(contents='0')
    get_timestamp_file_patcher = mock.patch.object(
        command_runner.boto_util,
        'GetLastCheckedForGsutilUpdateTimestampFile',
        return_value=self.timestamp_file_path)
    self.addCleanup(get_timestamp_file_patcher.stop)
    get_timestamp_file_patcher.start()

    # Mock out input to trigger yes prompt.
    command_runner.input = lambda p: 'y'

  def tearDown(self):
    """Tears down the command runner mock objects."""
    super(TestCommandRunnerIntegrationTests, self).tearDown()
    command_runner.input = input

  @unittest.skipIf(util.HAS_NON_DEFAULT_GS_HOST, SKIP_BECAUSE_RETRIES_ARE_SLOW)
  def test_lookup_version_without_credentials(self):
    """Tests that gsutil tarball version lookup works without credentials."""
    with SetBotoConfigForTest([('GSUtil', 'software_update_check_period', '1')],
                              use_existing_config=False):
      self.command_runner = command_runner.CommandRunner()
      # Looking up software version shouldn't get auth failure exception.
      self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL])