File: //snap/google-cloud-cli/396/platform/bq/frontend/bq_cached_client.py
#!/usr/bin/env python
"""BigQuery client that exists for some reason."""

import logging
import sys
import textwrap
from typing import Any, Dict, List, Optional, Type

from absl import app
from absl import flags
import httplib2
import termcolor

import bq_auth_flags
import bq_flags
import bq_utils
import credential_loader
from auth import main_credential_loader
from clients import bigquery_client
from clients import bigquery_client_extended
from clients import wait_printer
from frontend import utils as bq_frontend_utils
from utils import bq_api_utils
from utils import bq_gcloud_utils
from utils import bq_logging


FLAGS = flags.FLAGS


def _GetWaitPrinterFactoryFromFlags() -> Type[wait_printer.WaitPrinter]:
  """Returns the default wait_printer_factory to use while waiting for jobs."""
  if bq_flags.QUIET.value:
    return wait_printer.QuietWaitPrinter
  if bq_flags.HEADLESS.value:
    return wait_printer.TransitionWaitPrinter
  return wait_printer.VerboseWaitPrinter


class Client(object):
  """Class caching bigquery_client.BigqueryClient based on arguments."""

  client_cache = {}

  @staticmethod
  def _CollectArgs(config_logging: bool = True, **kwds) -> Dict[str, Any]:
    """Collect and combine FLAGS and kwds to create BQ Client.

    Args:
      config_logging: if True, set python logging according to --apilog.
      **kwds: keyword arguments for creating BigqueryClient.
    """

    def KwdsOrFlags(name: str):
      return kwds[name] if name in kwds else getattr(FLAGS, name)

    # Note that we need to handle possible initialization tasks
    # for the case of being loaded as a library.
    bq_utils.ProcessBigqueryrc()
    if config_logging:
      bq_logging.ConfigureLogging(bq_flags.APILOG.value)
    # Gcloud config currently gets processed twice.
    bq_gcloud_utils.process_config(flag_values=FLAGS)

    if (
        bq_flags.UNIVERSE_DOMAIN.present
        and not bq_api_utils.is_gdu_universe(bq_flags.UNIVERSE_DOMAIN.value)
        and not bq_auth_flags.USE_GOOGLE_AUTH.value
    ):
      logging.warning(
          'Attempting to use a non-GDU universe domain without setting'
          ' `use_google_auth`. You might need to set `use_google_auth` to True.'
      )

    if bq_flags.HTTPLIB2_DEBUGLEVEL.value:
      httplib2.debuglevel = bq_flags.HTTPLIB2_DEBUGLEVEL.value
      if hasattr(httplib2, 'python3'):
        httplib2.python3.debuglevel = bq_flags.HTTPLIB2_DEBUGLEVEL.value

    client_args = {}
    global_args = (
        'credential_file',
        'job_property',
        'project_id',
        'dataset_id',
        'trace',  # str
        'sync',
        'use_google_auth',
        'api',
        'api_version',
        'quota_project_id',
        'request_reason',
    )
    for name in global_args:
      client_args[name] = KwdsOrFlags(name)

    logging.info(
        'Creating client for BQ CLI version: %s', bq_utils.VERSION_NUMBER
    )
    logging.debug('Global args collected: %s', client_args)

    client_args['wait_printer_factory'] = _GetWaitPrinterFactoryFromFlags()
    if bq_flags.DISCOVERY_FILE.value:
      with open(bq_flags.DISCOVERY_FILE.value) as f:
        client_args['discovery_document'] = f.read()
    client_args['enable_resumable_uploads'] = (
        True
        if bq_flags.ENABLE_RESUMABLE_UPLOADS.value is None
        else bq_flags.ENABLE_RESUMABLE_UPLOADS.value
    )
    if bq_flags.MAX_ROWS_PER_REQUEST.value:
      client_args['max_rows_per_request'] = bq_flags.MAX_ROWS_PER_REQUEST.value
    logging.info('Client args collected: %s', client_args)

    return client_args
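
  # Illustrative sketch of the collected args (values are hypothetical):
  #   {'project_id': 'my-project', 'api': 'https://bigquery.googleapis.com',
  #    'sync': True, 'use_google_auth': True,
  #    'wait_printer_factory': wait_printer.VerboseWaitPrinter,
  #    'enable_resumable_uploads': True, ...}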

  @staticmethod
  def GetCredentials(
      credentials: bigquery_client.LegacyAndGoogleAuthCredentialsUnionType = None,
  ) -> bigquery_client.LegacyAndGoogleAuthCredentialsUnionType:
    """A function to lookup the credentials to use for this BQ CLI invocation.

    Args:
      credentials: bypass the credential lookup and use these instead.

    Returns:
      The credentials to use for this BQ CLI invocation.
    """
    if credentials is not None:
      logging.info('Credentials passed in directly')
    elif bq_auth_flags.USE_GOOGLE_AUTH.value:
      logging.info('Credentials loaded using Google Auth')
      credentials = main_credential_loader.GetCredentialsFromFlags()
    else:
      logging.info('Credentials loaded using oauth2client')
      credentials = credential_loader.GetCredentialsFromFlags()
    assert credentials is not None
    return credentials
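
  # Behavior sketch: with no argument, credentials are resolved from flags
  # (google-auth or oauth2client depending on --use_google_auth); passing
  # credentials in returns them unchanged after logging.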

  @staticmethod
  def Create(
      config_logging: bool = True,
      credentials: bigquery_client.LegacyAndGoogleAuthCredentialsUnionType = None,
      **kwds,
  ) -> bigquery_client_extended.BigqueryClientExtended:
    """Build a new BigqueryClient configured from kwds and FLAGS.

    Args:
      config_logging: if True, set python logging according to --apilog.
      credentials: bypass the credential lookup and use these instead.
      **kwds: keyword arguments for creating BigqueryClient.

    Returns:
      A BigqueryClient configured from the kwds and FLAGS.
    """
    # Resolve flag values first.
    logging.debug('Collecting args before creating a BigqueryClient: %s', kwds)
    client_args = Client._CollectArgs(config_logging, **kwds)

    bigquery_client_factory = Factory.GetBigqueryClientFactory()
    return bigquery_client_factory(
        credentials=Client.GetCredentials(credentials), **client_args
    )

  @classmethod
  def _GetClientCacheKey(cls, **kwds) -> str:
    logging.debug('In Client._GetClientCacheKey: %s', kwds)
    client_args = Client._CollectArgs(**kwds)
    return (
        'client_args={client_args},'
        'service_account_credential_file={service_account_credential_file},'
        'apilog={apilog},'.format(
            client_args=client_args,
            service_account_credential_file=bq_auth_flags.SERVICE_ACCOUNT_CREDENTIAL_FILE.value,
            apilog=bq_flags.APILOG.value,
        )
    )
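
  # The resulting cache key is a plain string, e.g. (illustrative, hypothetical
  # values):
  #   "client_args={'project_id': 'my-project', ...},service_account_credential_file=None,apilog=None,"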

  @classmethod
  def Get(cls) -> bigquery_client_extended.BigqueryClientExtended:
    """Return a BigqueryClient initialized from flags."""
    cache_key = Client._GetClientCacheKey()
    if cache_key in cls.client_cache:
      logging.info(
          'Using a cached client with previous auth and discovery docs from the'
          ' cache_key: %s',
          cache_key,
      )
    else:
      try:
        cls.client_cache[cache_key] = Client.Create()
        logging.info('Successfully created a new client.')
      except ValueError as e:
        logging.info('Failed to create a new client.')
        # Convert constructor parameter errors into flag usage errors.
        raise app.UsageError(e)

    return cls.client_cache[cache_key]
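
  # Usage sketch (assumes absl flags have already been parsed):
  #   client = Client.Get()   # first call creates the client and caches it
  #   same = Client.Get()     # identical flags reuse the cached instance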


class Factory:
  """Class encapsulating factory creation of BigqueryClient."""

  _BIGQUERY_CLIENT_FACTORY = None

  class ClientTablePrinter:
    """Class encapsulating factory creation of TablePrinter."""

    _TABLE_PRINTER = None

    @classmethod
    def GetTablePrinter(cls) -> bq_frontend_utils.TablePrinter:
      if cls._TABLE_PRINTER is None:
        cls._TABLE_PRINTER = bq_frontend_utils.TablePrinter()
      return cls._TABLE_PRINTER

  @classmethod
  def GetBigqueryClientFactory(
      cls,
  ) -> Type[bigquery_client_extended.BigqueryClientExtended]:
    if cls._BIGQUERY_CLIENT_FACTORY is None:
      cls._BIGQUERY_CLIENT_FACTORY = (
          bigquery_client_extended.BigqueryClientExtended
      )
    return cls._BIGQUERY_CLIENT_FACTORY
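

# Example of how a frontend command might wire these pieces together
# (a minimal sketch; only names defined in this module are used):
#   bq_client = Client.Get()
#   table_printer = Factory.ClientTablePrinter.GetTablePrinter()
#   client_cls = Factory.GetBigqueryClientFactory()  # BigqueryClientExtended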