HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/iam/byoid_utilities/cred_config.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generators for Credential Config Files."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import enum
import json

from googlecloudsdk.command_lib.auth import enterprise_certificate_config
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
import six


class ConfigType(enum.Enum):
  WORKLOAD_IDENTITY_POOLS = 1
  WORKFORCE_POOLS = 2


class ByoidEndpoints(object):
  """Base class for BYOID endpoints."""

  def __init__(
      self,
      service,
      enable_mtls=False,
      universe_domain='googleapis.com',
      sts_location='',
  ):
    # TODO: b/444042857 - Remove this check and add support for mTLS with
    # locational STS endpoints when it is GA-ed.
    if enable_mtls and sts_location and sts_location != 'global':
      raise GeneratorError(
          'mTLS is not supported with locational Security Token Service'
          ' endpoints.'
      )
    self._sts_global_template = 'https://{service}.{mtls}{universe}'
    self._sts_locational_template = (
        'https://{service}.{sts_location}.rep.{universe}'
    )
    self._service = service
    self._mtls = 'mtls.' if enable_mtls else ''
    self._universe_domain = universe_domain
    self._sts_location = sts_location

  @property
  def _base_url(self):
    if not self._sts_location or self._sts_location == 'global':
      return self._sts_global_template.format(
          service=self._service, mtls=self._mtls, universe=self._universe_domain
      )
    return self._sts_locational_template.format(
        service=self._service,
        sts_location=self._sts_location,
        universe=self._universe_domain,
    )


class StsEndpoints(ByoidEndpoints):
  """Simple class to build STS endpoints."""

  def __init__(self, **kwargs):
    super(StsEndpoints, self).__init__('sts', **kwargs)

  @property
  def token_url(self):
    api = 'v1/token'
    return '{}/{}'.format(self._base_url, api)

  @property
  def oauth_token_url(self):
    api = 'v1/oauthtoken'
    return '{}/{}'.format(self._base_url, api)

  @property
  def token_info_url(self):
    api = 'v1/introspect'
    return '{}/{}'.format(self._base_url, api)


class IamEndpoints(ByoidEndpoints):
  """Simple class to build IAM Credential endpoints."""

  def __init__(self, service_account, **kwargs):
    self._service_account = service_account
    super(IamEndpoints, self).__init__('iamcredentials', **kwargs)

  @property
  def impersonation_url(self):
    api = 'v1/projects/-/serviceAccounts/{}:generateAccessToken'.format(
        self._service_account
    )
    return '{}/{}'.format(self._base_url, api)


RESOURCE_TYPE = 'credential configuration file'


def create_credential_config(args, config_type):
  """Creates the byoid credential config based on CLI arguments."""
  # If a certificate path was provided, enable mtls by default.
  is_cert = getattr(args, 'credential_cert_path', None) is not None
  enable_mtls = getattr(args, 'enable_mtls', False)

  sts_location = getattr(args, 'sts_location', '')

  # If a certificate path was provided, mtls must be enabled.
  if is_cert:
    if not enable_mtls and hasattr(args, 'enable_mtls'):
      raise GeneratorError(
          'Cannot disable mTLS when a certificate path is provided.'
      )
    enable_mtls = True

  # Take universe_domain into account.
  universe_domain_property = properties.VALUES.core.universe_domain
  if getattr(args, 'universe_domain', None):
    # Universe_domain arg takes precedence.
    universe_domain = args.universe_domain
  elif universe_domain_property.IsExplicitlySet():
    universe_domain = universe_domain_property.Get()
  else:
    universe_domain = properties.VALUES.core.universe_domain.default

  token_endpoint_builder = StsEndpoints(
      enable_mtls=enable_mtls,
      universe_domain=universe_domain,
      sts_location=sts_location,
  )

  try:
    generator = get_generator(args, config_type)
    output = {
        'universe_domain': universe_domain,
        'type': 'external_account',
        'audience': '//iam.googleapis.com/' + args.audience,
        'subject_token_type': generator.get_token_type(args.subject_token_type),
        'token_url': token_endpoint_builder.token_url,
        'credential_source': generator.get_source(args),
    }

    if config_type is ConfigType.WORKFORCE_POOLS:
      output['workforce_pool_user_project'] = args.workforce_pool_user_project

    if args.service_account:
      sa_endpoint_builder = IamEndpoints(
          args.service_account,
          enable_mtls=enable_mtls,
          universe_domain=universe_domain,
      )
      output['service_account_impersonation_url'] = (
          sa_endpoint_builder.impersonation_url
      )

      service_account_impersonation = {}

      if args.service_account_token_lifetime_seconds:
        service_account_impersonation['token_lifetime_seconds'] = (
            args.service_account_token_lifetime_seconds
        )
        output['service_account_impersonation'] = service_account_impersonation
    else:
      output['token_info_url'] = token_endpoint_builder.token_info_url

    files.WriteFileContents(args.output_file, json.dumps(output, indent=2))
    log.CreatedResource(args.output_file, RESOURCE_TYPE)

    # If the credential type is X.509, we need to create an additional
    # certificate config file to store the certificate information.
    if isinstance(generator, X509CredConfigGenerator):
      enterprise_certificate_config.create_config(
          enterprise_certificate_config.ConfigType.WORKLOAD,
          cert_path=args.credential_cert_path,
          key_path=args.credential_cert_private_key_path,
          output_file=args.credential_cert_configuration_output_file,
          trust_chain_path=args.credential_cert_trust_chain_path,
      )

  except GeneratorError as cce:
    log.CreatedResource(args.output_file, RESOURCE_TYPE, failed=cce.message)


def get_generator(args, config_type):
  """Determines the type of credential output based on CLI arguments."""
  if args.credential_source_file:
    return FileCredConfigGenerator(config_type, args.credential_source_file)
  if args.credential_source_url:
    return UrlCredConfigGenerator(config_type, args.credential_source_url,
                                  args.credential_source_headers)
  if args.executable_command:
    if hasattr(args, 'executable_interactive_timeout_millis'
              ) and args.executable_interactive_timeout_millis:
      return InteractiveExecutableCredConfigGenerator(
          config_type, args.executable_command, args.executable_timeout_millis,
          args.executable_output_file,
          args.executable_interactive_timeout_millis)

    return ExecutableCredConfigGenerator(config_type, args.executable_command,
                                         args.executable_timeout_millis,
                                         args.executable_output_file)
  if args.aws:
    return AwsCredConfigGenerator()
  if args.azure:
    return AzureCredConfigGenerator(args.app_id_uri, args.audience)
  if args.credential_cert_path:
    return X509CredConfigGenerator(
        args.credential_cert_path,
        args.credential_cert_private_key_path,
        args.credential_cert_configuration_output_file,
        args.credential_cert_trust_chain_path,
    )


class CredConfigGenerator(six.with_metaclass(abc.ABCMeta, object)):
  """Base class for generating Credential Config files."""

  def __init__(self, config_type):
    self.config_type = config_type

  def get_token_type(self, subject_token_type):
    """Returns the type of token that this credential config uses."""

    default_token_type = 'urn:ietf:params:oauth:token-type:jwt'
    if self.config_type is ConfigType.WORKFORCE_POOLS:
      default_token_type = 'urn:ietf:params:oauth:token-type:id_token'

    return subject_token_type or default_token_type

  def _get_format(self, credential_source_type, credential_source_field_name):
    """Returns an optional dictionary indicating the format of the token.

    This is a shared method, that several different token types need access to.

    Args:
      credential_source_type: The format of the token, either 'json' or 'text'.
      credential_source_field_name: The field name of a JSON object containing
        the text version of the token.

    Raises:
      GeneratorError: if an invalid token format is specified, or no field name
      is specified for a json token.

    """
    if not credential_source_type:
      return None

    credential_source_type = credential_source_type.lower()
    if credential_source_type not in ('json', 'text'):
      raise GeneratorError(
          '--credential-source-type must be either "json" or "text"')

    token_format = {'type': credential_source_type}
    if credential_source_type == 'json':
      if not credential_source_field_name:
        raise GeneratorError(
            '--credential-source-field-name required for JSON formatted tokens')
      token_format['subject_token_field_name'] = credential_source_field_name

    return token_format

  def _format_already_defined(self, credential_source_type):
    if credential_source_type:
      raise GeneratorError(
          '--credential-source-type is not supported with --azure or --aws')

  @abc.abstractmethod
  def get_source(self, args):
    """Gets the credential source info used for this credential config."""
    pass


class FileCredConfigGenerator(CredConfigGenerator):
  """The generator for File-based credential configs."""

  def __init__(self, config_type, credential_source_file):
    super(FileCredConfigGenerator, self).__init__(config_type)
    self.credential_source_file = credential_source_file

  def get_source(self, args):
    credential_source = {'file': self.credential_source_file}
    token_format = self._get_format(args.credential_source_type,
                                    args.credential_source_field_name)
    if token_format:
      credential_source['format'] = token_format
    return credential_source


class UrlCredConfigGenerator(CredConfigGenerator):
  """The generator for Url-based credential configs."""

  def __init__(self, config_type, credential_source_url,
               credential_source_headers):
    super(UrlCredConfigGenerator, self).__init__(config_type)
    self.credential_source_url = credential_source_url
    self.credential_source_headers = credential_source_headers

  def get_source(self, args):
    credential_source = {'url': self.credential_source_url}
    if self.credential_source_headers:
      credential_source['headers'] = self.credential_source_headers
    token_format = self._get_format(args.credential_source_type,
                                    args.credential_source_field_name)
    if token_format:
      credential_source['format'] = token_format
    return credential_source


class ExecutableCredConfigGenerator(CredConfigGenerator):
  """The generator for executable-command-based credentials configs."""

  def __init__(self, config_type, command, timeout_millis, output_file):
    if timeout_millis:
      timeout_millis = int(timeout_millis)

    super(ExecutableCredConfigGenerator, self).__init__(config_type)
    self.command = command
    self.timeout_millis = timeout_millis or 30000  # default to 30s
    self.output_file = output_file

  def get_source(self, args):
    executable_config = {
        'command': self.command,
        'timeout_millis': self.timeout_millis
    }

    if self.output_file:
      executable_config['output_file'] = self.output_file

    return {'executable': executable_config}


class InteractiveExecutableCredConfigGenerator(ExecutableCredConfigGenerator):
  """The generator for executable-command-based credentials configs with interactive mode."""

  def __init__(self, config_type, command, timeout_millis, output_file,
               interactive_timeout_millis):
    super(InteractiveExecutableCredConfigGenerator,
          self).__init__(config_type, command, timeout_millis, output_file)
    self.interactive_timeout_millis = int(interactive_timeout_millis)

  def get_source(self, args):

    if not self.output_file:
      raise GeneratorError('--executable-output-file must be specified if ' +
                           '--interactive-timeout-millis is provided.')

    executable_config = {
        'command': self.command,
        'timeout_millis': self.timeout_millis,
        'output_file': self.output_file,
        'interactive_timeout_millis': self.interactive_timeout_millis
    }
    return {'executable': executable_config}


class AwsCredConfigGenerator(CredConfigGenerator):
  """The generator for AWS-based credential configs."""

  def __init__(self):
    super(AwsCredConfigGenerator,
          self).__init__(ConfigType.WORKLOAD_IDENTITY_POOLS)

  def get_token_type(self, subject_token_type):
    return 'urn:ietf:params:aws:token-type:aws4_request'

  def get_source(self, args):
    self._format_already_defined(args.credential_source_type)
    credential_source = {
        'environment_id':
            'aws1',
        'region_url':
            'http://169.254.169.254/latest/meta-data/placement/availability-zone',
        'url':
            'http://169.254.169.254/latest/meta-data/iam/security-credentials',
        'regional_cred_verification_url':
            'https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15'
    }

    if args.enable_imdsv2:
      credential_source['imdsv2_session_token_url'] = (
          'http://169.254.169.254/latest/api/token'
      )

    return credential_source


class AzureCredConfigGenerator(CredConfigGenerator):
  """The generator for Azure-based credential configs."""

  def __init__(self, app_id_uri, audience):
    super(AzureCredConfigGenerator,
          self).__init__(ConfigType.WORKLOAD_IDENTITY_POOLS)
    self.app_id_uri = app_id_uri
    self.audience = audience

  def get_token_type(self, subject_token_type):
    return 'urn:ietf:params:oauth:token-type:jwt'

  def get_source(self, args):
    self._format_already_defined(args.credential_source_type)
    return {
        'url':
            'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource='
            +
            (self.app_id_uri or 'https://iam.googleapis.com/' + self.audience),
        'headers': {
            'Metadata': 'True'
        },
        'format': {
            'type': 'json',
            'subject_token_field_name': 'access_token'
        }
    }


class X509CredConfigGenerator(CredConfigGenerator):
  """The generator for X.509-based credential configs."""

  def __init__(self,
               certificate_path,
               key_path,
               cert_config_path,
               trust_chain_path):
    super(X509CredConfigGenerator,
          self).__init__(ConfigType.WORKLOAD_IDENTITY_POOLS)

    self.certificate_path = certificate_path
    self.key_path = key_path
    self.cert_config_path = cert_config_path
    self.trust_chain_path = trust_chain_path

  def get_token_type(self, subject_token_type):
    return 'urn:ietf:params:oauth:token-type:mtls'

  def get_source(self, args):
    certificate_config = {}

    if self.key_path is None:
      raise GeneratorError(
          '--credential-cert-private-key-path must be specified if'
          ' --credential-cert-path '
          + 'is provided.'
      )

    if self.cert_config_path is not None:
      certificate_config['certificate_config_location'] = self.cert_config_path
    else:
      certificate_config['use_default_certificate_config'] = True

    if self.trust_chain_path is not None:
      certificate_config['trust_chain_path'] = self.trust_chain_path

    return {'certificate': certificate_config}


class GeneratorError(Exception):

  def __init__(self, message):
    super(GeneratorError, self).__init__()
    self.message = message