HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/platform/bq/clients/bigquery_client.py
#!/usr/bin/env python
"""BigqueryClient class."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import enum
from http import client as http_client_lib
import json
import logging
import tempfile
import time
import traceback
from typing import Callable, List, Optional, Union
import urllib

# To configure apiclient logging.
from absl import flags
import certifi
import googleapiclient
from googleapiclient import discovery
import httplib2
from typing_extensions import TypeAlias


import bq_flags
import bq_utils
import credential_loader
from auth import main_credential_loader
from clients import bigquery_http
from clients import utils as bq_client_utils
from clients import wait_printer
from discovery_documents import discovery_document_cache
from discovery_documents import discovery_document_loader
from utils import bq_api_utils
from utils import bq_error
from utils import bq_logging

# TODO(b/388312723): Review if we can remove this try/except block.
try:
  from google.auth import credentials as google_credentials  # pylint: disable=g-import-not-at-top

  _HAS_GOOGLE_AUTH = True
except ImportError:
  _HAS_GOOGLE_AUTH = False

# TODO(b/388312723): Review if we can remove this try/except block.
try:
  import google_auth_httplib2  # pylint: disable=g-import-not-at-top

  _HAS_GOOGLE_AUTH_HTTPLIB2 = True
except ImportError:
  _HAS_GOOGLE_AUTH_HTTPLIB2 = False

# A unique non-None default, for use in kwargs that need to
# distinguish default from None.
_DEFAULT = object()

LegacyAndGoogleAuthCredentialsUnionType = Union[
    main_credential_loader.GoogleAuthCredentialsUnionType,
    credential_loader.CredentialsFromFlagsUnionType,
]

Service = bq_api_utils.Service

Http: TypeAlias = Union[
    httplib2.Http,
]

AuthorizedHttp: TypeAlias = Union[
    httplib2.Http,
    'google_auth_httplib2.AuthorizedHttp',
]


class BigqueryClient:
  """Class encapsulating interaction with the BigQuery service."""

  class JobCreationMode(str, enum.Enum):
    """Enum of job creation mode."""

    JOB_CREATION_REQUIRED = 'JOB_CREATION_REQUIRED'
    JOB_CREATION_OPTIONAL = 'JOB_CREATION_OPTIONAL'

  def __init__(
      self,
      *,
      api: str,
      api_version: str,
      project_id: Optional[str] = '',
      dataset_id: Optional[str] = '',
      discovery_document: Union[bytes, object, None] = _DEFAULT,
      job_property: str = '',
      trace: Optional[str] = None,
      sync: bool = True,
      wait_printer_factory: Optional[
          Callable[[], wait_printer.WaitPrinter]
      ] = wait_printer.TransitionWaitPrinter,
      job_id_generator: bq_client_utils.JobIdGenerator = bq_client_utils.JobIdGeneratorIncrementing(
          bq_client_utils.JobIdGeneratorRandom()
      ),
      max_rows_per_request: Optional[int] = None,
      quota_project_id: Optional[str] = None,
      use_google_auth: bool = False,
      credentials: Optional[LegacyAndGoogleAuthCredentialsUnionType] = None,
      enable_resumable_uploads: bool = True,
      **kwds,
  ):
    """Initializes BigqueryClient.

    Required keywords:
      api: the api to connect to, for example "bigquery".
      api_version: the version of the api to connect to, for example "v2".

    Optional keywords:
      project_id: a default project id to use. While not required for
        initialization, a project_id is required when calling any
        method that creates a job on the server. Methods that have
        this requirement pass through **kwds, and will raise
        bq_error.BigqueryClientConfigurationError if no project_id can be
        found.
      dataset_id: a default dataset id to use.
      discovery_document: the discovery document to use. If None, one
        will be retrieved from the discovery api. If not specified,
        the built-in discovery document will be used.
      job_property: a list of "key=value" strings defining properties
        to apply to all job operations.
      trace: a tracing header to include in all bigquery api requests.
      sync: boolean, when inserting jobs, whether to wait for them to
        complete before returning from the insert request.
      wait_printer_factory: a function that returns a WaitPrinter.
        This will be called for each job that we wait on. See WaitJob().

    Raises:
      ValueError: if keywords are missing or incorrectly specified.
    """
    super().__init__()
    self.api = api
    self.api_version = api_version
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.discovery_document = discovery_document
    self.job_property = job_property
    self.trace = trace
    self.sync = sync
    self.wait_printer_factory = wait_printer_factory
    self.job_id_generator = job_id_generator
    self.max_rows_per_request = max_rows_per_request
    self.quota_project_id = quota_project_id
    self.use_google_auth = use_google_auth
    self.credentials = credentials
    self.enable_resumable_uploads = enable_resumable_uploads
    # TODO(b/324243535): Delete this block to make attributes explicit.
    for key, value in kwds.items():
      setattr(self, key, value)
    self._apiclient = None
    self._routines_apiclient = None
    self._row_access_policies_apiclient = None
    self._op_transfer_client = None
    self._op_reservation_client = None
    self._op_bi_reservation_client = None
    self._models_apiclient = None
    self._op_connection_service_client = None
    self._iam_policy_apiclient = None
    default_flag_values = {
        'iam_policy_discovery_document': _DEFAULT,
    }
    for flagname, default in default_flag_values.items():
      if not hasattr(self, flagname):
        setattr(self, flagname, default)

  columns_to_include_for_transfer_run = [
      'updateTime',
      'schedule',
      'runTime',
      'scheduleTime',
      'params',
      'endTime',
      'dataSourceId',
      'destinationDatasetId',
      'state',
      'startTime',
      'name',
  ]

  # These columns appear to be empty with scheduling a new transfer run
  # so there are listed as excluded from the transfer run output.
  columns_excluded_for_make_transfer_run = ['schedule', 'endTime', 'startTime']

  def GetHttp(
      self,
  ) -> AuthorizedHttp:
    """Returns the httplib2 Http to use."""

    proxy_info = httplib2.proxy_info_from_environment
    if flags.FLAGS.proxy_address and flags.FLAGS.proxy_port:
      try:
        port = int(flags.FLAGS.proxy_port)
      except ValueError:
        raise ValueError(
            'Invalid value for proxy_port: {}'.format(flags.FLAGS.proxy_port)
        )
      proxy_info = httplib2.ProxyInfo(
          proxy_type=3,
          proxy_host=flags.FLAGS.proxy_address,
          proxy_port=port,
          proxy_user=flags.FLAGS.proxy_username or None,
          proxy_pass=flags.FLAGS.proxy_password or None,
      )

    http = httplib2.Http(
        proxy_info=proxy_info,
        ca_certs=flags.FLAGS.ca_certificates_file or certifi.where(),
        disable_ssl_certificate_validation=flags.FLAGS.disable_ssl_validation,
    )

    if hasattr(http, 'redirect_codes'):
      http.redirect_codes = set(http.redirect_codes) - {308}

    if flags.FLAGS.mtls:
      _, self._cert_file = tempfile.mkstemp()
      _, self._key_file = tempfile.mkstemp()
      discovery.add_mtls_creds(
          http, discovery.get_client_options(), self._cert_file, self._key_file
      )
    return http

  def GetDiscoveryUrl(
      self,
      service: Service,
      api_version: str,
      domain_root: Optional[str] = None,
      labels: Optional[str] = None,
  ) -> str:
    """Returns the url to the discovery document for bigquery."""
    discovery_url = None  # pylint:disable=unused-variable
    if not discovery_url:
      discovery_url = bq_api_utils.get_discovery_url_from_root_url(
          domain_root
          or bq_api_utils.get_tpc_root_url_from_flags(
              service=service, inputted_flags=bq_flags
          ),
          api_version=api_version,
      )
    return discovery_url

  def GetAuthorizedHttp(
      self,
      credentials: LegacyAndGoogleAuthCredentialsUnionType,
      http: Http,
  ) -> AuthorizedHttp:
    """Returns an http client that is authorized with the given credentials."""

    if self.use_google_auth:
      if not _HAS_GOOGLE_AUTH:
        logging.error(
            'System is set to use `google.auth`, but it did not load.'
        )
      if not isinstance(credentials, google_credentials.Credentials):
        logging.error(
            'The system is using `google.auth` but the parsed credentials are'
            ' of an incorrect type.'
        )
    else:
      logging.debug('System is set to not use `google.auth`.')

    # LINT.IfChange(http_authorization)
    if _HAS_GOOGLE_AUTH and isinstance(
        credentials, google_credentials.Credentials
    ):
      if not _HAS_GOOGLE_AUTH_HTTPLIB2:
        raise ValueError(
            'Credentials from google.auth specified, but '
            'google-api-python-client is unable to use these credentials '
            'unless google-auth-httplib2 is installed. Please install '
            'google-auth-httplib2.'
        )
      return google_auth_httplib2.AuthorizedHttp(credentials, http=http)
    # Note: This block simplified adding typing and should be removable when
    # legacy credentials are removed.
    if hasattr(credentials, 'authorize'):
      return credentials.authorize(http)
    else:
      raise TypeError('Unsupported credential type: {type(credentials)}')
    # LINT.ThenChange(
    #     //depot/google3/cloud/helix/testing/e2e/python_api_client/api_client_lib.py:http_authorization,
    #     //depot/google3/cloud/helix/testing/e2e/python_api_client/api_client_util.py:http_authorization,
    # )


  def _LoadDiscoveryDocumentLocal(
      self,
      service: Service,
      discovery_url: Optional[str],
      api_version: str,
  ) -> Optional[Union[str, bytes, object]]:
    """Loads the local discovery document for the given service.

    Args:
      service: The BigQuery service being used.
      discovery_url: The URL to load the discovery doc from.
      api_version: The API version for the targeted discovery doc.

    Returns:
      discovery_document The loaded discovery document.
    """

    discovery_document = None
    if self.discovery_document != _DEFAULT:
      discovery_document = self.discovery_document
      logging.info(
          'Skipping local "%s" discovery document load since discovery_document'
          ' has a value: %s',
          service,
          discovery_document,
      )
    elif discovery_url is not None:
      logging.info(
          'Skipping the local "%s" discovery document load since discovery_url'
          ' has a value',
          service,
      )
    elif bq_flags.BIGQUERY_DISCOVERY_API_KEY_FLAG.present:
      logging.info(
          'Skipping local "%s" discovery document load since the'
          ' bigquery_discovery_api_key flag was used',
          service,
      )
    else:
      # Load the local api description if one exists and is supported.
      try:
        discovery_document = (
            discovery_document_loader.load_local_discovery_doc_from_service(
                service=service,
                api=self.api,
                api_version=api_version,
            )
        )
        if discovery_document:
          logging.info('The "%s" discovery doc is already loaded', service)
      except FileNotFoundError as e:
        logging.warning(
            'Failed to load the "%s" discovery doc from local files: %s',
            service,
            e,
        )
    return discovery_document


  def _LoadDiscoveryDocumentUrl(
      self,
      service: Service,
      http: AuthorizedHttp,
      discovery_url: str,
  ) -> Optional[Union[str, bytes, object]]:
    """Loads the discovery document from the provided URL.

    Args:
      service: The BigQuery service being used.
      http: Http object to be used to execute request.
      discovery_url: The URL to load the discovery doc from.

    Returns:
      discovery_document The loaded discovery document.

    Raises:
      bq_error.BigqueryClientError: If the request to load the discovery
      document fails.
    """

    discovery_document = None
    # Attempt to retrieve discovery doc with retry logic for transient,
    # retry-able errors.
    max_retries = 3
    iterations = 0
    headers = (
        {'X-ESF-Use-Cloud-UberMint-If-Enabled': '1'}
        if hasattr(self, 'use_uber_mint') and self.use_uber_mint
        else None
    )
    while iterations < max_retries and discovery_document is None:
      if iterations > 0:
        # Wait briefly before retrying with exponentially increasing wait.
        time.sleep(2**iterations)
      iterations += 1
      try:
        logging.info(
            'Requesting "%s" discovery document from %s',
            service,
            discovery_url,
        )
        if headers:
          response_metadata, discovery_document = http.request(
              discovery_url, headers=headers
          )
        else:
          response_metadata, discovery_document = http.request(discovery_url)
        discovery_document = discovery_document.decode('utf-8')
        if int(response_metadata.get('status')) >= 400:
          msg = 'Got %s response from discovery url: %s' % (
              response_metadata.get('status'),
              discovery_url,
          )
          logging.error('%s:\n%s', msg, discovery_document)
          raise bq_error.BigqueryCommunicationError(msg)
      except (
          httplib2.HttpLib2Error,
          googleapiclient.errors.HttpError,
          http_client_lib.HTTPException,
      ) as e:
        # We can't find the specified server. This can be thrown for
        # multiple reasons, so inspect the error.
        if hasattr(e, 'content'):
          if iterations == max_retries:
            content = ''
            if hasattr(e, 'content'):
              content = e.content
            raise bq_error.BigqueryCommunicationError(
                'Cannot contact server. Please try again.\nError: %r'
                '\nContent: %s' % (e, content)
            )
        else:
          if iterations == max_retries:
            raise bq_error.BigqueryCommunicationError(
                'Cannot contact server. Please try again.\nTraceback: %s'
                % (traceback.format_exc(),)
            )
      except IOError as e:
        if iterations == max_retries:
          raise bq_error.BigqueryCommunicationError(
              'Cannot contact server. Please try again.\nError: %r' % (e,)
          )
      except googleapiclient.errors.UnknownApiNameOrVersion as e:
        # We can't resolve the discovery url for the given server.
        # Don't retry in this case.
        raise bq_error.BigqueryCommunicationError(
            'Invalid API name or version: %s' % (str(e),)
        )
    return discovery_document

  def BuildApiClient(
      self,
      service: Service,
      discovery_url: Optional[str] = None,
      discovery_root_url: Optional[str] = None,
      api_version: Optional[str] = None,
      domain_root: Optional[str] = None,
      labels: Optional[str] = None,
  ) -> discovery.Resource:
    """Build and return BigQuery Dynamic client from discovery document."""
    logging.info(
        'BuildApiClient discovery_url: %s, discovery_root_url: %s',
        discovery_url,
        discovery_root_url,
    )
    if api_version is None:
      api_version = self.api_version
    # If self.credentials is of type google.auth, it has to be cleared of the
    # _quota_project_id value later on in this function for discovery requests.
    # bigquery_model has to be built with the quota project retained, so in this
    # version of the implementation, it's built before discovery requests take
    # place.
    bigquery_model = bigquery_http.BigqueryModel(
        trace=self.trace,
        quota_project_id=bq_utils.GetEffectiveQuotaProjectIDForHTTPHeader(
            quota_project_id=self.quota_project_id,
            project_id=self.project_id,
            use_google_auth=self.use_google_auth,
            credentials=self.credentials,
        ),
    )
    bq_request_builder = bigquery_http.BigqueryHttp.Factory(
        bigquery_model,
        self.use_google_auth,
    )
    # Clean up quota project ID from Google Auth credentials.
    # This is specifically needed to construct a http object used for discovery
    # requests below as quota project ID shouldn't participate in discovery
    # document retrieval, otherwise the discovery request would result in a
    # permission error seen in b/321286043.
    if self.use_google_auth and hasattr(self.credentials, '_quota_project_id'):
      self.credentials._quota_project_id = None  # pylint: disable=protected-access
    http_client = self.GetHttp()
    http = self.GetAuthorizedHttp(self.credentials, http_client)

    discovery_document = None
    # First, trying to load the discovery document from the local package.
    if discovery_document is None:
      discovery_document = self._LoadDiscoveryDocumentLocal(
          service=service,
          discovery_url=discovery_url,
          api_version=api_version,
      )

    # If document was not loaded from the local package and
    # discovery_url is not provided, we will generate the url to fetch from the
    # server.
    discovery_url_not_provided = discovery_url is None
    if discovery_document is None and discovery_url is None:
      discovery_url = self.GetDiscoveryUrl(
          service=service,
          api_version=api_version,
          domain_root=domain_root,
          labels=labels,
      )


    # If discovery_document is still not loaded, fetch it from the server.
    if not discovery_document:
      discovery_document = self._LoadDiscoveryDocumentUrl(
          service=service,
          http=http,
          discovery_url=discovery_url,
      )

    discovery_document_to_build_client = self.OverrideEndpoint(
        discovery_document=discovery_document, service=service
    )

    bq_logging.SaveStringToLogDirectoryIfAvailable(
        file_prefix='discovery_document',
        content=discovery_document_to_build_client,
        apilog=bq_flags.APILOG.value,
    )

    try:
      # If the underlying credentials object used for authentication is of type
      # google.auth, its quota project ID will have been removed earlier in this
      # function if one was provided explicitly. This specific http object
      # created from that modified credentials object must be the one used for
      # the discovery requests, otherwise they would result in a permission
      # error as seen in b/321286043.
      built_client = discovery.build_from_document(
          discovery_document_to_build_client,
          http=http,
          model=bigquery_model,
          requestBuilder=bq_request_builder,
      )
    except Exception:
      logging.error(
          'Error building from the "%s" discovery document: %s',
          service,
          discovery_document,
      )
      raise


    return built_client

  @property
  def apiclient(self) -> discovery.Resource:
    """Returns a singleton ApiClient built for the BigQuery core API."""
    if self._apiclient:
      logging.info('Using the cached BigQuery API client')
    else:
      self._apiclient = self.BuildApiClient(service=Service.BIGQUERY)
    return self._apiclient

  def GetModelsApiClient(self) -> discovery.Resource:
    """Returns the apiclient attached to self."""
    if self._models_apiclient is None:
      self._models_apiclient = self.BuildApiClient(service=Service.BIGQUERY)
    return self._models_apiclient

  def GetRoutinesApiClient(self) -> discovery.Resource:
    """Return the apiclient attached to self."""
    if self._routines_apiclient is None:
      self._routines_apiclient = self.BuildApiClient(service=Service.BIGQUERY)
    return self._routines_apiclient

  def GetRowAccessPoliciesApiClient(self) -> discovery.Resource:
    """Return the apiclient attached to self."""
    if self._row_access_policies_apiclient is None:
      self._row_access_policies_apiclient = self.BuildApiClient(
          service=Service.BIGQUERY
      )
    return self._row_access_policies_apiclient

  def GetIAMPolicyApiClient(self) -> discovery.Resource:
    """Return the apiclient attached to self."""
    if self._iam_policy_apiclient is None:
      self._iam_policy_apiclient = self.BuildApiClient(
          service=Service.BQ_IAM,
      )
    return self._iam_policy_apiclient

  def GetInsertApiClient(self) -> discovery.Resource:
    """Return the apiclient that supports insert operation."""
    insert_client = self.apiclient
    return insert_client

  def GetTransferV1ApiClient(
      self, transferserver_address: Optional[str] = None
  ) -> discovery.Resource:
    """Return the apiclient that supports Transfer v1 operation."""
    logging.info(
        'GetTransferV1ApiClient transferserver_address: %s',
        transferserver_address,
    )

    if self._op_transfer_client:
      logging.info('Using the cached Transfer API client')
    else:
      path = transferserver_address or bq_api_utils.get_tpc_root_url_from_flags(
          service=Service.DTS, inputted_flags=bq_flags
      )
      self._op_transfer_client = self.BuildApiClient(
          domain_root=path,
          api_version='v1',
          service=Service.DTS,
      )
    return self._op_transfer_client

  def GetReservationApiClient(
      self, reservationserver_address: Optional[str] = None
  ) -> discovery.Resource:
    """Return the apiclient that supports reservation operations."""
    if self._op_reservation_client:
      logging.info('Using the cached Reservations API client')
    else:
      path = (
          reservationserver_address
          or bq_api_utils.get_tpc_root_url_from_flags(
              service=Service.RESERVATIONS,
              inputted_flags=bq_flags,
          )
      )
      reservation_version = 'v1'
      labels = None
      self._op_reservation_client = self.BuildApiClient(
          service=Service.RESERVATIONS,
          domain_root=path,
          api_version=reservation_version,
          labels=labels,
      )
    return self._op_reservation_client

  def GetConnectionV1ApiClient(
      self, connection_service_address: Optional[str] = None
  ) -> discovery.Resource:
    """Return the apiclient that supports connections operations."""
    if self._op_connection_service_client:
      logging.info('Using the cached Connections API client')
    else:
      path = (
          connection_service_address
          or bq_api_utils.get_tpc_root_url_from_flags(
              service=Service.CONNECTIONS,
              inputted_flags=bq_flags,
          )
      )
      discovery_url = bq_api_utils.get_discovery_url_from_root_url(
          path, api_version='v1'
      )
      discovery_url = bq_api_utils.add_api_key_to_discovery_url(
          discovery_url=discovery_url,
          universe_domain=bq_flags.UNIVERSE_DOMAIN.value,
          inputted_flags=bq_flags,
      )
      self._op_connection_service_client = self.BuildApiClient(
          discovery_url=discovery_url,
          service=Service.CONNECTIONS,
      )
    return self._op_connection_service_client

  def OverrideEndpoint(
      self, discovery_document: Union[str, bytes], service: Service
  ) -> Optional[str]:
    """Override rootUrl for regional endpoints.

    Args:
      discovery_document: BigQuery discovery document.
      service: The BigQuery service being used.

    Returns:
      discovery_document updated discovery document.

    Raises:
      bq_error.BigqueryClientError: if location is not set and
        use_regional_endpoints is.
    """
    if discovery_document is None:
      return discovery_document

    discovery_document = bq_api_utils.parse_discovery_doc(discovery_document)

    logging.info(
        'Discovery doc routing values being considered for updates: rootUrl:'
        ' (%s), basePath: (%s), baseUrl: (%s)',
        discovery_document['rootUrl'],
        discovery_document['basePath'],
        discovery_document['baseUrl'],
    )

    is_prod = True

    if is_prod:
      discovery_document['rootUrl'] = bq_api_utils.get_tpc_root_url_from_flags(
          service=service, inputted_flags=bq_flags
      )


    discovery_document['baseUrl'] = urllib.parse.urljoin(
        discovery_document['rootUrl'], discovery_document['servicePath']
    )

    logging.info(
        'Discovery doc routing values post updates: rootUrl: (%s), basePath:'
        ' (%s), baseUrl: (%s)',
        discovery_document['rootUrl'],
        discovery_document['basePath'],
        discovery_document['baseUrl'],
    )

    return json.dumps(discovery_document)