HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/anthos/anthoscli_backend.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Anthos command library functions and utilities for the anthoscli binary."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64
import copy
import json
import os

from googlecloudsdk.command_lib.anthos import flags
from googlecloudsdk.command_lib.anthos.common import file_parsers
from googlecloudsdk.command_lib.anthos.common import messages
from googlecloudsdk.command_lib.util.anthos import binary_operations
from googlecloudsdk.core import exceptions as c_except
from googlecloudsdk.core import log
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.credentials import store as c_store
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms

import requests
import six
from six.moves import urllib

DEFAULT_ENV_ARGS = {'COBRA_SILENCE_USAGE': 'true'}

DEFAULT_LOGIN_CONFIG_PATH = {
    platforms.OperatingSystem.LINUX.id:
        '~/.config/google/anthos/kubectl-anthos-config.yaml',
    platforms.OperatingSystem.MACOSX.id:
        '~/Library/Preferences/google/anthos/kubectl-anthos-config.yaml',
    platforms.OperatingSystem.WINDOWS.id:
        os.path.join('%APPDATA%', 'google', 'anthos',
                     'kubectl-anthos-config.yaml')
}


def GetEnvArgsForCommand(extra_vars=None, exclude_vars=None):
  """Return an env dict to be passed on command invocation."""
  env = copy.deepcopy(os.environ)
  env.update(DEFAULT_ENV_ARGS)
  if extra_vars:
    env.update(extra_vars)
  if exclude_vars:
    for k in exclude_vars:
      env.pop(k)
  return env


class AnthosAuthException(c_except.Error):
  """Base Exception for auth issues raised by gcloud anthos surface."""


def RelativePkgPathFromFullPath(path):
  """Splits full path into relative(basename) path and parent dir."""
  normpath = os.path.normpath(path)
  rel_path = os.path.basename(normpath)
  parent_dir = os.path.dirname(normpath) or rel_path
  return rel_path, parent_dir


class AnthosCliWrapper(binary_operations.StreamingBinaryBackedOperation):
  """Binary operation wrapper for anthoscli commands."""

  def __init__(self, **kwargs):
    custom_errors = {
        'MISSING_EXEC': messages.MISSING_BINARY.format(binary='anthoscli')
    }
    super(AnthosCliWrapper, self).__init__(
        binary='anthoscli', custom_errors=custom_errors, **kwargs)

  def _ParseGetArgs(self, repo_uri, local_dest, file_pattern=None, **kwargs):
    del kwargs  # Not Used Here
    exec_args = ['get', repo_uri, local_dest]

    if file_pattern:
      exec_args.extend(['--pattern', file_pattern])

    return exec_args

  def _ParseUpdateArgs(self,
                       local_dir,
                       repo_uri=None,
                       strategy=None,
                       dry_run=False,
                       verbose=False,
                       **kwargs):
    del kwargs  # Not Used here

    exec_args = ['update', local_dir]
    if repo_uri:
      exec_args.extend(['--repo', repo_uri])

    if dry_run:
      exec_args.append('--dry-run')

    if strategy:
      exec_args.extend(['--strategy', strategy])

    if verbose:
      exec_args.append('--verbose')

    return exec_args

  def _ParseDescribeArgs(self, local_dir, **kwargs):
    del kwargs  # Not Used here
    return ['desc', local_dir]

  def _ParseTags(self, tags):
    return ','.join(['{}={}'.format(x, y) for x, y in six.iteritems(tags)])

  def _ParseInitArgs(self,
                     local_dir,
                     description=None,
                     name=None,
                     tags=None,
                     info_url=None,
                     **kwargs):
    del kwargs  # Not Used here
    package_path = local_dir

    if not package_path.endswith('/'):
      package_path += '/'
    exec_args = ['init', package_path]

    if description:
      exec_args.extend(['--description', description])
    if name:
      exec_args.extend(['--name', name])
    if tags:
      exec_args.extend(['--tag', self._ParseTags(tags)])

    if info_url:
      exec_args.extend(['--url', info_url])

    return exec_args

  def _ParseApplyArgs(self, apply_dir, project, **kwargs):
    del kwargs  # Not Used Here
    exec_args = ['apply', '-f', apply_dir, '--project', project]

    return exec_args

  def _ParseExportArgs(self, cluster, project, location, output_dir, **kwargs):
    del kwargs  # Not Used here

    exec_args = ['export', '-c', cluster, '--project', project]
    if location:
      exec_args.extend(['--location', location])

    if output_dir:
      exec_args.extend(['--output-directory', output_dir])

    return exec_args

  def _ParseArgsForCommand(self, command, **kwargs):
    if command == 'get':
      return self._ParseGetArgs(**kwargs)
    if command == 'update':
      return self._ParseUpdateArgs(**kwargs)
    if command == 'desc':
      return self._ParseDescribeArgs(**kwargs)
    if command == 'init':
      return self._ParseInitArgs(**kwargs)
    if command == 'apply':
      return self._ParseApplyArgs(**kwargs)
    if command == 'export':
      return self._ParseExportArgs(**kwargs)

    raise binary_operations.InvalidOperationForBinary(
        'Invalid Operation [{}] for anthoscli'.format(command))


def GetAuthToken(account, operation, impersonated=False):
  """Generate a JSON object containing the current gcloud auth token."""
  try:
    access_token = c_store.GetFreshAccessToken(
        account, allow_account_impersonation=impersonated)
    output = {
        'auth_token': access_token,
    }
  except Exception as e:  # pylint: disable=broad-except
    raise AnthosAuthException(
        'Error retrieving auth credentials for {operation}: {error}. '.format(
            operation=operation, error=e))
  return json.dumps(output, sort_keys=True)


class AnthosAuthWrapper(binary_operations.StreamingBinaryBackedOperation):
  """Binary operation wrapper for anthoscli commands."""

  def __init__(self, **kwargs):
    custom_errors = {
        'MISSING_EXEC':
            messages.MISSING_AUTH_BINARY.format(binary='kubectl-anthos')
    }
    super(AnthosAuthWrapper, self).__init__(
        binary='kubectl-anthos', custom_errors=custom_errors, **kwargs)

  @property
  def default_config_path(self):
    return files.ExpandHomeAndVars(
        DEFAULT_LOGIN_CONFIG_PATH[platforms.OperatingSystem.Current().id])

  def _ParseLoginArgs(
      self,
      cluster,
      kube_config=None,
      login_config=None,
      login_config_cert=None,
      user=None,
      ldap_user=None,
      ldap_pass=None,
      dry_run=None,
      preferred_auth=None,
      server_url=None,
      no_browser=None,
      remote_bootstrap=None,
      **kwargs
  ):
    del kwargs  # Not Used Here
    exec_args = ['login']
    if cluster:
      exec_args.extend(['--cluster', cluster])
    if kube_config:
      exec_args.extend(['--kubeconfig', kube_config])
    if login_config:
      exec_args.extend(['--login-config', login_config])
    if login_config_cert:
      exec_args.extend(['--login-config-cert', login_config_cert])
    if user:
      exec_args.extend(['--user', user])
    if dry_run:
      exec_args.extend(['--dry-run'])
    if ldap_pass and ldap_user:
      exec_args.extend(
          ['--ldap-username', ldap_user, '--ldap-password', ldap_pass])
    if preferred_auth:
      exec_args.extend(['--preferred-auth', preferred_auth])
    if server_url:
      exec_args.extend(['--server', server_url])
    if no_browser:
      exec_args.extend(['--remote-login'])
    if remote_bootstrap:
      exec_args.extend(['--remote-bootstrap', remote_bootstrap])

    return exec_args

  def _ParseCreateLoginConfigArgs(self,
                                  kube_config,
                                  output_file=None,
                                  merge_from=None,
                                  **kwargs):
    del kwargs  # Not Used Here
    exec_args = ['create-login-config']
    exec_args.extend(['--kubeconfig', kube_config])
    if output_file:
      exec_args.extend(['--output', output_file])
    if merge_from:
      exec_args.extend(['--merge-from', merge_from])
    return exec_args

  def _ParseTokenArgs(self, token_type, cluster, aws_sts_region, id_token,
                      access_token, access_token_expiry, refresh_token,
                      client_id, client_secret, idp_certificate_authority_data,
                      idp_issuer_url, kubeconfig_path, user, **kwargs):
    del kwargs  # Not Used Here
    exec_args = ['token']
    if token_type:
      exec_args.extend(['--type', token_type])
    if cluster:
      exec_args.extend(['--cluster', cluster])
    if aws_sts_region:
      exec_args.extend(['--aws-sts-region', aws_sts_region])
    if id_token:
      exec_args.extend(['--id-token', id_token])
    if access_token:
      exec_args.extend(['--access-token', access_token])
    if access_token_expiry:
      exec_args.extend(['--access-token-expiry', access_token_expiry])
    if refresh_token:
      exec_args.extend(['--refresh-token', refresh_token])
    if client_id:
      exec_args.extend(['--client-id', client_id])
    if client_secret:
      exec_args.extend(['--client-secret', client_secret])
    if idp_certificate_authority_data:
      exec_args.extend(
          ['--idp-certificate-authority-data', idp_certificate_authority_data])
    if idp_issuer_url:
      exec_args.extend(['--idp-issuer-url', idp_issuer_url])
    if kubeconfig_path:
      exec_args.extend(['--kubeconfig-path', kubeconfig_path])
    if user:
      exec_args.extend(['--user', user])

    return exec_args

  def _ParseArgsForCommand(self, command, **kwargs):
    if command == 'login':
      return self._ParseLoginArgs(**kwargs)
    elif command == 'create-login-config':
      return self._ParseCreateLoginConfigArgs(**kwargs)
    elif command == 'version':
      return ['version']
    elif command == 'token':
      return self._ParseTokenArgs(**kwargs)
    else:
      raise binary_operations.InvalidOperationForBinary(
          'Invalid Operation [{}] for kubectl-anthos'.format(command))


def _GetClusterConfig(all_configs, cluster):
  found_clusters = all_configs.FindMatchingItem(
      file_parsers.LoginConfigObject.CLUSTER_NAME_KEY, cluster)
  if len(found_clusters) != 1:
    raise AnthosAuthException(
        'Cluster [{}] not found for config path [{}]'.format(
            cluster, all_configs.file_path))
  return found_clusters.pop()


def _Base64EncodeLdap(username, passwd):
  """Base64 Encode Ldap username and password."""
  enc = lambda s: six.ensure_text(base64.b64encode(six.ensure_binary(s)))
  return enc(username), enc(passwd)


def _GetLdapUserAndPass(cluster_config, auth_name, cluster):
  """Prompt User for Ldap Username and Password."""
  ldap_user = None
  ldap_pass = None
  if not cluster_config.IsLdap():
    return None, None
  # do the prompting
  user_message = ('Please enter the ldap user for '
                  '[{}] on cluster [{}]: '.format(auth_name, cluster))
  pass_message = ('Please enter the ldap password for '
                  '[{}] on cluster [{}]: '.format(auth_name, cluster))
  ldap_user = console_io.PromptWithValidator(
      validator=lambda x: len(x) > 1,
      error_message='Error: Invalid username, please try again.',
      prompt_string=user_message)
  ldap_pass = console_io.PromptPassword(
      pass_message, validation_callable=lambda x: len(x) > 1)
  return _Base64EncodeLdap(ldap_user, ldap_pass)


def GetFileOrURL(cluster_config, certificate_file=True):
  """Parses config input to determine whether URL or File logic should execute.

     Determines whether the cluster_config is a file or URL. If it's a URL, it
     then pulls the contents of the file using a GET request. If it's a
     file, then it expands the file path and returns its contents.

  Args:
    cluster_config: str, A file path or URL for the login-config.
    certificate_file: str, Optional file path to the CA certificate to use with
      the GET request to the URL.

  Raises:
    AnthosAuthException: If the data could not be pulled from the URL.

  Returns:
    parsed_config_fileOrURL, config_contents, and is_url
    parsed_config_fileOrURL: str, returns either the URL that was passed or an
      expanded file path if a file was passed.
      config_contents: str, returns the contents of the file or URL.
    is_url: bool, True if the provided cluster_config input was a URL.
  """

  if not cluster_config:
    return None, None, None

  # Handle if input is URL.
  config_url = urllib.parse.urlparse(cluster_config)
  is_url = config_url.scheme == 'http' or config_url.scheme == 'https'
  if is_url:
    response = requests.get(cluster_config, verify=certificate_file or True)
    if response.status_code != requests.codes.ok:
      raise AnthosAuthException('Request to login-config URL failed with'
                                'response code [{}] and text [{}]: '.format(
                                    response.status_code, response.text))
    return cluster_config, response.text, is_url

  # Handle if input is file.
  expanded_config_path = flags.ExpandLocalDirAndVersion(cluster_config)
  contents = files.ReadFileContents(expanded_config_path)
  return expanded_config_path, contents, is_url


def GetPreferredAuthForCluster(cluster,
                               login_config,
                               config_contents=None,
                               force_update=False,
                               is_url=False):
  """Get preferredAuthentication value for cluster."""
  if not (cluster and login_config):
    return None, None, None

  configs = None
  # If URL, then pass contents directly.
  if is_url:
    if not config_contents:
      raise AnthosAuthException(
          'Config contents were not passed with URL [{}]'.format(login_config))
    configs = file_parsers.YamlConfigFile(
        file_contents=config_contents, item_type=file_parsers.LoginConfigObject)
  # If file, pass contents and location for updating.
  else:
    configs = file_parsers.YamlConfigFile(
        file_contents=config_contents,
        file_path=login_config,
        item_type=file_parsers.LoginConfigObject)

  cluster_config = _GetClusterConfig(configs, cluster)

  try:
    auth_method = cluster_config.GetPreferredAuth()
  except KeyError:
    auth_method = None
  except file_parsers.YamlConfigObjectFieldError:
    # gracefully quit for config versions older than v2alpha1 that
    # do not support 'preferredAuthentication' field.
    return None, None, None
  if not auth_method or force_update:
    providers = cluster_config.GetAuthProviders()
    if not providers:
      raise AnthosAuthException(
          'No Authentication Providers found in [{}]'.format(login_config))
    if len(providers) == 1:
      auth_method = providers.pop()
    else:  # do the prompting
      prompt_message = ('Please select your preferred authentication option '
                        'for cluster [{}]'.format(cluster))
      override_warning = ('. Note: This will overwrite current preferred auth '
                          'method [{}] in config file.')
      # Only print override warning in certain cases.
      if auth_method and force_update and not is_url:
        prompt_message = prompt_message + override_warning.format(auth_method)
      index = console_io.PromptChoice(
          providers, message=prompt_message, cancel_option=True)
      auth_method = providers[index]

    log.status.Print(
        'Setting Preferred Authentication option to [{}]'.format(auth_method))
    cluster_config.SetPreferredAuth(auth_method)
    # Only save to disk if file is specified. Don't want URL failure.
    if login_config and not is_url:
      configs.WriteToDisk()

  ldap_user, ldap_pass = _GetLdapUserAndPass(cluster_config, auth_method,
                                             cluster)
  return auth_method, ldap_user, ldap_pass


def LoginResponseHandler(response, list_clusters_only=False):
  """Handle Login Responses."""
  if response.stdout:
    log.status.Print(response.stdout)

  if response.stderr:
    log.status.Print(response.stderr)

  if response.failed:
    log.error(messages.LOGIN_CONFIG_FAILED_MESSAGE.format(response.stderr))
    return None
  if not list_clusters_only:
    log.status.Print(messages.LOGIN_CONFIG_SUCCESS_MESSAGE)
  return response.stdout