HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/gsutil/gslib/tests/test_gcs_json_credentials.py
# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for gcs_json_credentials.py."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import GceAssertionCredentials
from google_reauth import reauth_creds
from gslib import gcs_json_api
from gslib import gcs_json_credentials
from gslib.cred_types import CredTypes
from gslib.exception import CommandException
from gslib.tests import testcase
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import unittest
from gslib.utils.wrapped_credentials import WrappedCredentials
import logging
from oauth2client.service_account import ServiceAccountCredentials
import pkgutil

from six import add_move, MovedModule

add_move(MovedModule("mock", "mock", "unittest.mock"))
from six.moves import mock

try:
  from OpenSSL.crypto import load_pkcs12
  HAS_OPENSSL = True
except ImportError:
  HAS_OPENSSL = False

try:
  import cryptography
  HAS_CRYPTO = True
except ImportError:
  HAS_CRYPTO = False

ERROR_MESSAGE = "This is the error message"


def getBotoCredentialsConfig(
    service_account_creds=None,
    user_account_creds=None,
    gce_creds=None,
    external_account_creds=None,
    external_account_authorized_user_creds=None,
):
  config = []
  if service_account_creds:
    config.append(("Credentials", "gs_service_key_file",
                   service_account_creds["keyfile"]))
    config.append(("Credentials", "gs_service_client_id",
                   service_account_creds["client_id"]))
  else:
    config.append(("Credentials", "gs_service_key_file", None))
  config.extend([("Credentials", "gs_oauth2_refresh_token", user_account_creds),
                 ("GoogleCompute", "service_account", gce_creds),
                 ("Credentials", "gs_external_account_file",
                  external_account_creds),
                 ("Credentials", "gs_external_account_authorized_user_file",
                  external_account_authorized_user_creds)])
  return config


class TestGcsJsonCredentials(testcase.GsUtilUnitTestCase):
  """Test logic for interacting with GCS JSON Credentials."""

  @mock.patch.object(gcs_json_credentials.P12Credentials,
                     "from_service_account_pkcs12_keystring",
                     return_value=gcs_json_credentials.P12Credentials(mock.Mock(), token_uri='123', service_account_email='123', scopes=['a', 'b']))
  def testOauth2ServiceAccountCredential(self, _):
    contents = pkgutil.get_data("gslib", "tests/test_data/test.p12")
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(service_account_creds={
            "keyfile": tmpfile,
            "client_id": "?",
        })):
      self.assertTrue(gcs_json_credentials._HasOauth2ServiceAccountCreds())
      client = gcs_json_api.GcsJsonApi(None, None, None, None)
      self.assertEqual(client.credentials.service_account_email, '123')
      self.assertIsInstance(client.credentials, gcs_json_credentials.P12Credentials)

  def testP12CredentialsthrowsErrorIfProvidedWithMissingFields(self):
    contents = pkgutil.get_data("gslib", "tests/test_data/test.p12")
    tmpfile = self.CreateTempFile(contents=contents)
    with self.assertRaises(Exception) as exc:
      gcs_json_credentials.CreateP12ServiceAccount(tmpfile)

  @unittest.skipUnless(HAS_CRYPTO, 'p12credentials requires cryptography.')
  @mock.patch.object(gcs_json_credentials.P12Credentials,
                     "__init__",
                     side_effect=ValueError(ERROR_MESSAGE))
  def testOauth2ServiceAccountFailure(self, _):
    contents = pkgutil.get_data("gslib", "tests/test_data/test.p12")
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(service_account_creds={
            "keyfile": tmpfile,
            "client_id": "?",
        })):
      with self.assertLogs() as logger:
        with self.assertRaises(Exception) as exc:
          gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None)
        self.assertIn(ERROR_MESSAGE, str(exc.exception))
        self.assertIn(CredTypes.OAUTH2_SERVICE_ACCOUNT, logger.output[0])

  def testOauth2UserCredential(self):
    with SetBotoConfigForTest(getBotoCredentialsConfig(user_account_creds="?")):
      self.assertTrue(gcs_json_credentials._HasOauth2UserAccountCreds())
      client = gcs_json_api.GcsJsonApi(None, None, None, None)
      self.assertIsInstance(client.credentials,
                            reauth_creds.Oauth2WithReauthCredentials)

  @mock.patch.object(reauth_creds.Oauth2WithReauthCredentials,
                     "__init__",
                     side_effect=ValueError(ERROR_MESSAGE))
  def testOauth2UserFailure(self, _):
    with SetBotoConfigForTest(getBotoCredentialsConfig(user_account_creds="?")):
      with self.assertLogs() as logger:
        with self.assertRaises(Exception) as exc:
          gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None)
        self.assertIn(ERROR_MESSAGE, str(exc.exception))
        self.assertIn(CredTypes.OAUTH2_USER_ACCOUNT, logger.output[0])

  @mock.patch.object(gcs_json_credentials.credentials_lib,
                     'GceAssertionCredentials',
                     autospec=True)
  def testGCECredential(self, mock_credentials):

    def set_store(store):
      mock_credentials.return_value.store = store

    mock_credentials.return_value.client_id = None
    mock_credentials.return_value.refresh_token = "rEfrEshtOkEn"
    mock_credentials.return_value.set_store = set_store
    with SetBotoConfigForTest(getBotoCredentialsConfig(gce_creds="?")):
      self.assertTrue(gcs_json_credentials._HasGceCreds())
      client = gcs_json_api.GcsJsonApi(None, None, None, None)
      self.assertIsInstance(client.credentials, GceAssertionCredentials)
      self.assertEqual(client.credentials.refresh_token, "rEfrEshtOkEn")
      self.assertIs(client.credentials.client_id, None)

  @mock.patch.object(GceAssertionCredentials,
                     "__init__",
                     side_effect=ValueError(ERROR_MESSAGE))
  def testGCECredentialFailure(self, _):
    with SetBotoConfigForTest(getBotoCredentialsConfig(gce_creds="?")):
      with self.assertLogs() as logger:
        with self.assertRaises(Exception) as exc:
          gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None)
        self.assertIn(ERROR_MESSAGE, str(exc.exception))
        self.assertIn(CredTypes.GCE, logger.output[0])

  def testExternalAccountCredential(self):
    contents = pkgutil.get_data(
        "gslib", "tests/test_data/test_external_account_credentials.json")
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(external_account_creds=tmpfile)):
      client = gcs_json_api.GcsJsonApi(None, None, None, None)
      self.assertIsInstance(client.credentials, WrappedCredentials)

  @mock.patch.object(WrappedCredentials,
                     "__init__",
                     side_effect=ValueError(ERROR_MESSAGE))
  def testExternalAccountFailure(self, _):
    contents = pkgutil.get_data(
        "gslib", "tests/test_data/test_external_account_credentials.json")
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(external_account_creds=tmpfile)):
      with self.assertLogs() as logger:
        with self.assertRaises(Exception) as exc:
          gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None)
        self.assertIn(ERROR_MESSAGE, str(exc.exception))
        self.assertIn(CredTypes.EXTERNAL_ACCOUNT, logger.output[0])

  def testExternalAccountAuthorizedUserCredential(self):
    contents = pkgutil.get_data(
        "gslib",
        "tests/test_data/test_external_account_authorized_user_credentials.json"
    )
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(
            external_account_authorized_user_creds=tmpfile)):
      client = gcs_json_api.GcsJsonApi(None, None, None, None)
      self.assertIsInstance(client.credentials, WrappedCredentials)

  @mock.patch.object(WrappedCredentials,
                     "__init__",
                     side_effect=ValueError(ERROR_MESSAGE))
  def testExternalAccountAuthorizedUserFailure(self, _):
    contents = pkgutil.get_data(
        "gslib",
        "tests/test_data/test_external_account_authorized_user_credentials.json"
    )
    tmpfile = self.CreateTempFile(contents=contents)
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(
            external_account_authorized_user_creds=tmpfile)):
      with self.assertLogs() as logger:
        with self.assertRaises(Exception) as exc:
          gcs_json_api.GcsJsonApi(None, logging.getLogger(), None, None)
        self.assertIn(ERROR_MESSAGE, str(exc.exception))
        self.assertIn(CredTypes.EXTERNAL_ACCOUNT_AUTHORIZED_USER,
                      logger.output[0])

  def testOauth2ServiceAccountAndOauth2UserCredential(self):
    with SetBotoConfigForTest(
        getBotoCredentialsConfig(user_account_creds="?",
                                 service_account_creds={
                                     "keyfile": "?",
                                     "client_id": "?",
                                 })):
      with self.assertRaises(CommandException):
        gcs_json_api.GcsJsonApi(None, None, None, None)