HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/platform/gsutil/gslib/tests/test_wrapped_credentials.py
# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for wrapped_credentials.py."""

import datetime
import json
import httplib2

from google.auth import aws
from google.auth import external_account
from google.auth import external_account_authorized_user
from google.auth import identity_pool
from google.auth import pluggable
from gslib.tests import testcase
from gslib.utils.wrapped_credentials import WrappedCredentials
import oauth2client

from six import add_move, MovedModule

add_move(MovedModule("mock", "mock", "unittest.mock"))
from six.moves import mock

ACCESS_TOKEN = "foo"
CONTENT = "content"
RESPONSE = httplib2.Response({
    "content-type": "text/plain",
    "status": "200",
    "content-length": len(CONTENT),
})


class MockCredentials(external_account.Credentials):

  def __init__(self, token=None, expiry=None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._audience = None
    self.expiry = expiry
    self.token = None

    def side_effect(*args, **kwargs):
      del args, kwargs  # Unused.
      self.token = token

    self.refresh = mock.Mock(side_effect=side_effect)

  def retrieve_subject_token():
    pass


class HeadersWithAuth(dict):
  """A utility class to use to make sure a set of headers includes specific authentication"""

  def __init__(self, token):
    self.token = token or ""

  def __eq__(self, headers):
    return headers[b"Authorization"] == bytes("Bearer " + self.token, "utf-8")


class TestWrappedCredentials(testcase.GsUtilUnitTestCase):
  """Test logic for interacting with Wrapped Credentials the way we intend to use them."""

  @mock.patch.object(httplib2, "Http", autospec=True)
  def testWrappedCredentialUsage(self, http):
    http.return_value.request.return_value = (RESPONSE, CONTENT)
    req = http.return_value.request

    creds = WrappedCredentials(
        MockCredentials(token=ACCESS_TOKEN,
                        audience="foo",
                        subject_token_type="bar",
                        token_url="https://sts.googleapis.com",
                        credential_source="qux"))

    http = oauth2client.transport.get_http_object()
    creds.authorize(http)
    _, content = http.request(uri="google.com")
    self.assertEqual(content, CONTENT)
    creds._base.refresh.assert_called_once_with(mock.ANY)

    # Make sure the default request gets called with the correct token.
    req.assert_called_once_with("google.com",
                                method="GET",
                                headers=HeadersWithAuth(ACCESS_TOKEN),
                                body=None,
                                connection_type=mock.ANY,
                                redirections=mock.ANY)

  def testWrappedCredentialSerialization(self):
    """Test logic for converting Wrapped Credentials to and from JSON for serialization."""
    creds = WrappedCredentials(
        identity_pool.Credentials(audience="foo",
                                  subject_token_type="bar",
                                  token_url="https://sts.googleapis.com",
                                  credential_source={"url": "google.com"}))
    creds.access_token = ACCESS_TOKEN
    creds.token_expiry = datetime.datetime(2001, 12, 5, 0, 0)
    creds_json = creds.to_json()
    json_values = json.loads(creds_json)
    self.assertEqual(json_values["client_id"], "foo")
    self.assertEqual(json_values['access_token'], ACCESS_TOKEN)
    self.assertEqual(json_values['token_expiry'], "2001-12-05T00:00:00Z")
    self.assertEqual(json_values["_base"]["audience"], "foo")
    self.assertEqual(json_values["_base"]["subject_token_type"], "bar")
    self.assertEqual(json_values["_base"]["token_url"],
                     "https://sts.googleapis.com")
    self.assertEqual(json_values["_base"]["credential_source"]["url"],
                     "google.com")

    creds2 = WrappedCredentials.from_json(creds_json)
    self.assertIsInstance(creds2, WrappedCredentials)
    self.assertIsInstance(creds2._base, identity_pool.Credentials)
    self.assertEqual(creds2.client_id, "foo")
    self.assertEqual(creds2.access_token, ACCESS_TOKEN)
    self.assertEqual(creds2.token_expiry, creds.token_expiry)

  def testWrappedCredentialSerializationMissingKeywords(self):
    """Test logic for creating a Wrapped Credentials using keywords that exist in IdentityPool but not AWS."""
    creds = WrappedCredentials.from_json(
        json.dumps({
            "client_id": "foo",
            "access_token": ACCESS_TOKEN,
            "token_expiry": "2001-12-05T00:00:00Z",
            "_base": {
                "type": "external_account",
                "audience": "foo",
                "subject_token_type": "bar",
                "token_url": "https://sts.googleapis.com",
                "credential_source": {
                    "url": "google.com",
                    "workforce_pool_user_project": "1234567890"
                }
            }
        }))

    self.assertIsInstance(creds, WrappedCredentials)
    self.assertIsInstance(creds._base, identity_pool.Credentials)

  @mock.patch.object(httplib2, "Http", autospec=True)
  def testWrappedCredentialUsageExternalAccountAuthorizedUser(self, http):
    http.return_value.request.return_value = (RESPONSE, CONTENT)
    req = http.return_value.request

    creds = WrappedCredentials(
        external_account_authorized_user.Credentials(
            audience=
            "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
            refresh_token="refreshToken",
            token_url="https://sts.googleapis.com/v1/oauth/token",
            token_info_url="https://sts.googleapis.com/v1/instrospect",
            client_id="clientId",
            client_secret="clientSecret"))

    def _refresh_token_side_effect(*args, **kwargs):
      del args, kwargs  # Unused.
      creds._base.token = ACCESS_TOKEN

    creds._base.refresh = mock.Mock(side_effect=_refresh_token_side_effect)

    http = oauth2client.transport.get_http_object()
    creds.authorize(http)
    _, content = http.request(uri="google.com")
    self.assertEqual(content, CONTENT)
    creds._base.refresh.assert_called_once_with(mock.ANY)

    # Make sure the default request gets called with the correct token.
    req.assert_called_once_with("google.com",
                                method="GET",
                                headers=HeadersWithAuth(ACCESS_TOKEN),
                                body=None,
                                connection_type=mock.ANY,
                                redirections=mock.ANY)

  def testWrappedCredentialSerializationExternalAccountAuthorizedUser(self):
    """Test logic for converting Wrapped Credentials to and from JSON for serialization."""
    creds = WrappedCredentials(
        external_account_authorized_user.Credentials(
            audience=
            "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
            refresh_token="refreshToken",
            token_url="https://sts.googleapis.com/v1/oauth/token",
            token_info_url="https://sts.googleapis.com/v1/instrospect",
            client_id="clientId",
            client_secret="clientSecret"))
    creds.access_token = ACCESS_TOKEN
    creds.token_expiry = datetime.datetime(2001, 12, 5, 0, 0)
    creds_json = creds.to_json()
    json_values = json.loads(creds_json)
    expected_json_values = {
        "_class": "WrappedCredentials",
        "_module": "gslib.utils.wrapped_credentials",
        "client_id": "clientId",
        "access_token": ACCESS_TOKEN,
        "token_expiry": "2001-12-05T00:00:00Z",
        "client_secret": "clientSecret",
        "refresh_token": "refreshToken",
        "id_token": None,
        "id_token_jwt": None,
        "invalid": False,
        "revoke_uri": None,
        "scopes": [],
        "token_info_uri": None,
        "token_response": None,
        "token_uri": None,
        "user_agent": None,
        "_base": {
            "type":
                "external_account_authorized_user",
            "audience":
                "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
            "token":
                ACCESS_TOKEN,
            "expiry":
                "2001-12-05T00:00:00Z",
            "token_url":
                "https://sts.googleapis.com/v1/oauth/token",
            "token_info_url":
                "https://sts.googleapis.com/v1/instrospect",
            "refresh_token":
                "refreshToken",
            "client_id":
                "clientId",
            "client_secret":
                "clientSecret",
            "universe_domain":
                "googleapis.com",
        }
    }
    self.assertEqual(json_values, expected_json_values)

    creds2 = WrappedCredentials.from_json(creds_json)
    self.assertIsInstance(creds2, WrappedCredentials)
    self.assertIsInstance(creds2._base,
                          external_account_authorized_user.Credentials)
    self.assertEqual(creds2.client_id, "clientId")

  def testFromJsonAWSCredentials(self):
    creds = WrappedCredentials.from_json(
        json.dumps({
            "_base": {
                "audience":
                    "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
                "credential_source": {
                    "environment_id":
                        "aws1",
                    "region_url":
                        "http://169.254.169.254/latest/meta-data/placement/availability-zone",
                    "regional_cred_verification_url":
                        "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
                    "url":
                        "http://169.254.169.254/latest/meta-data/iam/security-credentials"
                },
                "service_account_impersonation_url":
                    "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-1234@service-name.iam.gserviceaccount.com:generateAccessToken",
                "subject_token_type":
                    "urn:ietf:params:aws:token-type:aws4_request",
                "token_url":
                    "https://sts.googleapis.com/v1/token",
                "type":
                    "external_account"
            }
        }))

    self.assertIsInstance(creds, WrappedCredentials)
    self.assertIsInstance(creds._base, external_account.Credentials)
    self.assertIsInstance(creds._base, aws.Credentials)

  def testFromJsonFileBasedCredentials(self):
    creds = WrappedCredentials.from_json(
        json.dumps({
            "_base": {
                "audience":
                    "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
                "credential_source": {
                    "file": "/var/run/secrets/goog.id/token"
                },
                "service_account_impersonation_url":
                    "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-1234@service-name.iam.gserviceaccount.com:generateAccessToken",
                "subject_token_type":
                    "urn:ietf:params:oauth:token-type:jwt",
                "token_url":
                    "https://sts.googleapis.com/v1/token",
                "type":
                    "external_account"
            }
        }))

    self.assertIsInstance(creds, WrappedCredentials)
    self.assertIsInstance(creds._base, external_account.Credentials)
    self.assertIsInstance(creds._base, identity_pool.Credentials)

  def testFromJsonPluggableCredentials(self):
    creds = WrappedCredentials.from_json(
        json.dumps({
            "_base": {
                "audience":
                    "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
                "credential_source": {
                    "executable": {
                        "command": "/path/to/command.sh"
                    }
                },
                "service_account_impersonation_url":
                    "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-1234@service-name.iam.gserviceaccount.com:generateAccessToken",
                "subject_token_type":
                    "urn:ietf:params:oauth:token-type:jwt",
                "token_url":
                    "https://sts.googleapis.com/v1/token",
                "type":
                    "external_account"
            }
        }))

    self.assertIsInstance(creds, WrappedCredentials)
    self.assertIsInstance(creds._base, external_account.Credentials)
    self.assertIsInstance(creds._base, pluggable.Credentials)

  def testFromJsonExternalAccountAuthorizedUserCredentials(self):
    creds = WrappedCredentials.from_json(
        json.dumps({
            "_base": {
                "type":
                    "external_account_authorized_user",
                "audience":
                    "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
                "refresh_token":
                    "refreshToken",
                "token_url":
                    "https://sts.googleapis.com/v1/oauth/token",
                "token_info_url":
                    "https://sts.googleapis.com/v1/instrospect",
                "client_id":
                    "clientId",
                "client_secret":
                    "clientSecret",
            }
        }))

    self.assertIsInstance(creds, WrappedCredentials)
    self.assertIsInstance(creds._base,
                          external_account_authorized_user.Credentials)