HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/bq/clients/table_reader.py
#!/usr/bin/env python
"""The different TableReader options for the BQ CLI."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import Optional

from googleapiclient import discovery

from utils import bq_error
from utils import bq_id_utils


class _TableReader:
  """Shared paging and row-decoding logic for every table reader.

  Concrete readers provide a `max_rows_per_request` instance attribute and
  implement `_ReadOnePage` and `_GetPrintContext`. This base class stitches
  the pages together and turns rows in f/v format into nested Python lists.
  """

  def ReadRows(
      self,
      start_row: Optional[int] = 0,
      max_rows: Optional[int] = None,
      selected_fields: Optional[str] = None,
  ):
    """Fetches up to max_rows rows, discarding the schema.

    Args:
      start_row: index of the first row to return.
      max_rows: upper bound on the number of rows to return.
      selected_fields: a subset of fields to return.

    Raises:
      BigqueryInterfaceError: when bigquery returns something unexpected.

    Returns:
      A list of rows; every row is itself a list of field values.
    """
    _unused_schema, fetched_rows = self.ReadSchemaAndRows(
        start_row=start_row,
        max_rows=max_rows,
        selected_fields=selected_fields,
    )
    return fetched_rows

  def ReadSchemaAndRows(
      self,
      start_row: Optional[int],
      max_rows: Optional[int],
      selected_fields: Optional[str] = None,
  ):
    """Fetches up to max_rows rows together with the schema.

    Pages are requested one after another until max_rows rows have been
    collected, a page comes back empty, or no further page token is returned.

    Args:
      start_row: index of the first row to read.
      max_rows: upper bound on the number of rows to return.
      selected_fields: a subset of fields to return.

    Raises:
      BigqueryInterfaceError: when bigquery returns something unexpected.
      ValueError: when start_row is None.
      ValueError: when max_rows is None.
      NotImplementedError: when a page is needed but the instance has no
        max_rows_per_request attribute.

    Returns:
      A (fields, rows) tuple. fields is the field list of the first non-empty
      schema seen (an empty dict if no page carried one); rows is the list of
      converted rows.
    """
    if start_row is None:
      raise ValueError('start_row is required')
    if max_rows is None:
      raise ValueError('max_rows is required')

    missing = object()
    collected = []
    fields = {}
    next_token = None
    while len(collected) < max_rows:
      per_request_cap = getattr(self, 'max_rows_per_request', missing)
      if per_request_cap is missing:
        raise NotImplementedError(
            'Subclass must have max_rows_per_request instance variable'
        )
      page_size = max_rows - len(collected)
      # A falsy cap (0 or None) means "no per-request limit".
      if per_request_cap:
        page_size = min(per_request_cap, page_size)

      # The start index is only sent with the first request; follow-up
      # requests are positioned by the page token instead.
      page_rows, next_token, page_schema = self._ReadOnePage(
          None if next_token else start_row,
          max_rows=page_size,
          page_token=next_token,
          selected_fields=selected_fields,
      )
      if page_schema and not fields:
        fields = page_schema.get('fields', [])

      decoded = [self._ConvertFromFV(fields, raw) for raw in page_rows]
      collected.extend(decoded)
      start_row += len(decoded)

      if not (next_token and page_rows):
        break
    return (fields, collected)

  def _ConvertFromFV(self, schema, row):
    """Turns one row in f/v format into a (possibly nested) list of values.

    Args:
      schema: list of field descriptors; each must carry a 'type' key.
      row: mapping whose 'f' entry lists the cells, each holding its value
        under 'v'.

    Raises:
      BigqueryCommunicationError: when a field descriptor has no 'type'.

    Returns:
      None for a falsy row, otherwise a list with one item per schema field
      that has a matching cell.
    """
    if not row:
      return None
    cells = [cell.get('v', '') for cell in row.get('f', [])]
    converted = []
    for column, cell in zip(schema, cells):
      if 'type' not in column:
        raise bq_error.BigqueryCommunicationError(
            'Invalid response: missing type property'
        )
      is_record = column['type'].upper() == 'RECORD'
      is_repeated = column.get('mode', 'NULLABLE').upper() == 'REPEATED'
      if is_record:
        nested_fields = column.get('fields', [])
        if is_repeated:
          # Array of records: every element wraps its own f/v row under 'v'.
          item = [
              self._ConvertFromFV(nested_fields, element.get('v', ''))
              for element in cell
          ]
        else:
          # Single record: the cell itself is an f/v row.
          item = self._ConvertFromFV(nested_fields, cell)
      elif is_repeated:
        # Array of scalars: unwrap each element's 'v'.
        item = [element.get('v', '') for element in cell]
      else:
        # Plain scalar value.
        item = cell
      converted.append(item)
    return converted

  def __str__(self) -> str:
    """Returns the reader's print context."""
    return self._GetPrintContext()

  def __repr__(self) -> str:
    """Returns the reader's print context."""
    return self._GetPrintContext()

  def _GetPrintContext(self) -> str:
    """Returns context for what is being read; subclasses must override."""
    raise NotImplementedError('Subclass must implement GetPrintContext')

  def _ReadOnePage(
      self,
      start_row: Optional[int],
      max_rows: Optional[int],
      page_token: Optional[str] = None,
      selected_fields: Optional[str] = None,
  ):
    """Reads a single page of at most max_rows rows; subclasses must override.

    Assumes that the table is ready for reading. Will signal an error otherwise.

    Args:
      start_row: index of the first row to read.
      max_rows: upper bound on the number of rows to return.
      page_token: Optional. token identifying the page to read.
      selected_fields: a subset of fields to return.

    Returns:
      A tuple of:
      rows: the rows of the page, in f,v format.
      page_token: the token of the next page of results.
      schema: the schema of the table.
    """
    raise NotImplementedError('Subclass must implement _ReadOnePage')


class TableTableReader(_TableReader):
  """A TableReader that reads from a table.

  Rows are fetched with `tabledata().list` and the schema with
  `tables().get` on the supplied API client.
  """

  def __init__(
      self,
      local_apiclient: discovery.Resource,
      max_rows_per_request: int,
      table_ref: bq_id_utils.ApiClientHelper.TableReference,
  ):
    """Stores the client, the per-request row cap and the table to read.

    Args:
      local_apiclient: API client used to issue the requests.
      max_rows_per_request: largest number of rows to ask for in one request;
        a falsy value leaves the page size uncapped.
      table_ref: reference of the table to read; it is expanded with dict()
        into the request arguments.
    """
    self.table_ref = table_ref
    self.max_rows_per_request = max_rows_per_request
    self._apiclient = local_apiclient

  def _GetPrintContext(self) -> str:
    """Returns the repr of the table reference being read."""
    return '%r' % (self.table_ref,)

  def _ReadOnePage(
      self,
      start_row: Optional[int],
      max_rows: Optional[int],
      page_token: Optional[str] = None,
      selected_fields: Optional[str] = None,
  ):
    """Reads one page of rows and the table schema.

    Args:
      start_row: index of the first row to read; only sent when there is no
        page token.
      max_rows: maximum number of rows to return.
      page_token: Optional. token of the page to read; takes precedence over
        start_row.
      selected_fields: a subset of fields to return; applied to both the row
        request and the schema request.

    Returns:
      tuple of:
      rows: the rows of the page, in f,v format ([] when none are returned).
      page_token: the page token of the next page of results, or None.
      schema: the schema of the table ({} when none is returned).
    """
    list_kwds = dict(self.table_ref)
    list_kwds['maxResults'] = max_rows
    if page_token:
      list_kwds['pageToken'] = page_token
    else:
      list_kwds['startIndex'] = start_row
    if selected_fields is not None:
      list_kwds['selectedFields'] = selected_fields
    data = self._apiclient.tabledata().list(**list_kwds).execute()
    next_page_token = data.get('pageToken', None)
    rows = data.get('rows', [])

    # The row listing is not used as a schema source here; the schema comes
    # from a separate tables.get request.
    # NOTE(review): this request is repeated for every page even though the
    # base class only keeps the first non-empty schema - consider caching.
    get_kwds = dict(self.table_ref)
    if selected_fields is not None:
      get_kwds['selectedFields'] = selected_fields
    table_info = self._apiclient.tables().get(**get_kwds).execute()
    schema = table_info.get('schema', {})

    return (rows, next_page_token, schema)


class JobTableReader(_TableReader):
  """A TableReader that pulls rows from the results of a completed job."""

  def __init__(
      self,
      local_apiclient: discovery.Resource,
      max_rows_per_request: int,
      job_ref: bq_id_utils.ApiClientHelper.JobReference,
  ):
    """Stores the client, the per-request row cap and the job to read.

    Args:
      local_apiclient: API client used to issue the requests.
      max_rows_per_request: largest number of rows to ask for in one request.
      job_ref: reference of the job whose results are read; it is expanded
        with dict() into the request arguments.
    """
    self._apiclient = local_apiclient
    self.max_rows_per_request = max_rows_per_request
    self.job_ref = job_ref

  def _GetPrintContext(self) -> str:
    """Returns the repr of the job reference being read."""
    return '%r' % (self.job_ref,)

  def _ReadOnePage(
      self,
      start_row: Optional[int],
      max_rows: Optional[int],
      page_token: Optional[str] = None,
      selected_fields: Optional[str] = None,
  ):
    """Reads one page of the job's results via jobs.getQueryResults.

    Args:
      start_row: index of the first row to read; only sent when there is no
        page token.
      max_rows: maximum number of rows to return.
      page_token: Optional. token of the page to read.
      selected_fields: accepted for interface compatibility; not forwarded.

    Raises:
      BigqueryError: when the response reports that the job is not complete.

    Returns:
      tuple of:
      rows: the rows of the page, in f,v format ([] when none are returned).
      page_token: the page token of the next page of results, or None.
      schema: the schema reported with the results, or None.
    """
    request = dict(self.job_ref)
    # A zero timeout is used because the job is assumed to be finished
    # already, so there is nothing to wait for.
    request.update(maxResults=max_rows, timeoutMs=0)
    if page_token:
      request['pageToken'] = page_token
    else:
      request['startIndex'] = start_row

    response = self._apiclient.jobs().getQueryResults(**request).execute()
    if not response['jobComplete']:
      raise bq_error.BigqueryError('Job %s is not done' % (self,))
    return (
        response.get('rows', []),
        response.get('pageToken', None),
        response.get('schema', None),
    )


class QueryTableReader(_TableReader):
  """A TableReader that serves rows of a completed query.

  The response already obtained for the query is used whenever it holds
  enough rows; otherwise the rows are fetched with jobs.getQueryResults.
  """

  def __init__(
      self,
      local_apiclient: discovery.Resource,
      max_rows_per_request: int,
      job_ref: bq_id_utils.ApiClientHelper.JobReference,
      results,
  ):
    """Stores the client, the row cap, the job and the query response.

    Args:
      local_apiclient: API client used to issue the requests.
      max_rows_per_request: largest number of rows to ask for in one request.
      job_ref: reference of the query job; may be falsy, in which case no job
        identifiers are added to the request arguments.
      results: the query response; read with ['jobComplete'] and .get() for
        'rows', 'totalRows', 'jobReference', 'schema' and 'pageToken'.
    """
    self._results = results
    self._apiclient = local_apiclient
    self.max_rows_per_request = max_rows_per_request
    self.job_ref = job_ref

  def _GetPrintContext(self) -> str:
    """Returns the repr of the job reference being read."""
    return '%r' % (self.job_ref,)

  def _ReadOnePage(
      self,
      start_row: Optional[int],
      max_rows: Optional[int],
      page_token: Optional[str] = None,
      selected_fields: Optional[str] = None,
  ):
    """Reads one page, preferring the rows already held in the query response.

    Args:
      start_row: index of the first row to read; only sent when there is no
        page token.
      max_rows: maximum number of rows to return.
      page_token: Optional. token of the page to read.
      selected_fields: accepted for interface compatibility; not forwarded.

    Raises:
      BigqueryError: when the stored response or the fetched response reports
        an incomplete job, or when the stored response holds fewer rows than
        totalRows without providing a page token.

    Returns:
      tuple of:
      rows: the rows of the page, in f,v format.
      page_token: the page token of the next page of results, or None.
      schema: the schema reported with the results, or None.
    """
    request = dict(self.job_ref) if self.job_ref else {}
    # A zero timeout is used because the query is assumed to be finished
    # already, so there is nothing to wait for.
    request.update(maxResults=max_rows, timeoutMs=0)
    if page_token:
      request['pageToken'] = page_token
    else:
      request['startIndex'] = start_row

    cached = self._results
    if not cached['jobComplete']:
      raise bq_error.BigqueryError('Job %s is not done' % (self,))

    cached_rows = cached.get('rows', None)
    total_rows = cached.get('totalRows', None)
    lacks_job_reference = cached.get('jobReference', None) is None

    if lacks_job_reference and total_rows is not None and int(total_rows) == 0:
      # jobs.query requests with JOB_CREATION_OPTIONAL can come back empty
      # and without a job reference; answer from the stored response so that
      # no getQueryResults call is made.
      return (cached.get('rows', []), None, cached.get('schema', None))

    stored_rows_suffice = (
        total_rows is not None
        and cached_rows is not None
        and start_row is not None
        and len(cached_rows) >= min(int(total_rows), start_row + max_rows)
    )
    if stored_rows_suffice:
      next_token = cached.get('pageToken', None)
      if next_token is None and len(cached_rows) < int(total_rows):
        raise bq_error.BigqueryError(
            'Synchronous query %s did not return all rows, yet it did not'
            ' return a page token' % (self,)
        )
      return (cached_rows, next_token, cached.get('schema', None))

    # Everything else - including DDL and DML statements, which return no
    # rows - is delegated to getQueryResults.
    response = self._apiclient.jobs().getQueryResults(**request).execute()
    if not response['jobComplete']:
      raise bq_error.BigqueryError('Job %s is not done' % (self,))
    return (
        response.get('rows', []),
        response.get('pageToken', None),
        response.get('schema', None),
    )