HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/gsutil/gslib/tests/test_creds_config.py
# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for various combinations of configured credentials."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import boto

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import http_wrapper
from gslib.cloud_api import AccessDeniedException
from gslib.cred_types import CredTypes
from gslib.discard_messages_queue import DiscardMessagesQueue
from gslib.exception import CommandException
from gslib.gcs_json_api import GcsJsonApi
from gslib.tests.mock_logging_handler import MockLoggingHandler
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.testcase.integration_testcase import SkipForXML
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import SetEnvironmentForTest
from gslib.tests.util import unittest

from six import add_move, MovedModule

add_move(MovedModule('mock', 'mock', 'unittest.mock'))
from six.moves import mock

from datetime import datetime


def _Make403(message):
  return (apitools_exceptions.HttpError.FromResponse(
      http_wrapper.Response(info={'status': 403},
                            content='{"error": {"message": "%s"}}' % message,
                            request_url='http://www.google.com')))


SERVICE_ACCOUNT = boto.config.get_value('GSUtil',
                                        'test_impersonate_service_account')


class TestCredsConfig(testcase.GsUtilUnitTestCase):
  """Tests for various combinations of configured credentials."""

  def setUp(self):
    super(TestCredsConfig, self).setUp()
    self.log_handler = MockLoggingHandler()
    self.logger.addHandler(self.log_handler)

  def testMultipleConfiguredCreds(self):
    with SetBotoConfigForTest([
        ('Credentials', 'gs_oauth2_refresh_token', 'foo'),
        ('Credentials', 'gs_service_client_id', 'bar'),
        ('Credentials', 'gs_service_key_file', 'baz'),
        ('Credentials', 'gs_impersonate_service_account', None)
    ]):

      try:
        GcsJsonApi(None, self.logger, DiscardMessagesQueue())
        self.fail('Succeeded with multiple types of configured creds.')
      except CommandException as e:
        msg = str(e)
        self.assertIn('types of configured credentials', msg)
        self.assertIn(CredTypes.OAUTH2_USER_ACCOUNT, msg)
        self.assertIn(CredTypes.OAUTH2_SERVICE_ACCOUNT, msg)

  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  @mock.patch('gslib.third_party.iamcredentials_apitools'
              '.iamcredentials_v1_client.IamcredentialsV1'
              '.ProjectsServiceAccountsService.GenerateAccessToken')
  def testImpersonationBlockedByIamCredentialsApiErrors(
      self, mock_iam_creds_generate_access_token):
    with SetBotoConfigForTest([
        ('Credentials', 'gs_oauth2_refresh_token', 'foo'),
        ('Credentials', 'gs_service_client_id', None),
        ('Credentials', 'gs_service_key_file', None),
        ('Credentials', 'gs_impersonate_service_account', 'bar')
    ]):
      mock_iam_creds_generate_access_token.side_effect = (
          _Make403('The caller does not have permission'))
      try:
        GcsJsonApi(None, self.logger, DiscardMessagesQueue())
        self.fail('Succeeded when IAM Credentials threw an error')
      except AccessDeniedException as e:
        self.assertIn('Service account impersonation failed.', str(e))

      mock_iam_creds_generate_access_token.side_effect = (
          _Make403('IAM Service Account Credentials API has not been used'))
      try:
        GcsJsonApi(None, self.logger, DiscardMessagesQueue())
        self.fail('Succeeded when IAM Credentials threw an error')
      except AccessDeniedException as e:
        self.assertIn('IAM Service Account Credentials API has not', str(e))

  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  @mock.patch('gslib.third_party.iamcredentials_apitools'
              '.iamcredentials_v1_client.IamcredentialsV1'
              '.ProjectsServiceAccountsService.GenerateAccessToken')
  def testImpersonationSuccessfullyUsesToken(
      self, mock_iam_creds_generate_access_token):
    with SetBotoConfigForTest([
        ('Credentials', 'gs_oauth2_refresh_token', 'foo'),
        ('Credentials', 'gs_service_client_id', None),
        ('Credentials', 'gs_service_key_file', None),
        ('Credentials', 'gs_impersonate_service_account', 'bar')
    ]):
      fake_token = 'Mock token from IAM Credentials API'
      expire_time = datetime.now().strftime("%Y-%m-%dT23:59:59Z")
      mock_iam_creds_generate_access_token.return_value.accessToken = fake_token
      mock_iam_creds_generate_access_token.return_value.expireTime = expire_time
      api = GcsJsonApi(None, self.logger, DiscardMessagesQueue())
      self.assertIn(fake_token, str(api.credentials.access_token))


class TestCredsConfigIntegration(testcase.GsUtilIntegrationTestCase):

  @SkipForS3('Tests only uses gs credentials.')
  def testExactlyOneInvalid(self):
    bucket_uri = self.CreateBucket()
    with SetBotoConfigForTest(
        [('Credentials', 'gs_oauth2_refresh_token', 'foo'),
         ('Credentials', 'gs_service_client_id', None),
         ('Credentials', 'gs_service_key_file', None),
         ('Credentials', 'gs_impersonate_service_account', None)],
        use_existing_config=False):
      stderr = self.RunGsUtil(['ls', suri(bucket_uri)],
                              expected_status=1,
                              return_stderr=True)
      self.assertIn('credentials are invalid', stderr)

  @unittest.skipUnless(SERVICE_ACCOUNT,
                       'Test requires test_impersonate_service_account.')
  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  def testImpersonationCredentialsFromBotoConfig(self):
    with SetBotoConfigForTest([('Credentials', 'gs_impersonate_service_account',
                                SERVICE_ACCOUNT)]):
      with SetEnvironmentForTest({}):
        stderr = self.RunGsUtil(['ls', 'gs://pub'], return_stderr=True)
        self.assertIn('using service account impersonation', stderr)

  @unittest.skipUnless(SERVICE_ACCOUNT,
                       'Test requires test_impersonate_service_account.')
  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  def testImpersonationCredentialsFromGCloud(self):
    with SetBotoConfigForTest([('Credentials', 'gs_impersonate_service_account',
                                None)]):
      with SetEnvironmentForTest(
          {'CLOUDSDK_AUTH_IMPERSONATE_SERVICE_ACCOUNT': SERVICE_ACCOUNT}):
        stderr = self.RunGsUtil(['ls', 'gs://pub'], return_stderr=True)
        self.assertIn('using service account impersonation', stderr)

  @unittest.skipUnless(SERVICE_ACCOUNT,
                       'Test requires test_impersonate_service_account.')
  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  def testImpersonationCredentialsFromOptionOverridesOthers(self):
    with SetBotoConfigForTest([('Credentials', 'gs_impersonate_service_account',
                                'foo@google.com')]):
      with SetEnvironmentForTest(
          {'CLOUDSDK_AUTH_IMPERSONATE_SERVICE_ACCOUNT': 'bar@google.com'}):
        stderr = self.RunGsUtil(['-i', SERVICE_ACCOUNT, 'ls', 'gs://pub'],
                                return_stderr=True)
        self.assertIn('API calls will be executed as [%s' % SERVICE_ACCOUNT,
                      stderr)

  @unittest.skipUnless(SERVICE_ACCOUNT,
                       'Test requires test_impersonate_service_account.')
  @SkipForS3('Tests only uses gs credentials.')
  @SkipForXML('Tests only run on JSON API.')
  def testImpersonationSuccess(self):
    with SetBotoConfigForTest([('Credentials', 'gs_impersonate_service_account',
                                SERVICE_ACCOUNT)]):
      stderr = self.RunGsUtil(['ls', 'gs://pub'], return_stderr=True)
      self.assertIn('API calls will be executed as [%s' % SERVICE_ACCOUNT,
                    stderr)