HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/auth/enterprise_certificate_config.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Create ECP configurations."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
import json
import os

from googlecloudsdk.core import config
from googlecloudsdk.core import log
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms

RESOURCE_TYPE = 'enterprise-certificate-proxy configuration file'


def get_platform_folder():
  sdk_root = config.Paths().sdk_root
  if not sdk_root:
    raise ECPConfigError(
        'Unable to find the SDK root path. The gcloud installation may be'
        ' corrupted.'
    )

  return os.path.join(sdk_root, 'platform', 'enterprise_cert')


def get_bin_folder():
  sdk_bin_path = config.Paths().sdk_bin_path
  if not sdk_bin_path:
    raise ECPConfigError(
        'Unable to find the SDK bin path. The gcloud installation may be'
        ' corrupted.'
    )

  return sdk_bin_path


def get_config_path(output_file):
  if output_file:
    return output_file
  return config.CertConfigDefaultFilePath()


def platform_to_config(platform):
  if not platform:
    platform = platforms.Platform.Current()
  if platform.operating_system == platforms.OperatingSystem.MACOSX:
    return ConfigType.KEYCHAIN
  elif platform.operating_system == platforms.OperatingSystem.LINUX:
    return ConfigType.PKCS11
  elif platform.operating_system == platforms.OperatingSystem.WINDOWS:
    return ConfigType.MYSTORE
  else:
    raise ECPConfigError(
        (
            'Unsupported platform {}. Enterprise Certificate Proxy currently'
            ' only supports OSX, Windows, and Linux.'
        ).format(platform.operating_system)
    )


class ConfigType(enum.Enum):
  PKCS11 = 1
  KEYCHAIN = 2
  MYSTORE = 3
  WORKLOAD = 4


class WindowsBinaryPathConfig(object):

  def __init__(self, ecp, ecp_client, tls_offload):
    self.ecp = ecp if ecp else os.path.join(get_bin_folder(), 'ecp.exe')
    self.ecp_client = (
        ecp_client
        if ecp_client
        else os.path.join(get_platform_folder(), 'libecp.dll')
    )
    self.tls_offload = (
        tls_offload
        if tls_offload
        else os.path.join(get_platform_folder(), 'libtls_offload.dll')
    )


class LinuxPathConfig(object):

  def __init__(self, ecp, ecp_client, tls_offload):
    self.ecp = ecp if ecp else os.path.join(get_bin_folder(), 'ecp')
    self.ecp_client = (
        ecp_client
        if ecp_client
        else os.path.join(get_platform_folder(), 'libecp.so')
    )
    self.tls_offload = (
        tls_offload
        if tls_offload
        else os.path.join(get_platform_folder(), 'libtls_offload.so')
    )


class MacOSBinaryPathConfig(object):

  def __init__(self, ecp, ecp_client, tls_offload):
    self.ecp = ecp if ecp else os.path.join(get_bin_folder(), 'ecp')
    self.ecp_client = (
        ecp_client
        if ecp_client
        else os.path.join(get_platform_folder(), 'libecp.dylib')
    )
    self.tls_offload = (
        tls_offload
        if tls_offload
        else os.path.join(get_platform_folder(), 'libtls_offload.dylib')
    )


class PKCS11Config(object):

  def __init__(self, module, slot, label, user_pin):
    self.module = module
    self.slot = slot
    self.label = label

    if user_pin:
      self.user_pin = user_pin


class KeyChainConfig(object):

  def __init__(self, issuer, keychain_type):
    self.issuer = issuer
    self.keychain_type = keychain_type


class MyStoreConfig(object):

  def __init__(self, issuer, store, provider):
    self.issuer = issuer
    self.store = store
    self.provider = provider


class WorkloadConfig(object):

  def __init__(self, cert_path, key_path):
    self.cert_path = cert_path
    self.key_path = key_path


def create_linux_config(base_config, **kwargs):
  """Creates a Linux ECP Config.

  Args:
    base_config: Optional parameter to use as a fallback for parameters that are
      not set in kwargs.
    **kwargs: Linux config parameters. See go/enterprise-cert-config for valid
      variables.

  Returns:
    A dictionary object containing the ECP config.
  """
  if base_config:
    base_linux_config = base_config.get('cert_configs', {}).get('pkcs11', {})
    base_libs_config = base_config.get('libs', {})
  else:
    base_linux_config = {}
    base_libs_config = {}

  ecp_config = PKCS11Config(
      kwargs.get('module', None) or base_linux_config.get('module', None),
      kwargs.get('slot', None) or base_linux_config.get('slot', 0),
      kwargs.get('label', None) or base_linux_config.get('label', None),
      kwargs.get('user_pin', None) or base_linux_config.get('user_pin', None),
  )
  lib_config = LinuxPathConfig(
      kwargs.get('ecp', None) or base_libs_config.get('ecp', None),
      kwargs.get('ecp_client', None)
      or base_libs_config.get('ecp_client', None),
      kwargs.get('tls_offload', None)
      or base_libs_config.get('tls_offload', None),
  )
  return {'pkcs11': vars(ecp_config)}, {'libs': vars(lib_config)}


def create_macos_config(base_config, **kwargs):
  """Creates a MacOS ECP Config.

  Args:
    base_config: Optional parameter to use as a fallback for parameters that are
      not set in kwargs.
    **kwargs: MacOS config parameters. See go/enterprise-cert-config for valid
      variables.

  Returns:
    A dictionary object containing the ECP config.
  """
  if base_config:
    base_macos_config = base_config['cert_configs']['macos_keychain']
    base_libs_config = base_config['libs']
  else:
    base_macos_config = {}
    base_libs_config = {}

  ecp_config = KeyChainConfig(
      kwargs.get('issuer', None) or base_macos_config.get('issuer', None),
      kwargs.get('keychain_type', 'all')
      or base_macos_config.get('keychain_type', 'all'),
  )
  lib_config = MacOSBinaryPathConfig(
      kwargs.get('ecp', None) or base_libs_config.get('ecp', None),
      kwargs.get('ecp_client', None)
      or base_libs_config.get('ecp_client', None),
      kwargs.get('tls_offload', None)
      or base_libs_config.get('tls_offload', None),
  )
  return {'macos_keychain': vars(ecp_config)}, {'libs': vars(lib_config)}


def create_windows_config(base_config, **kwargs):
  """Creates a Windows ECP Config.

  Args:
    base_config: Optional parameter to use as a fallback for parameters that are
      not set in kwargs.
    **kwargs: Windows config parameters. See go/enterprise-cert-config for valid
      variables.

  Returns:
    A dictionary object containing the ECP config.
  """
  if base_config:
    base_windows_config = base_config['cert_configs']['windows_store']
    base_libs_config = base_config['libs']
  else:
    base_windows_config = {}
    base_libs_config = {}

  ecp_config = MyStoreConfig(
      kwargs.get('issuer', None) or base_windows_config.get('issuer', None),
      kwargs.get('store', None) or base_windows_config.get('store', None),
      kwargs.get('provider', None) or base_windows_config.get('provider', None),
  )
  lib_config = WindowsBinaryPathConfig(
      kwargs.get('ecp', None) or base_libs_config.get('ecp', None),
      kwargs.get('ecp_client', None)
      or base_libs_config.get('ecp_client', None),
      kwargs.get('tls_offload', None)
      or base_libs_config.get('tls_offload', None),
  )
  return {'windows_store': vars(ecp_config)}, {'libs': vars(lib_config)}


def create_workload_config(base_config, **kwargs):
  """Creates a Workload ECP Config.

  Args:
    base_config: Optional parameter to use as a fallback for parameters that are
      not set in kwargs.
    **kwargs: Workload config parameters. See go/enterprise-cert-config for
      valid variables.

  Returns:
    A dictionary object containing the ECP config.
  """
  if base_config:
    base_workload_config = base_config['cert_configs']['workload']
  else:
    base_workload_config = {}

  workload_config = WorkloadConfig(
      kwargs.get('cert_path', None)
      or base_workload_config.get('cert_path', None),
      kwargs.get('key_path', None)
      or base_workload_config.get('key_path', None),
  )

  return {'workload': vars(workload_config)}, {}


def create_ecp_config(config_type, base_config=None, **kwargs):
  """Creates an ECP Config.

  Args:
    config_type: An ConfigType Enum that describes the type of ECP config.
    base_config: Optional parameter to use as a fallback for parameters that are
      not set in kwargs.
    **kwargs: config parameters. See go/enterprise-cert-config for valid
      variables.

  Returns:
    A dictionary object containing the ECP config.
  Raises:
    ECPConfigError: No valid config_type is specified.
  """

  if config_type == ConfigType.PKCS11:
    ecp_config, libs_config = create_linux_config(base_config, **kwargs)
  elif config_type == ConfigType.KEYCHAIN:
    ecp_config, libs_config = create_macos_config(base_config, **kwargs)
  elif config_type == ConfigType.MYSTORE:
    ecp_config, libs_config = create_windows_config(base_config, **kwargs)
  elif config_type == ConfigType.WORKLOAD:
    ecp_config, libs_config = create_workload_config(base_config, **kwargs)
  else:
    raise ECPConfigError(
        (
            'Unknown config_type {} passed to create enterprise certificate'
            ' configuration. Valid options are: [PKCS11, KEYCHAIN, MYSTORE]'
        ).format(config_type)
    )
  return {'cert_configs': ecp_config, **libs_config}


def create_config(config_type, **kwargs):
  """Creates the ECP config based on the passed in CLI arguments."""
  output = create_ecp_config(config_type, None, **kwargs)
  config_path = get_config_path(kwargs.get('output_file', None))

  files.WriteFileContents(config_path, json.dumps(output, indent=2))
  log.CreatedResource(config_path, RESOURCE_TYPE)


def update_config(config_type, **kwargs):
  """Updates the ECP config based on the passed in CLI arguments.

  Args:
    config_type: An ConfigType Enum that describes the type of ECP config.
    **kwargs: config parameters that will be updated. See
      go/enterprise-cert-config for valid variables.

  Only explicit args will overwrite existing values
  """
  config_path = get_config_path(kwargs.get('output_file', None))
  data = files.ReadFileContents(config_path)

  active_config = json.loads(data)
  output = create_ecp_config(config_type, active_config, **kwargs)

  files.WriteFileContents(config_path, json.dumps(output, indent=2))
  log.CreatedResource(config_path, RESOURCE_TYPE)


class ECPConfigError(Exception):

  def __init__(self, message):
    super(ECPConfigError, self).__init__()
    self.message = message