HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/bq/clients/utils.py
#!/usr/bin/env python
"""Bigquery Client library for Python."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import enum
import hashlib
import json
import logging
import os
import random
import re
import sys
import time
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

from absl import flags

from utils import bq_error
from utils import bq_id_utils
from utils import bq_processor_utils

MAX_SUPPORTED_IAM_POLICY_VERSION = 3


class UpdateMode(enum.Enum):
  """Enum for update modes."""

  UPDATE_METADATA = 'UPDATE_METADATA'
  UPDATE_ACL = 'UPDATE_ACL'
  UPDATE_FULL = 'UPDATE_FULL'


def _ParseJobIdentifier(
    identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses a job identifier string into its components.

  Args:
    identifier: String specifying the job identifier in the format
      "project_id:job_id", "project_id:location.job_id", or "job_id".

  Returns:
    A tuple of three elements: containing project_id, location,
    job_id. If an element is not found, it is represented by
    None. If no elements are found, the tuple contains three None
    values.
  """
  project_id_pattern = r'[\w:\-.]*[\w:\-]+'
  location_pattern = r'[a-zA-Z\-0-9]+'
  job_id_pattern = r'[\w\-]+'

  pattern = re.compile(
      r"""
    ^((?P<project_id>%(PROJECT_ID)s)
    :)?
    ((?P<location>%(LOCATION)s)
    \.)?
    (?P<job_id>%(JOB_ID)s)
    $
  """
      % {
          'PROJECT_ID': project_id_pattern,
          'LOCATION': location_pattern,
          'JOB_ID': job_id_pattern,
      },
      re.X,
  )

  match = re.search(pattern, identifier)
  if match:
    return (
        match.groupdict().get('project_id', None),
        match.groupdict().get('location', None),
        match.groupdict().get('job_id', None),
    )
  return (None, None, None)


def _ParseReservationIdentifier(
    identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation identifier string into its components.

  Args:
    identifier: String specifying the reservation identifier in the format
      "project_id:reservation_id", "project_id:location.reservation_id", or
      "reservation_id".

  Returns:
    A tuple of three elements: containing project_id, location, and
    reservation_id. If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the identifier could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^((?P<project_id>[\w:\-.]*[\w:\-]+):)?
  ((?P<location>[\w\-]+)\.)?
  (?P<reservation_id>[\w\-]*)$
  """,
      re.X,
  )

  match = re.search(pattern, identifier)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse reservation identifier: %s' % identifier
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  reservation_id = match.groupdict().get('reservation_id', None)
  return (project_id, location, reservation_id)


def ParseReservationPath(
    path: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation path string into its components.

  Args:
    path: String specifying the reservation path in the format
      projects/<project_id>/locations/<location>/reservations/<reservation_id>
      or projects/<project_id>/locations/<location>/biReservation

  Returns:
    A tuple of three elements: containing project_id, location and
    reservation_id. If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the path could not be parsed.
  """

  pattern = re.compile(
      r'^projects/(?P<project_id>[\w:\-.]*[\w:\-]+)?'
      + r'/locations/(?P<location>[\w\-]+)?'
      +
      # Accept a suffix of '/reservations/<reservation ID>' or
      # one of '/biReservation'
      r'/(reservations/(?P<reservation_id>[\w\-/]+)'
      + r'|(?P<bi_id>biReservation)'
      + r')$',
      re.X,
  )

  match = re.search(pattern, path)
  if not match:
    raise bq_error.BigqueryError('Could not parse reservation path: %s' % path)

  group = lambda key: match.groupdict().get(key, None)
  project_id = group('project_id')
  location = group('location')
  reservation_id = group('reservation_id') or group('bi_id')
  return (project_id, location, reservation_id)


def _ParseCapacityCommitmentIdentifier(
    identifier: str, allow_commas: Optional[bool]
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the capacity commitment identifier string into its components.

  Args:
    identifier: String specifying the capacity commitment identifier in the
      format "project_id:capacity_commitment_id",
      "project_id:location.capacity_commitment_id", or "capacity_commitment_id".
    allow_commas: whether to allow commas in the capacity commitment id.

  Returns:
    A tuple of three elements: containing project_id, location
    and capacity_commitment_id. If an element is not found, it is represented by
    None.

  Raises:
    bq_error.BigqueryError: if the identifier could not be parsed.
  """
  pattern = None
  if allow_commas:
    pattern = re.compile(
        r"""
    ^((?P<project_id>[\w:\-.]*[\w:\-]+):)?
    ((?P<location>[\w\-]+)\.)?
    (?P<capacity_commitment_id>[\w|,-]*)$
    """,
        re.X,
    )
  else:
    pattern = re.compile(
        r"""
    ^((?P<project_id>[\w:\-.]*[\w:\-]+):)?
    ((?P<location>[\w\-]+)\.)?
    (?P<capacity_commitment_id>[\w|-]*)$
    """,
        re.X,
    )

  match = re.search(pattern, identifier)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse capacity commitment identifier: %s' % identifier
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  capacity_commitment_id = match.groupdict().get('capacity_commitment_id', None)
  return (project_id, location, capacity_commitment_id)


def ParseCapacityCommitmentPath(
    path: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the capacity commitment path string into its components.

  Args:
    path: String specifying the capacity commitment path in the format
      projects/<project_id>/locations/<location>/capacityCommitments/<capacity_commitment_id>

  Returns:
    A tuple of three elements: containing project_id, location,
    and capacity_commitment_id. If an element is not found, it is represented by
    None.

  Raises:
    bq_error.BigqueryError: if the path could not be parsed.
  """
  pattern = re.compile(
      r"""
  ^projects\/(?P<project_id>[\w:\-.]*[\w:\-]+)?
  \/locations\/(?P<location>[\w\-]+)?
  \/capacityCommitments\/(?P<capacity_commitment_id>[\w|-]+)$
  """,
      re.X,
  )

  match = re.search(pattern, path)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse capacity commitment path: %s' % path
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  capacity_commitment_id = match.groupdict().get('capacity_commitment_id', None)
  return (project_id, location, capacity_commitment_id)


def _ParseReservationAssignmentIdentifier(
    identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation assignment identifier string into its components.

  Args:
    identifier: String specifying the reservation assignment identifier in the
      format "project_id:reservation_id.assignment_id",
      "project_id:location.reservation_id.assignment_id", or
      "reservation_id.assignment_id".

  Returns:
    A tuple of three elements: containing project_id, location, and
    reservation_assignment_id. If an element is not found, it is represented by
    None.

  Raises:
    bq_error.BigqueryError: if the identifier could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^((?P<project_id>[\w:\-.]*[\w:\-]+):)?
  ((?P<location>[\w\-]+)\.)?
  (?P<reservation_id>[\w\-\/]+)\.
  (?P<reservation_assignment_id>[\w\-_]+)$
  """,
      re.X,
  )

  match = re.search(pattern, identifier)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse reservation assignment identifier: %s' % identifier
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  reservation_id = match.groupdict().get('reservation_id', None)
  reservation_assignment_id = match.groupdict().get(
      'reservation_assignment_id', None
  )
  return (project_id, location, reservation_id, reservation_assignment_id)


def ParseReservationAssignmentPath(
    path: str,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation assignment path string into its components.

  Args:
    path: String specifying the reservation assignment path in the format
      projects/<project_id>/locations/<location>/
      reservations/<reservation_id>/assignments/<assignment_id> The
      reservation_id must be that of a top level reservation.

  Returns:
    A tuple of three elements: containing project_id, location and
    reservation_assignment_id. If an element is not found, it is represented by
    None.

  Raises:
    bq_error.BigqueryError: if the path could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^projects\/(?P<project_id>[\w:\-.]*[\w:\-]+)?
  \/locations\/(?P<location>[\w\-]+)?
  \/reservations\/(?P<reservation_id>[\w\-]+)
  \/assignments\/(?P<reservation_assignment_id>[\w\-_]+)$
  """,
      re.X,
  )

  match = re.search(pattern, path)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse reservation assignment path: %s' % path
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  reservation_id = match.groupdict().get('reservation_id', None)
  reservation_assignment_id = match.groupdict().get(
      'reservation_assignment_id', None
  )
  return (project_id, location, reservation_id, reservation_assignment_id)


def _ParseReservationGroupIdentifier(
    identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation group identifier string into its components.

  Args:
    identifier: String specifying the reservation group identifier in the format
      "project_id:reservation_group_id",
      "project_id:location.reservation_group_id", or "reservation_group_id".

  Returns:
    A tuple of three elements: containing project_id, location, and
    reservation_group_id. If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the identifier could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^((?P<project_id>[\w:\-.]*[\w:\-]+):)?
  ((?P<location>[\w\-]+)\.)?
  (?P<reservation_group_id>[\w\-]*)$
  """,
      re.X,
  )

  match = re.search(pattern, identifier)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse reservation group identifier: %s' % identifier
    )

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  reservation_group_id = match.groupdict().get('reservation_group_id', None)
  return (project_id, location, reservation_group_id)


def ParseReservationGroupPath(
    path: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the reservation group path string into its components.

  Args:
    path: String specifying the reservation group path in the format
      projects/<project_id>/locations/<location>/reservationGroups/<reservation_group_id>

  Returns:
    A tuple of three elements: containing project_id, location and
    reservation_group_id. If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the path could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^projects\/(?P<project_id>[\w:\-.]*[\w:\-]+)?
  \/locations\/(?P<location>[\w\-]+)?
  \/reservationGroups\/(?P<reservation_group_id>[\w|-]+)$
  """,
      re.X,
  )

  match = re.search(pattern, path)
  if not match:
    raise bq_error.BigqueryError(
        'Could not parse reservation group path: %s' % path
    )

  group = lambda key: match.groupdict().get(key, None)
  project_id = group('project_id')
  location = group('location')
  reservation_group_id = group('reservation_group_id')
  return (project_id, location, reservation_group_id)


def _ParseConnectionIdentifier(
    identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the connection identifier string into its components.

  Args:
    identifier: String specifying the connection identifier in the format
      "connection_id", "location.connection_id",
      "project_id.location.connection_id"

  Returns:
    A tuple of four elements: containing project_id, location, connection_id
    If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the identifier could not be parsed.
  """

  if not identifier:
    raise bq_error.BigqueryError('Empty connection identifier')

  tokens = identifier.split('.')
  num_tokens = len(tokens)
  if num_tokens > 4:
    raise bq_error.BigqueryError(
        'Could not parse connection identifier: %s' % identifier
    )
  connection_id = tokens[num_tokens - 1]
  location = tokens[num_tokens - 2] if num_tokens > 1 else None
  project_id = '.'.join(tokens[: num_tokens - 2]) if num_tokens > 2 else None

  return (project_id, location, connection_id)


def ParseConnectionPath(
    path: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
  """Parses the connection path string into its components.

  Args:
    path: String specifying the connection path in the format
      projects/<project_id>/locations/<location>/connections/<connection_id>

  Returns:
    A tuple of three elements: containing project_id, location and
    connection_id. If an element is not found, it is represented by None.

  Raises:
    bq_error.BigqueryError: if the path could not be parsed.
  """

  pattern = re.compile(
      r"""
  ^projects\/(?P<project_id>[\w:\-.]*[\w:\-]+)?
  \/locations\/(?P<location>[\w\-]+)?
  \/connections\/(?P<connection_id>[\w\-\/]+)$
  """,
      re.X,
  )

  match = re.search(pattern, path)
  if not match:
    raise bq_error.BigqueryError('Could not parse connection path: %s' % path)

  project_id = match.groupdict().get('project_id', None)
  location = match.groupdict().get('location', None)
  connection_id = match.groupdict().get('connection_id', None)
  return (project_id, location, connection_id)


def ReadTableConstrants(table_constraints: str):
  """Create table constraints json object from string or a file name.

  Args:
    table_constraints: Either a json string that presents a table_constraints
      proto or name of a file that contains the json string.

  Returns:
    The table_constraints (as a json object).

  Raises:
    bq_error.BigqueryTableConstraintsError: If load the table constraints
      from the string or file failed.
  """
  if not table_constraints:
    raise bq_error.BigqueryTableConstraintsError(
        'table_constraints cannot be empty'
    )
  if os.path.exists(table_constraints):
    with open(table_constraints) as f:
      try:
        loaded_json = json.load(f)
      except ValueError as e:
        raise bq_error.BigqueryTableConstraintsError(
            'Error decoding JSON table constraints from file %s.'
            % (table_constraints,)
        ) from e
    return loaded_json
  if re.search(r'^[./~\\]', table_constraints) is not None:
    # The table_constraints looks like a file name but the file does not
    # exist.
    raise bq_error.BigqueryTableConstraintsError(
        'Error reading table constraints: "%s" looks like a filename, '
        'but was not found.' % (table_constraints,)
    )
  try:
    loaded_json = json.loads(table_constraints)
  except ValueError as e:
    raise bq_error.BigqueryTableConstraintsError(
        'Error decoding JSON table constraints from string %s.'
        % (table_constraints,)
    ) from e
  return loaded_json


def RaiseErrorFromHttpError(e):
  """Raises a BigQueryError given an HttpError."""
  if e.resp.get('content-type', '').startswith('application/json'):
    content = json.loads(e.content.decode('utf-8'))
    RaiseError(content)
  else:
    # If the HttpError is not a json object, it is a communication error.
    error_details = ''
    if flags.FLAGS.use_regional_endpoints:
      error_details = ' The specified regional endpoint may not be supported.'
    raise bq_error.BigqueryCommunicationError(
        'Could not connect with BigQuery server.%s\n'
        'Http response status: %s\n'
        'Http response content:\n%s'
        % (error_details, e.resp.get('status', '(unexpected)'), e.content)
    )


def RaiseErrorFromNonHttpError(e):
  """Raises a BigQueryError given a non-HttpError."""
  raise bq_error.BigqueryCommunicationError(
      'Could not connect with BigQuery server due to: %r' % (e,)
  )


class JobIdGenerator(abc.ABC):
  """Base class for job id generators."""

  def __init__(self):
    pass

  @abc.abstractmethod
  def Generate(self, job_configuration):
    """Generates a job_id to use for job_configuration."""


class JobIdGeneratorNone(JobIdGenerator):
  """Job id generator that returns None, letting the server pick the job id."""

  def Generate(self, job_configuration):
    return None


class JobIdGeneratorRandom(JobIdGenerator):
  """Generates random job id_fallbacks."""

  def Generate(self, job_configuration):
    return 'bqjob_r%08x_%016x' % (
        random.SystemRandom().randint(0, sys.maxsize),
        int(time.time() * 1000),
    )


class JobIdGeneratorFingerprint(JobIdGenerator):
  """Generates job ids that uniquely match the job config."""

  def _HashableRepr(self, obj):
    if isinstance(obj, bytes):
      return obj
    return str(obj).encode('utf-8')

  def _Hash(self, config, sha1):
    """Computes the sha1 hash of a dict."""
    keys = list(config.keys())
    # Python dict enumeration ordering is random. Sort the keys
    # so that we will visit them in a stable order.
    keys.sort()
    for key in keys:
      sha1.update(self._HashableRepr(key))
      v = config[key]
      if isinstance(v, dict):
        logging.info('Hashing: %s...', key)
        self._Hash(v, sha1)
      elif isinstance(v, list):
        logging.info('Hashing: %s ...', key)
        for inner_v in v:
          self._Hash(inner_v, sha1)
      else:
        logging.info('Hashing: %s:%s', key, v)
        sha1.update(self._HashableRepr(v))

  def Generate(self, job_configuration):
    s1 = hashlib.sha1()
    self._Hash(job_configuration, s1)
    job_id = 'bqjob_c%s' % (s1.hexdigest(),)
    logging.info('Fingerprinting: %s:\n%s', job_configuration, job_id)
    return job_id


class JobIdGeneratorIncrementing(JobIdGenerator):
  """Generates job ids that increment each time we're asked."""

  def __init__(self, inner):
    super().__init__()
    self._inner = inner
    self._retry = 0

  def Generate(self, job_configuration):
    self._retry += 1
    return '%s_%d' % (self._inner.Generate(job_configuration), self._retry)


def NormalizeWait(wait):
  try:
    return int(wait)
  except ValueError:
    raise ValueError('Invalid value for wait: %s' % (wait,))


def _ParseDatasetIdentifier(identifier: str) -> Tuple[str, str]:
  # We need to parse plx datasets separately.
  if identifier.startswith('plx.google:'):
    return 'plx.google', identifier[len('plx.google:') :]
  else:
    project_id, _, dataset_id = identifier.rpartition(':')
    return project_id, dataset_id


def _ShiftInformationSchema(dataset_id: str, table_id: str) -> Tuple[str, str]:
  """Moves "INFORMATION_SCHEMA" to table_id for dataset qualified tables."""
  if not dataset_id or not table_id:
    return dataset_id, table_id

  dataset_parts = dataset_id.split('.')
  if dataset_parts[-1] != 'INFORMATION_SCHEMA' or table_id in (
      'SCHEMATA',
      'SCHEMATA_OPTIONS',
  ):
    # We don't shift unless INFORMATION_SCHEMA is present and table_id is for
    # a dataset qualified table.
    return dataset_id, table_id

  return '.'.join(dataset_parts[:-1]), 'INFORMATION_SCHEMA.' + table_id


def _ParseIdentifier(identifier: str) -> Tuple[str, str, str]:
  """Parses identifier into a tuple of (possibly empty) identifiers.

  This will parse the identifier into a tuple of the form
  (project_id, dataset_id, table_id) without doing any validation on
  the resulting names; missing names are returned as ''. The
  interpretation of these identifiers depends on the context of the
  caller. For example, if you know the identifier must be a job_id,
  then you can assume dataset_id is the job_id.

  Args:
    identifier: string, identifier to parse

  Returns:
    project_id, dataset_id, table_id: (string, string, string)
  """
  # We need to handle the case of a lone project identifier of the
  # form domain.com:proj separately.
  if re.search(r'^\w[\w.]*\.[\w.]+:\w[\w\d_-]*:?$', identifier):
    return identifier, '', ''
  project_id, _, dataset_and_table_id = identifier.rpartition(':')

  if '.' in dataset_and_table_id:
    dataset_id, _, table_id = dataset_and_table_id.rpartition('.')
  elif project_id:
    # Identifier was a project : <something without dots>.
    # We must have a dataset id because there was a project
    dataset_id = dataset_and_table_id
    table_id = ''
  else:
    # Identifier was just a bare id with no dots or colons.
    # Return this as a table_id.
    dataset_id = ''
    table_id = dataset_and_table_id

  dataset_id, table_id = _ShiftInformationSchema(dataset_id, table_id)

  return project_id, dataset_id, table_id


def GetProjectReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    identifier: str = '',
) -> bq_id_utils.ApiClientHelper.ProjectReference:
  """Determine a project reference from an identifier or fallback."""
  project_id, dataset_id, table_id = _ParseIdentifier(identifier)
  try:
    # ParseIdentifier('foo') is just a table_id, but we want to read
    # it as a project_id.
    project_id = project_id or table_id or id_fallbacks.project_id
    if not dataset_id and project_id:
      return bq_id_utils.ApiClientHelper.ProjectReference.Create(
          projectId=project_id
      )
  except ValueError:
    pass
  if project_id == '':
    raise bq_error.BigqueryClientError('Please provide a project ID.')
  else:
    raise bq_error.BigqueryClientError(
        'Cannot determine project described by %s' % (identifier,)
    )


def GetDatasetReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    identifier: str = '',
) -> bq_id_utils.ApiClientHelper.DatasetReference:
  """Determine a DatasetReference from an identifier or fallback."""
  identifier = id_fallbacks.dataset_id if not identifier else identifier
  project_id, dataset_id, table_id = _ParseIdentifier(identifier)
  if table_id and not project_id and not dataset_id:
    # identifier is 'foo'
    project_id = id_fallbacks.project_id
    dataset_id = table_id
  elif project_id and dataset_id and table_id:
    # Identifier was foo::bar.baz.qux.
    dataset_id = dataset_id + '.' + table_id
  elif project_id and dataset_id and not table_id:
    # identifier is 'foo:bar'
    pass
  else:
    raise bq_error.BigqueryError(
        'Cannot determine dataset described by %s' % (identifier,)
    )

  try:
    return bq_id_utils.ApiClientHelper.DatasetReference.Create(
        projectId=project_id, datasetId=dataset_id
    )
  except ValueError as e:
    raise bq_error.BigqueryError(
        'Cannot determine dataset described by %s' % (identifier,)
    ) from e


def GetTableReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    identifier: str = '',
    default_dataset_id: str = '',
) -> bq_id_utils.ApiClientHelper.TableReference:
  """Determine a TableReference from an identifier and fallbacks."""
  project_id, dataset_id, table_id = _ParseIdentifier(identifier)
  if not dataset_id:
    project_id, dataset_id = _ParseDatasetIdentifier(id_fallbacks.dataset_id)
  if default_dataset_id and not dataset_id:
    dataset_id = default_dataset_id
  try:
    return bq_id_utils.ApiClientHelper.TableReference.Create(
        projectId=project_id or id_fallbacks.project_id,
        datasetId=dataset_id,
        tableId=table_id,
    )
  except ValueError as e:
    raise bq_error.BigqueryError(
        'Cannot determine table described by %s' % (identifier,)
    ) from e


def GetRowAccessPolicyReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    table_identifier: str = '',
    policy_id: str = '',
) -> bq_id_utils.ApiClientHelper.RowAccessPolicyReference:
  """Determine a RowAccessPolicyReference from an identifier and fallbacks."""
  try:
    table_reference = GetTableReference(id_fallbacks, table_identifier)
    return bq_id_utils.ApiClientHelper.RowAccessPolicyReference.Create(
        projectId=table_reference.projectId,
        datasetId=table_reference.datasetId,
        tableId=table_reference.tableId,
        policyId=policy_id,
    )
  except ValueError as e:
    raise bq_error.BigqueryError(
        'Cannot determine row access policy described by %s and %s'
        % (table_identifier, policy_id)
    ) from e


def GetModelReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    identifier: str = '',
) -> bq_id_utils.ApiClientHelper.ModelReference:
  """Returns a ModelReference from an identifier."""
  project_id, dataset_id, table_id = _ParseIdentifier(identifier)
  if not dataset_id:
    project_id, dataset_id = _ParseDatasetIdentifier(id_fallbacks.dataset_id)
  try:
    return bq_id_utils.ApiClientHelper.ModelReference.Create(
        projectId=project_id or id_fallbacks.project_id,
        datasetId=dataset_id,
        modelId=table_id,
    )
  except ValueError as e:
    raise bq_error.BigqueryError(
        'Cannot determine model described by %s' % identifier
    ) from e


def GetRoutineReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    identifier: str = '',
) -> bq_id_utils.ApiClientHelper.RoutineReference:
  """Returns a RoutineReference from an identifier."""
  project_id, dataset_id, table_id = _ParseIdentifier(identifier)
  if not dataset_id:
    project_id, dataset_id = _ParseDatasetIdentifier(id_fallbacks.dataset_id)
  try:
    return bq_id_utils.ApiClientHelper.RoutineReference.Create(
        projectId=project_id or id_fallbacks.project_id,
        datasetId=dataset_id,
        routineId=table_id,
    )
  except ValueError as e:
    raise bq_error.BigqueryError(
        'Cannot determine routine described by %s' % identifier
    ) from e


def GetQueryDefaultDataset(identifier: str) -> Dict[str, str]:
  parsed_project_id, parsed_dataset_id = _ParseDatasetIdentifier(identifier)
  result = dict(datasetId=parsed_dataset_id)
  if parsed_project_id:
    result['projectId'] = parsed_project_id
  return result


def GetReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('dataset_id', Optional[str]),
        ],
    ),
    identifier: str = '',
) -> Union[
    bq_id_utils.ApiClientHelper.TableReference,
    bq_id_utils.ApiClientHelper.DatasetReference,
    bq_id_utils.ApiClientHelper.ProjectReference,
]:
  """Try to deduce a project/dataset/table reference from a string.

  If the identifier is not compound, treat it as the most specific
  identifier we don't have as a flag, or as the table_id. If it is
  compound, fill in any unspecified part.

  Args:
    id_fallbacks: The IDs cached on BigqueryClient.
    identifier: string, Identifier to create a reference for.

  Returns:
    A valid ProjectReference, DatasetReference, or TableReference.

  Raises:
    bq_error.BigqueryError: if no valid reference can be determined.
  """
  try:
    return GetTableReference(id_fallbacks, identifier)
  except bq_error.BigqueryError:
    pass
  try:
    return GetDatasetReference(id_fallbacks, identifier)
  except bq_error.BigqueryError:
    pass
  try:
    return GetProjectReference(id_fallbacks, identifier)
  except bq_error.BigqueryError:
    pass
  raise bq_error.BigqueryError(
      'Cannot determine reference for "%s"' % (identifier,)
  )


def GetJobReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    identifier: str = '',
    default_location: Optional[str] = None,
) -> bq_id_utils.ApiClientHelper.JobReference:
  """Determine a JobReference from an identifier, location, and fallbacks."""
  project_id, location, job_id = _ParseJobIdentifier(identifier)
  if not project_id:
    project_id = id_fallbacks.project_id
  if not location:
    location = default_location
  if job_id:
    try:
      return bq_id_utils.ApiClientHelper.JobReference.Create(
          projectId=project_id, jobId=job_id, location=location
      )
    except ValueError:
      pass
  raise bq_error.BigqueryError(
      'Cannot determine job described by %s' % (identifier,)
  )


def GetReservationReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('api_version', Optional[str]),
        ],
    ),
    identifier: Optional[str] = None,
    default_location: Optional[str] = None,
    default_reservation_id: Optional[str] = None,
    check_reservation_project: bool = True,
) -> bq_id_utils.ApiClientHelper.ReservationReference:
  """Determine a ReservationReference from an identifier and location."""
  project_id, location, reservation_id = _ParseReservationIdentifier(
      identifier=identifier
  )
  # For MoveAssignment rpc, reservation reference project can be different
  # from the project_id_fallback. We'll skip this check in this case.
  if (
      check_reservation_project
      and project_id
      and id_fallbacks.project_id
      and project_id != id_fallbacks.project_id
  ):
    raise bq_error.BigqueryError(
        "Specified project '%s' should be the same as the project of the "
        "reservation '%s'." % (id_fallbacks.project_id, project_id)
    )
  project_id = project_id or id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')
  location = location or default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  if default_location and location.lower() != default_location.lower():
    raise bq_error.BigqueryError(
        "Specified location '%s' should be the same as the location of the "
        "reservation '%s'." % (default_location, location)
    )
  reservation_id = reservation_id or default_reservation_id
  if not reservation_id:
    raise bq_error.BigqueryError('Reservation name not specified.')
  else:
    return bq_id_utils.ApiClientHelper.ReservationReference(
        projectId=project_id, location=location, reservationId=reservation_id
    )


def GetBiReservationReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    default_location: Optional[str] = None,
) -> bq_id_utils.ApiClientHelper.BiReservationReference:
  """Determine a ReservationReference from an identifier and location."""
  project_id = id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')
  location = default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  return bq_id_utils.ApiClientHelper.BiReservationReference.Create(
      projectId=project_id, location=location
  )


def GetCapacityCommitmentReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    identifier: Optional[str] = None,
    path: Optional[str] = None,
    default_location: Optional[str] = None,
    default_capacity_commitment_id: Optional[str] = None,
    allow_commas: Optional[bool] = None,
) -> bq_id_utils.ApiClientHelper.CapacityCommitmentReference:
  """Determine a CapacityCommitmentReference from an identifier and location."""
  if identifier is not None:
    project_id, location, capacity_commitment_id = (
        _ParseCapacityCommitmentIdentifier(identifier, allow_commas)
    )
  elif path is not None:
    project_id, location, capacity_commitment_id = ParseCapacityCommitmentPath(
        path
    )
  else:
    raise bq_error.BigqueryError('Either identifier or path must be specified.')
  project_id = project_id or id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')
  location = location or default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  capacity_commitment_id = (
      capacity_commitment_id or default_capacity_commitment_id
  )
  if not capacity_commitment_id:
    raise bq_error.BigqueryError('Capacity commitment id not specified.')

  return bq_id_utils.ApiClientHelper.CapacityCommitmentReference.Create(
      projectId=project_id,
      location=location,
      capacityCommitmentId=capacity_commitment_id,
  )


def GetReservationAssignmentReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('api_version', Optional[str]),
        ],
    ),
    identifier: Optional[str] = None,
    path: Optional[str] = None,
    default_location: Optional[str] = None,
    default_reservation_id: Optional[str] = None,
    default_reservation_assignment_id: Optional[str] = None,
) -> bq_id_utils.ApiClientHelper.ReservationAssignmentReference:
  """Determine a ReservationAssignmentReference from inputs."""
  if identifier is not None:
    (project_id, location, reservation_id, reservation_assignment_id) = (
        _ParseReservationAssignmentIdentifier(identifier)
    )
  elif path is not None:
    (project_id, location, reservation_id, reservation_assignment_id) = (
        ParseReservationAssignmentPath(path)
    )
  else:
    raise bq_error.BigqueryError('Either identifier or path must be specified.')
  project_id = project_id or id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')
  location = location or default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  reservation_id = reservation_id or default_reservation_id
  reservation_assignment_id = (
      reservation_assignment_id or default_reservation_assignment_id
  )
  return bq_id_utils.ApiClientHelper.ReservationAssignmentReference.Create(
      projectId=project_id,
      location=location,
      reservationId=reservation_id,
      reservationAssignmentId=reservation_assignment_id,
  )


def GetReservationGroupReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    identifier: Optional[str] = None,
    path: Optional[str] = None,
    default_location: Optional[str] = None,
    default_reservation_group_id: Optional[str] = None,
    check_reservation_group_project: bool = True,
) -> bq_id_utils.ApiClientHelper.ReservationGroupReference:
  """Determines a ReservationGroupReference from inputs."""
  if identifier is not None:
    (project_id, location, reservation_group_id) = (
        _ParseReservationGroupIdentifier(identifier)
    )
  elif path is not None:
    (project_id, location, reservation_group_id) = ParseReservationGroupPath(
        path
    )
  else:
    raise bq_error.BigqueryError('Either identifier or path must be specified.')

  if (
      check_reservation_group_project
      and project_id
      and id_fallbacks.project_id
      and project_id != id_fallbacks.project_id
  ):
    raise bq_error.BigqueryError(
        "Specified project '%s' should be the same as the project of the "
        "reservation group '%s'." % (id_fallbacks.project_id, project_id)
    )
  project_id = project_id or id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')

  location = location or default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  if default_location and location.lower() != default_location.lower():
    raise bq_error.BigqueryError(
        "Specified location '%s' should be the same as the location of the "
        "reservation group '%s'." % (default_location, location)
    )

  reservation_group_id = reservation_group_id or default_reservation_group_id
  if not reservation_group_id:
    raise bq_error.BigqueryError('Reservation group id not specified.')
  else:
    return bq_id_utils.ApiClientHelper.ReservationGroupReference(
        projectId=project_id,
        location=location,
        reservationGroupId=reservation_group_id,
    )


def GetConnectionReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
            ('api_version', Optional[str]),
        ],
    ),
    identifier: Optional[str] = None,
    path: Optional[str] = None,
    default_location: Optional[str] = None,
    default_connection_id: Optional[str] = None,
) -> bq_id_utils.ApiClientHelper.ConnectionReference:
  """Determine a ConnectionReference from an identifier and location."""
  if identifier is not None:
    (project_id, location, connection_id) = _ParseConnectionIdentifier(
        identifier
    )
  elif path is not None:
    (project_id, location, connection_id) = ParseConnectionPath(path)
  project_id = project_id or id_fallbacks.project_id
  if not project_id:
    raise bq_error.BigqueryError('Project id not specified.')
  location = location or default_location
  if not location:
    raise bq_error.BigqueryError('Location not specified.')
  connection_id = connection_id or default_connection_id
  if not connection_id:
    raise bq_error.BigqueryError('Connection name not specified.')
  return bq_id_utils.ApiClientHelper.ConnectionReference.Create(
      projectId=project_id, location=location, connectionId=connection_id
  )


def RaiseError(result):
  """Raises an appropriate BigQuery error given the json error result."""
  error = result.get('error', {}).get('errors', [{}])[0]
  raise bq_error.CreateBigqueryError(error, result, [])


def IsFailedJob(job):
  """Predicate to determine whether or not a job failed."""
  return 'errorResult' in job.get('status', {})


def RaiseIfJobError(job):
  """Raises a BigQueryError if the job is in an error state.

  Args:
    job: a Job resource.

  Returns:
    job, if it is not in an error state.

  Raises:
    bq_error.BigqueryError: A bq_error.BigqueryError instance
      based on the job's error description.
  """
  if IsFailedJob(job):
    error = job['status']['errorResult']
    error_ls = job['status'].get('errors', [])
    raise bq_error.CreateBigqueryError(
        error,
        error,
        error_ls,
        job_ref=str(bq_processor_utils.ConstructObjectReference(job)),
        session_id=bq_processor_utils.GetSessionId(job),
    )
  return job


def ReadSchema(schema: str) -> List[str]:
  """Create a schema from a string or a filename.

  If schema does not contain ':' and is the name of an existing
  file, read it as a JSON schema. If not, it must be a
  comma-separated list of fields in the form name:type.

  Args:
    schema: A filename or schema.

  Returns:
    The new schema (as a dict).

  Raises:
    bq_error.BigquerySchemaError: If the schema is invalid or the
      filename does not exist.
  """

  if schema.startswith(bq_processor_utils.GCS_SCHEME_PREFIX):
    raise bq_error.BigquerySchemaError('Cannot load schema files from GCS.')

  def NewField(entry):
    name, _, field_type = entry.partition(':')
    if entry.count(':') > 1 or not name.strip():
      raise bq_error.BigquerySchemaError('Invalid schema entry: %s' % (entry,))
    return {
        'name': name.strip(),
        'type': field_type.strip().upper() or 'STRING',
    }

  if not schema:
    raise bq_error.BigquerySchemaError('Schema cannot be empty')
  elif os.path.exists(schema):
    with open(schema) as f:
      try:
        loaded_json = json.load(f)
      except ValueError as e:
        raise bq_error.BigquerySchemaError(
            (
                'Error decoding JSON schema from file %s: %s\n'
                'To specify a one-column schema, use "name:string".'
            )
            % (schema, e)
        )
    if not isinstance(loaded_json, list):
      raise bq_error.BigquerySchemaError(
          'Error in "%s": Table schemas must be specified as JSON lists.'
          % schema
      )
    return loaded_json
  elif re.match(r'[./\\]', schema) is not None:
    # We have something that looks like a filename, but we didn't
    # find it. Tell the user about the problem now, rather than wait
    # for a round-trip to the server.
    raise bq_error.BigquerySchemaError(
        'Error reading schema: "%s" looks like a filename, but was not found.'
        % (schema,)
    )
  else:
    return [NewField(entry) for entry in schema.split(',')]  # pytype: disable=bad-return-type


def NormalizeProjectReference(
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    reference: bq_id_utils.ApiClientHelper.ProjectReference,
) -> bq_id_utils.ApiClientHelper.ProjectReference:
  if reference is None:
    try:
      return GetProjectReference(id_fallbacks=id_fallbacks)
    except bq_error.BigqueryClientError as e:
      raise bq_error.BigqueryClientError(
          'Project reference or a default project is required'
      ) from e
  return reference