HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/platform/gsutil/gslib/tests/test_context_config.py
# -*- coding: utf-8 -*-
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for context_config.py."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import json
import os
import subprocess
from unittest import mock

import six

from gslib import context_config
from gslib import exception
from gslib.tests import testcase
from gslib.tests.testcase import base
from gslib.tests.util import SetBotoConfigForTest

# Stand-ins for the parsed JSON content of the default certificate provider
# metadata file. The tests below feed these to a mocked json.load.

# Provider command whose executable path contains no spaces.
DEFAULT_CERT_PROVIDER_FILE_CONTENTS = dict(
    cert_provider_command=[
        os.path.join('some', 'helper'),
        '--print_certificate',
    ])

# Provider command whose executable name contains a space.
DEFAULT_CERT_PROVIDER_FILE_CONTENTS_WITH_SPACE = dict(
    cert_provider_command=['some helper', '--print_certificate'])

# Metadata content that lacks the 'cert_provider_command' key entirely.
DEFAULT_CERT_PROVIDER_FILE_NO_COMMAND = dict(foo='foo')

# PEM-style fixtures. The payload lines are deliberately not valid base64;
# only the BEGIN/END markers matter to the tests. Each section is written as
# a tuple of lines joined by newlines; the trailing '' element produces the
# final newline of the section.
CERT_SECTION = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END CERTIFICATE-----',
    '',
))
ENCRYPTED_KEY_SECTION = '\n'.join((
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '',
))
KEY_SECTION = '\n'.join((
    '-----BEGIN PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END PRIVATE KEY-----',
    '',
))
CERT_ENCRYPTED_KEY_SECTION = ''.join((CERT_SECTION, ENCRYPTED_KEY_SECTION))
PASSWORD = '##invalid-password##'
# The passphrase section starts with an empty line, hence the leading ''.
PASSWORD_SECTION = '\n'.join((
    '',
    '-----BEGIN PASSPHRASE-----',
    PASSWORD,
    '-----END PASSPHRASE-----',
    '',
))
# Certificate + encrypted key + passphrase.
FULL_ENCRYPTED_CERT = ''.join((CERT_ENCRYPTED_KEY_SECTION, PASSWORD_SECTION))
# Certificate + unencrypted key, no passphrase.
FULL_CERT = ''.join((CERT_SECTION, KEY_SECTION))

# Malformed PEM fixtures. Each one is a tuple of lines joined by newlines,
# with a trailing '' element supplying the final newline.

# An encrypted-key section nested inside the certificate section.
BAD_CERT_KEY_EMBEDDED_SECTION = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '-----END CERTIFICATE-----',
    '',
))
# The encrypted-key section has no END marker.
BAD_CERT_KEY_MISSING_END = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END CERTIFICATE-----',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '',
))
# The certificate section has an END marker but no BEGIN marker.
BAD_CERT_KEY_MISSING_BEGIN = '\n'.join((
    '-----END CERTIFICATE-----',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '',
))
# The BEGIN and END markers name different section types.
BAD_CERT_KEY_MISMATCH = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '',
))

# Well-formed certificate + encrypted key with a stray text line placed
# before, after, or between the two sections.
CERT_KEY_WITH_COMMENT_AT_BEGIN = '\n'.join((
    'SOMECOMMENTS',
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END CERTIFICATE-----',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '',
))
CERT_KEY_WITH_COMMENT_AT_END = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END CERTIFICATE-----',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    'SOMECOMMENT',
    '',
))
CERT_KEY_WITH_COMMENT_IN_BETWEEN = '\n'.join((
    '-----BEGIN CERTIFICATE-----',
    'LKJHLSDJKFHLEUIORWUYERWEHJHL',
    'KLJHGFDLSJKH(@#*&$)@*#KJHFLKJDSFSD',
    '-----END CERTIFICATE-----',
    'SOMECOMMENT',
    '-----BEGIN ENCRYPTED PRIVATE KEY-----',
    'LKJWE:RUWEORIU)(#*&$@(#$KJHLKDJHF(I*F@YLFHSLDKJFS',
    '-----END ENCRYPTED PRIVATE KEY-----',
    '',
))

# Dotted path of the builtin open() to hand to mock.patch; the module that
# holds the builtins is named differently on Python 2 and Python 3.
if six.PY2:
  OPEN_TO_PATCH = '__builtin__.open'
else:
  OPEN_TO_PATCH = 'builtins.open'


@testcase.integration_testcase.SkipForS3('mTLS only runs on GCS JSON API.')
@testcase.integration_testcase.SkipForXML('mTLS only runs on GCS JSON API.')
class TestPemFileParser(testcase.GsUtilUnitTestCase):
  """Test splitting PEM-format text into named sections for mTLS."""

  def _split_fixture(self, pem_text):
    """Runs _split_pem_into_sections on pem_text with the test logger.

    Args:
      pem_text: One of the PEM fixture strings defined in this module.

    Returns:
      Whatever context_config._split_pem_into_sections returns; the tests
      index it by section name.
    """
    return context_config._split_pem_into_sections(pem_text, self.logger)

  def test_pem_file_with_comment_at_beginning(self):
    """A stray line before the first section leaves both sections intact."""
    parsed = self._split_fixture(CERT_KEY_WITH_COMMENT_AT_BEGIN)
    self.assertEqual(parsed['CERTIFICATE'], CERT_SECTION)
    self.assertEqual(parsed['ENCRYPTED PRIVATE KEY'], ENCRYPTED_KEY_SECTION)

  def test_pem_file_with_comment_at_end(self):
    """A stray line after the last section leaves both sections intact."""
    parsed = self._split_fixture(CERT_KEY_WITH_COMMENT_AT_END)
    self.assertEqual(parsed['CERTIFICATE'], CERT_SECTION)
    self.assertEqual(parsed['ENCRYPTED PRIVATE KEY'], ENCRYPTED_KEY_SECTION)

  def test_pem_file_with_comment_in_between(self):
    """A stray line between the sections leaves both sections intact."""
    parsed = self._split_fixture(CERT_KEY_WITH_COMMENT_IN_BETWEEN)
    self.assertEqual(parsed['CERTIFICATE'], CERT_SECTION)
    self.assertEqual(parsed['ENCRYPTED PRIVATE KEY'], ENCRYPTED_KEY_SECTION)

  def test_pem_file_with_bad_format_embedded_section(self):
    """With a nested key section, only the inner key section is returned."""
    parsed = self._split_fixture(BAD_CERT_KEY_EMBEDDED_SECTION)
    self.assertIsNone(parsed.get('CERTIFICATE'))
    self.assertEqual(parsed.get('ENCRYPTED PRIVATE KEY'),
                     ENCRYPTED_KEY_SECTION)

  def test_pem_file_with_bad_format_missing_ending(self):
    """A key section without an END marker is absent from the result."""
    parsed = self._split_fixture(BAD_CERT_KEY_MISSING_END)
    self.assertEqual(parsed.get('CERTIFICATE'), CERT_SECTION)
    self.assertIsNone(parsed.get('ENCRYPTED PRIVATE KEY'))

  def test_pem_file_with_bad_format_missing_beginning(self):
    """A certificate END marker with no BEGIN yields no certificate."""
    parsed = self._split_fixture(BAD_CERT_KEY_MISSING_BEGIN)
    self.assertIsNone(parsed.get('CERTIFICATE'))
    self.assertEqual(parsed.get('ENCRYPTED PRIVATE KEY'),
                     ENCRYPTED_KEY_SECTION)

  def test_pem_file_with_bad_format_section_mismatch(self):
    """Mismatched BEGIN/END marker names yield neither section."""
    parsed = self._split_fixture(BAD_CERT_KEY_MISMATCH)
    self.assertIsNone(parsed.get('CERTIFICATE'))
    self.assertIsNone(parsed.get('ENCRYPTED PRIVATE KEY'))


# Setting global context_config singleton causes issues in parallel.
@base.NotParallelizable
@testcase.integration_testcase.SkipForS3('mTLS only runs on GCS JSON API.')
@testcase.integration_testcase.SkipForXML('mTLS only runs on GCS JSON API.')
class TestContextConfig(testcase.GsUtilUnitTestCase):
  """Test the global ContextConfig singleton.

  Each test starts with context_config._singleton_config reset to None (see
  setUp); the value that was there before is restored in tearDown.
  """

  def setUp(self):
    """Clears the module-level singleton and creates a mock logger."""
    super(TestContextConfig, self).setUp()
    self._old_context_config = context_config._singleton_config
    context_config._singleton_config = None

    self.mock_logger = mock.Mock()

  def tearDown(self):
    """Restores the singleton that was in place before the test ran."""
    super(TestContextConfig, self).tearDown()
    context_config._singleton_config = self._old_context_config

  def test_context_config_is_a_singleton(self):
    """A second create call raises; get returns the first instance."""
    first = context_config.create_context_config(self.mock_logger)

    with self.assertRaises(
        context_config.ContextConfigSingletonAlreadyExistsError):
      context_config.create_context_config(self.mock_logger)

    second = context_config.get_context_config()
    self.assertEqual(first, second)

  @mock.patch.object(subprocess, 'Popen')
  def test_does_not_provision_if_use_client_certificate_not_true(
      self, mock_Popen):
    """No subprocess is started when use_client_certificate is not set."""
    context_config.create_context_config(self.mock_logger)
    mock_Popen.assert_not_called()

  @mock.patch('os.path.exists', new=mock.Mock(return_value=True))
  @mock.patch.object(json, 'load', autospec=True)
  @mock.patch.object(subprocess, 'Popen', autospec=True)
  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  def test_executes_provider_command_from_default_file(self, mock_open,
                                                       mock_Popen,
                                                       mock_json_load):
    """Creating the config with the default metadata raises ValueError."""
    mock_json_load.side_effect = [DEFAULT_CERT_PROVIDER_FILE_CONTENTS]
    with SetBotoConfigForTest([('Credentials', 'use_client_certificate', 'True')
                              ]):
      # Purposely end execution here to avoid writing a file.
      with self.assertRaises(ValueError):
        context_config.create_context_config(self.mock_logger)

        # NOTE(review): the two assertions below never run in a passing test.
        # They follow the call that must raise ValueError inside the
        # assertRaises block, so control leaves the block before reaching
        # them. Confirm the expected arguments against context_config before
        # moving them out of the block.
        mock_open.assert_called_with(context_config._DEFAULT_METADATA_PATH)
        mock_Popen.assert_called_once_with(
            os.path.realpath(os.path.join('some', 'helper')),
            '--print_certificate', '--with_passphrase')

  @mock.patch('os.path.exists', new=mock.Mock(return_value=True))
  @mock.patch.object(json, 'load', autospec=True)
  @mock.patch.object(subprocess, 'Popen', autospec=True)
  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  def test_executes_provider_command_with_space_from_default_file(
      self, mock_open, mock_Popen, mock_json_load):
    """Same as above, with a provider command that contains a space."""
    mock_json_load.side_effect = [
        DEFAULT_CERT_PROVIDER_FILE_CONTENTS_WITH_SPACE
    ]
    with SetBotoConfigForTest([('Credentials', 'use_client_certificate',
                                'True'),
                               ('Credentials', 'cert_provider_command', None)]):
      # Purposely end execution here to avoid writing a file.
      with self.assertRaises(ValueError):
        context_config.create_context_config(self.mock_logger)

        # NOTE(review): never runs in a passing test (see the comment in
        # test_executes_provider_command_from_default_file). Also note the
        # expected path is 'cert helper' while the fixture command is
        # 'some helper'; verify before making these assertions live.
        mock_open.assert_called_with(context_config._DEFAULT_METADATA_PATH)
        mock_Popen.assert_called_once_with(os.path.realpath('cert helper'),
                                           '--print_certificate',
                                           '--with_passphrase')

  @mock.patch('os.path.exists', new=mock.Mock(return_value=True))
  @mock.patch.object(json, 'load', autospec=True)
  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  def test_default_provider_no_command_error(self, mock_open, mock_json_load):
    """Metadata without a provider command is logged as an error."""
    mock_json_load.return_value = DEFAULT_CERT_PROVIDER_FILE_NO_COMMAND

    with SetBotoConfigForTest([('Credentials', 'use_client_certificate',
                                'True'),
                               ('Credentials', 'cert_provider_command', None)]):
      context_config.create_context_config(self.mock_logger)

      mock_open.assert_called_with(context_config._DEFAULT_METADATA_PATH)
      self.mock_logger.error.assert_called_once_with(
          "Failed to provision client certificate: "
          "Client certificate provider command not found.")

  @mock.patch('os.path.exists', new=mock.Mock(return_value=False))
  def test_default_provider_not_found_error(self):
    """A missing metadata file (os.path.exists is False) is logged."""
    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', None),
        # Avoids permissions error on Windows tests:
        ('GSUtil', 'state_dir', self.CreateTempDir())
    ]):
      context_config.create_context_config(self.mock_logger)

      self.mock_logger.error.assert_called_once_with(
          "Failed to provision client certificate: "
          "Client certificate provider file not found.")

  @mock.patch.object(json, 'load', autospec=True)
  @mock.patch('os.path.exists', new=mock.Mock(return_value=True))
  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  def test_raises_cert_provision_error_on_json_load_error(
      self, mock_open, mock_json_load):
    """A ValueError from json.load is logged rather than propagated."""
    mock_json_load.side_effect = ValueError('valueError')
    with SetBotoConfigForTest([('Credentials', 'use_client_certificate',
                                'True'),
                               ('Credentials', 'cert_provider_command', None)]):
      context_config.create_context_config(self.mock_logger)
      mock_open.assert_called_with(context_config._DEFAULT_METADATA_PATH)
      self.mock_logger.error.assert_called_once_with(
          'Failed to provision client certificate: valueError')

  @mock.patch.object(subprocess, 'Popen', autospec=True)
  def test_executes_custom_provider_command_from_boto_config(self, mock_Popen):
    """Creating the config with a boto-configured command raises ValueError."""
    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'some/path')
    ]):
      # Purposely end execution here to avoid writing a file.
      with self.assertRaises(ValueError):
        context_config.create_context_config(self.mock_logger)

        # NOTE(review): never runs in a passing test; it follows the call
        # that must raise ValueError inside the assertRaises block. Confirm
        # the expected arguments before moving it out of the block.
        mock_Popen.assert_called_once_with(os.path.realpath('some/path'),
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)

  @mock.patch.object(subprocess, 'Popen')
  def test_converts_and_logs_provisioning_cert_provider_unexpected_exit_error(
      self, mock_Popen):
    """The second element returned by communicate() is logged as the error."""
    mock_command_process = mock.Mock()
    mock_command_process.communicate.return_value = (None, 'oh no')
    mock_Popen.return_value = mock_command_process

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'some/path')
    ]):
      context_config.create_context_config(self.mock_logger)
      self.mock_logger.error.assert_called_once_with(
          'Failed to provision client certificate: oh no')

  @mock.patch.object(subprocess, 'Popen')
  def test_converts_and_logs_provisioning_os_error(self, mock_Popen):
    """An OSError raised by Popen is logged rather than propagated."""
    mock_Popen.side_effect = OSError('foobar')

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'some/path')
    ]):
      context_config.create_context_config(self.mock_logger)
      self.mock_logger.error.assert_called_once_with(
          'Failed to provision client certificate: foobar')

  @mock.patch.object(subprocess, 'Popen')
  def test_converts_and_logs_provisioning_external_binary_error(
      self, mock_Popen):
    """An ExternalBinaryError raised by Popen is logged, not propagated."""
    mock_Popen.side_effect = exception.ExternalBinaryError('foobar')

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'some/path')
    ]):
      context_config.create_context_config(self.mock_logger)
      self.mock_logger.error.assert_called_once_with(
          'Failed to provision client certificate: foobar')

  @mock.patch.object(subprocess, 'Popen')
  def test_converts_and_logs_provisioning_key_error(self, mock_Popen):
    """A KeyError during provisioning is logged as invalid provider output."""
    # Mocking f.write would make more sense, but mocking Popen earlier in the
    # function results in much less code and tests the same error handling.
    mock_Popen.side_effect = KeyError('foobar')

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'some/path')
    ]):
      context_config.create_context_config(self.mock_logger)

      # The logged key text carries a u'' prefix on Python 2 only.
      unicode_escaped_error_string = "'foobar'" if six.PY3 else "u'foobar'"
      self.mock_logger.error.assert_called_once_with(
          "Failed to provision client certificate:"
          " Invalid output format from certificate provider, no " +
          unicode_escaped_error_string)

  @mock.patch.object(os, 'remove')
  def test_does_not_unprovision_if_no_client_certificate(self, mock_remove):
    """Unprovisioning removes nothing when no certificate was provisioned."""
    context_config.create_context_config(self.mock_logger)
    context_config._singleton_config._unprovision_client_cert()
    mock_remove.assert_not_called()

  @mock.patch.object(os, 'remove')
  def test_handles_and_logs_unprovisioning_os_error(self, mock_remove):
    """An OSError from os.remove during unprovisioning is logged."""
    mock_remove.side_effect = OSError('no')

    context_config.create_context_config(self.mock_logger)
    context_config._singleton_config.client_cert_path = 'some/path'
    context_config._singleton_config._unprovision_client_cert()

    self.mock_logger.error.assert_called_once_with(
        'Failed to remove client certificate: no')

  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  @mock.patch.object(os, 'remove')
  @mock.patch.object(subprocess, 'Popen')
  def test_writes_and_deletes_encrypted_certificate_file_storing_password_to_memory(
      self, mock_Popen, mock_remove, mock_open):
    """Encrypted provider output is written, its password kept, then removed."""
    mock_command_process = mock.Mock()
    mock_command_process.returncode = 0
    mock_command_process.communicate.return_value = (
        FULL_ENCRYPTED_CERT.encode(), None)
    mock_Popen.return_value = mock_command_process

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'path --print_certificate')
    ]):
      # Mock logger argument to avoid atexit hook writing to stderr.
      test_config = context_config.create_context_config(mock.Mock())

      # Test writes certificate file.
      # Can't check whole mock_calls list because SetBotoConfigForTest also
      # uses the mock in Python 3. Should work with any_order=False based on
      # docs description but does not in current environment.
      mock_open.assert_has_calls([
          mock.call(test_config.client_cert_path, 'w+'),
          mock.call().write(CERT_SECTION),
          mock.call().write(ENCRYPTED_KEY_SECTION),
      ],
                                 any_order=True)
      # Test saves certificate password to memory.
      self.assertEqual(context_config._singleton_config.client_cert_password,
                       PASSWORD)
      # Test deletes certificate file.
      context_config._singleton_config._unprovision_client_cert()
      mock_remove.assert_called_once_with(test_config.client_cert_path)

  @mock.patch(OPEN_TO_PATCH, new_callable=mock.mock_open)
  @mock.patch.object(os, 'remove')
  @mock.patch.object(subprocess, 'Popen')
  def test_writes_and_deletes_unencrypted_certificate_file_without_storing_password(
      self, mock_Popen, mock_remove, mock_open):
    """This is the format used by gcloud by default."""
    mock_command_process = mock.Mock()
    mock_command_process.returncode = 0
    mock_command_process.communicate.return_value = (FULL_CERT.encode(), None)
    mock_Popen.return_value = mock_command_process

    with SetBotoConfigForTest([
        ('Credentials', 'use_client_certificate', 'True'),
        ('Credentials', 'cert_provider_command', 'path --print_certificate')
    ]):
      # Mock logger argument to avoid atexit hook writing to stderr.
      test_config = context_config.create_context_config(mock.Mock())

      # Test writes certificate file.
      # Can't check whole mock_calls list because SetBotoConfigForTest also
      # uses the mock in Python 3. Should work with any_order=False based on
      # docs description but does not in current environment.
      mock_open.assert_has_calls([
          mock.call(test_config.client_cert_path, 'w+'),
          mock.call().write(CERT_SECTION),
          mock.call().write(KEY_SECTION),
      ],
                                 any_order=True)
      # Does not save unnecessary password. assertIsNone takes a single value;
      # a second positional argument would only be used as the failure
      # message, so PASSWORD is intentionally not passed here.
      self.assertIsNone(context_config._singleton_config.client_cert_password)
      # Test deletes certificate file.
      context_config._singleton_config._unprovision_client_cert()
      mock_remove.assert_called_once_with(test_config.client_cert_path)