HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/bq/frontend/utils.py
#!/usr/bin/env python
"""Python script for interacting with BigQuery."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import datetime
import functools
import json
import logging
import os
import re
import sys
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type

from absl import app
from absl import flags
import yaml

import table_formatter
import bq_utils
from clients import utils as bq_client_utils
from frontend import utils_flags
from frontend import utils_formatting
from utils import bq_consts
from utils import bq_error
from utils import bq_id_utils
from pyglib import stringutil

# pylint: disable=g-multiple-import
if sys.version_info < (3, 11):
  from typing_extensions import TypedDict, NotRequired  # pylint: disable=g-import-not-at-top
else:
  from typing import TypedDict, NotRequired  # pylint: disable=g-import-not-at-top
# pylint: enable=g-multiple-import

FLAGS = flags.FLAGS

PARQUET_LIST_INFERENCE_DESCRIPTION = (
    'Use schema inference specifically for Parquet LIST logical type.\n It'
    ' checks whether the LIST node is in the standard form as documented in:\n'
    ' https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists\n'
    '  <optional | required> group <name> (LIST) {\n    repeated group list {\n'
    '      <optional | required> <element-type> element;\n    }\n  }\n Returns'
    ' the "element" node in list_element_node. The corresponding field for the'
    ' LIST node in the converted schema is treated as if the node has the'
    ' following schema:\n repeated <element-type> <name>\n This means nodes'
    ' "list" and "element" are omitted.\n\n Otherwise, the LIST node must be in'
    ' one of the forms described by the backward-compatibility rules as'
    ' documented in:\n'
    ' https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules\n'
    ' <optional | required> group <name> (LIST) {\n   repeated <element-type>'
    ' <element-name>\n }\n Returns the <element-name> node in'
    ' list_element_node. The corresponding field for the LIST node in the'
    ' converted schema is treated as if the node has the following schema:\n'
    ' repeated <element-type> <name>\n This means the element node is omitted.'
)

CONNECTION_ID_PATTERN = re.compile(r'[\w-]+')
_RANGE_PATTERN = re.compile(r'^\[(\S+.+\S+), (\S+.+\S+)\)$')

_PARAMETERS_KEY = 'parameters'
_DEFAULT_STORAGE_LOCATION_URI_KEY = 'defaultStorageLocationUri'
_STORAGE_DESCRIPTOR_KEY = 'storageDescriptor'
_CONNECTION_ID_KEY = 'connectionId'

_DELIMITER_MAP = {
    'tab': '\t',
    '\\t': '\t',
}
_DDL_OPERATION_MAP = {
    'SKIP': 'Skipped',
    'CREATE': 'Created',
    'REPLACE': 'Replaced',
    'ALTER': 'Altered',
    'DROP': 'Dropped',
}


def ValidateGlobalFlags():
  """Validate combinations of global flag values."""
  if FLAGS.service_account and FLAGS.use_gce_service_account:
    raise app.UsageError(
        'Cannot specify both --service_account and --use_gce_service_account.'
    )


def ValidateAtMostOneSelected(*args: Any) -> bool:
  """Validates that at most one of the argument flags is selected.

  Args:
    *args: Each flag to be tested parsed in as a separate arg.

  Returns:
    True if more than 1 flag was selected, False if 1 or 0 were selected.
  """
  count = 0
  for arg in args:
    if arg:
      count += 1
  return count > 1


def ValidateAtMostOneSelectedAllowsDefault(*args: Any) -> bool:
  """Validates that at most one of the argument flags is selected.

    if the arg exists but the value is the default value,
    then it won't be counted. This is uself when users want to clear the
    value while setting another value. For example, 'update --arg1=0 --arg2=100'
    when arg1 and arg2 shouldn't coexist.

  Args:
    *args: Each flag to be tested parsed in as a separate arg.

  Returns:
    True if more than 1 flag was selected, False if 1 or 0 were selected.
  """
  count = 0
  for arg in args:
    if arg and type(arg)() != arg:
      count += 1
  return count > 1


def ProcessSource(description: str, source: str) -> Tuple[Any, Any]:
  """Process "source" parameter used for bq update and bq mk command.

  Args:
    description: Description of the dataset.
    source: source file path attached by "--source" parameter.

  Returns:
    new description if the source file updates the description, otherwise return
    the original description.
    acl if the source file updates the acl, otherwise return None.
  """
  acl = None
  if source is None:
    return (description, acl)
  if not os.path.exists(source):
    raise app.UsageError('Source file not found: %s' % (source,))
  if not os.path.isfile(source):
    raise app.UsageError('Source path is not a file: %s' % (source,))
  with open(source) as f:
    try:
      payload = json.load(f)
      if 'description' in payload:
        description = payload['description']
        logging.debug(
            'Both source file and description flag exist, using the value in'
            ' the source file.'
        )
      if 'access' in payload:
        acl = payload['access']
    except ValueError as e:
      raise app.UsageError(
          'Error decoding JSON schema from file %s: %s' % (source, e)
      )

  return (description, acl)


def PrintDryRunInfo(job):
  """Prints the dry run info."""
  if FLAGS.format in ['prettyjson', 'json']:
    bq_utils.PrintFormattedJsonObject(job)
    return
  # TODO: b/422281423 - Revert this check once server returns this field for
  # all dry run queries again.
  if 'totalBytesProcessed' not in job['statistics']['query']:
    print(
        'Query successfully validated. No information about number of bytes'
        ' processed. To see the full details of the job, run this query with'
        ' `--format=json` or `--format=prettyjson`.'
    )
    return
  num_bytes = job['statistics']['query']['totalBytesProcessed']
  num_bytes_accuracy = job['statistics']['query'].get(
      'totalBytesProcessedAccuracy', 'PRECISE'
  )
  if FLAGS.format == 'csv':
    print(num_bytes)
  else:
    if job['statistics']['query'].get('statementType', '') == 'LOAD_DATA':
      print(
          'Query successfully validated. Assuming the files are not modified, '
          'running this query will process %s files loading %s bytes of data.'
          % (
              job['statistics']['query']['loadQueryStatistics']['inputFiles'],
              job['statistics']['query']['loadQueryStatistics'][
                  'inputFileBytes'
              ],
          )
      )
    elif num_bytes_accuracy == 'PRECISE':
      print(
          'Query successfully validated. Assuming the tables are not modified, '
          'running this query will process %s bytes of data.' % (num_bytes,)
      )
    elif num_bytes_accuracy == 'LOWER_BOUND':
      print(
          'Query successfully validated. Assuming the tables are not modified, '
          'running this query will process lower bound of %s bytes of data.'
          % (num_bytes,)
      )
    elif num_bytes_accuracy == 'UPPER_BOUND':
      print(
          'Query successfully validated. Assuming the tables are not modified, '
          'running this query will process upper bound of %s bytes of data.'
          % (num_bytes,)
      )
    else:
      if job['statistics']['query']['statementType'] == 'CREATE_MODEL':
        print(
            'Query successfully validated. The number of bytes that will '
            'be processed by this query cannot be calculated automatically. '
            'More information about this can be seen in '
            'https://cloud.google.com/bigquery-ml/pricing#dry_run'
        )
      else:
        print(
            'Query successfully validated. Assuming the tables are not '
            'modified, running this query will process %s of data and the '
            'accuracy is unknown because of federated tables or clustered '
            'tables.' % (num_bytes,)
        )


def RawInput(message: str) -> str:
  try:
    return input(message)
  except EOFError:
    if sys.stdin.isatty():
      print('\nGot EOF; exiting.')
    else:
      print('\nGot EOF; exiting. Is your input from a terminal?')
    sys.exit(1)


def PromptWithDefault(message: str) -> str:
  """Prompts user with message, return key pressed or '' on enter."""
  if FLAGS.headless:
    print('Running --headless, accepting default for prompt: %s' % (message,))
    return ''
  return RawInput(message).lower()


def PromptYN(message: str) -> Optional[str]:
  """Prompts user with message, returning the key 'y', 'n', or '' on enter."""
  response = None
  while response not in ['y', 'n', '']:
    response = PromptWithDefault(message)
  return response


def NormalizeFieldDelimiter(field_delimiter: str) -> str:
  """Validates and returns the correct field_delimiter."""
  # The only non-string delimiter we allow is None, which represents
  # no field delimiter specified by the user.
  if field_delimiter is None:
    return field_delimiter

  # Allow TAB and \\t substitution.
  key = field_delimiter.lower()
  return _DELIMITER_MAP.get(key, field_delimiter)


def ValidateHivePartitioningOptions(hive_partitioning_mode):
  """Validates the string provided is one the API accepts.

  Should not receive None as an input, since that will fail the comparison.
  Args:
    hive_partitioning_mode: String representing which hive partitioning mode is
      requested.  Only 'AUTO' and 'STRINGS' are supported.
  """
  if hive_partitioning_mode not in ['AUTO', 'STRINGS', 'CUSTOM']:
    raise app.UsageError(
        'Only the following hive partitioning modes are supported: "AUTO", '
        '"STRINGS" and "CUSTOM"'
    )


def ParseLabels(labels: List[str]) -> Dict[str, str]:
  """Parses a list of user-supplied strings representing labels.

  Args:
    labels: A list of user-supplied strings representing labels.  It is expected
      to be in the format "key:value".

  Returns:
    A dict mapping label keys to label values.

  Raises:
    UsageError: Incorrect label arguments were supplied.
  """
  labels_dict = {}
  for key_value in labels:
    k, _, v = key_value.partition(':')
    k = k.strip()
    if k in labels_dict:
      raise app.UsageError('Cannot specify label key "%s" multiple times' % k)
    if k.strip():
      labels_dict[k.strip()] = v.strip()
  return labels_dict


def IsRangeBoundaryUnbounded(value: str) -> bool:
  return value.upper() == 'UNBOUNDED' or value.upper() == 'NULL'


def ParseRangeString(value: str) -> Optional[Tuple[str, str]]:
  match = _RANGE_PATTERN.match(value)
  if not match:
    return None
  start, end = match.groups()
  return start, end


class TablePrinter(object):
  """Base class for printing a table, with a default implementation."""

  def __init__(self, **kwds):
    super(TablePrinter, self).__init__()
    # Most extended classes will require state.
    for key, value in kwds.items():
      setattr(self, key, value)

  @staticmethod
  def _ValidateFields(fields, formatter):
    if isinstance(formatter, table_formatter.CsvFormatter):
      for field in fields:
        if field['type'].upper() == 'RECORD':
          raise app.UsageError(
              (
                  'Error printing table: Cannot print record '
                  'field "%s" in CSV format.'
              )
              % field['name']
          )
        if field.get('mode', 'NULLABLE').upper() == 'REPEATED':
          raise app.UsageError(
              (
                  'Error printing table: Cannot print repeated '
                  'field "%s" in CSV format.'
              )
              % (field['name'])
          )

  @staticmethod
  def _NormalizeRecord(field, value, use_full_timestamp):
    """Returns bq-specific formatting of a RECORD type."""
    result = collections.OrderedDict()
    for subfield, subvalue in zip(field.get('fields', []), value):
      result[subfield.get('name', '')] = TablePrinter.NormalizeField(
          subfield, subvalue, use_full_timestamp
      )
    return result

  @staticmethod
  def _NormalizeTimestamp(unused_field, value, use_full_timestamp):
    """Returns bq-specific formatting of a TIMESTAMP type."""
    try:
      if use_full_timestamp:
        return value
      else:
        date = datetime.datetime.fromtimestamp(
            0, tz=datetime.timezone.utc
        ) + datetime.timedelta(seconds=float(value))
        # Remove the extra timezone info "+00:00" at the end of the date.
        date = date.replace(tzinfo=None)
        # Our goal is the equivalent of '%Y-%m-%d %H:%M:%S' via strftime but that
        # doesn't work for dates with years prior to 1900.  Instead we zero out
        # fractional seconds then call isoformat with a space separator.
        date = date.replace(microsecond=0)
        return date.isoformat(' ')
    except (ValueError, OverflowError):
      return '<date out of range for display>'

  @staticmethod
  def _NormalizeRange(field, value, use_full_timestamp):
    """Returns bq-specific formatting of a RANGE type."""
    parsed = ParseRangeString(value)
    if parsed is None:
      return '<invalid range>'
    start, end = parsed

    if field.get('rangeElementType').get('type').upper() != 'TIMESTAMP':
      start = start.upper() if IsRangeBoundaryUnbounded(start) else start
      end = end.upper() if IsRangeBoundaryUnbounded(end) else end
      return '[%s, %s)' % (start, end)

    if IsRangeBoundaryUnbounded(start):
      normalized_start = start.upper()
    else:
      normalized_start = TablePrinter._NormalizeTimestamp(
          field, start, use_full_timestamp
      )
    if IsRangeBoundaryUnbounded(end):
      normalized_end = end.upper()
    else:
      normalized_end = TablePrinter._NormalizeTimestamp(
          field, end, use_full_timestamp
      )
    return '[%s, %s)' % (normalized_start, normalized_end)

  @staticmethod
  def NormalizeField(field, value, use_full_timestamp: bool):
    """Returns bq-specific formatting of a field."""
    if value is None:
      return None
    if field.get('mode', '').upper() == 'REPEATED':
      return [
          TablePrinter._NormalizeSingleValue(field, value, use_full_timestamp)
          for value in value
      ]
    return TablePrinter._NormalizeSingleValue(field, value, use_full_timestamp)

  @staticmethod
  def _NormalizeSingleValue(field, value, use_full_timestamp: bool):
    """Returns formatting of a single field value."""
    if field.get('type', '').upper() == 'RECORD':
      return TablePrinter._NormalizeRecord(field, value, use_full_timestamp)
    elif field.get('type', '').upper() == 'TIMESTAMP':
      return TablePrinter._NormalizeTimestamp(field, value, use_full_timestamp)
    elif field.get('type', '').upper() == 'RANGE':
      return TablePrinter._NormalizeRange(field, value, use_full_timestamp)
    return value

  @staticmethod
  def MaybeConvertToJson(value):
    """Converts dicts and lists to JSON; returns everything else as-is."""
    if isinstance(value, dict) or isinstance(value, list):
      return json.dumps(value, separators=(',', ':'), ensure_ascii=False)
    return value

  @staticmethod
  def FormatRow(fields, row, formatter, use_full_timestamp: bool):
    """Convert fields in a single row to bq-specific formatting."""
    values = [
        TablePrinter.NormalizeField(field, value, use_full_timestamp)
        for field, value in zip(fields, row)
    ]
    # Convert complex values to JSON if we're not already outputting as such.
    if not isinstance(formatter, table_formatter.JsonFormatter):
      values = map(TablePrinter.MaybeConvertToJson, values)
    # Convert NULL values to strings for CSV and non-JSON formats.
    if isinstance(formatter, table_formatter.CsvFormatter):
      values = ['' if value is None else value for value in values]
    elif not isinstance(formatter, table_formatter.JsonFormatter):
      values = ['NULL' if value is None else value for value in values]
    return values

  def PrintTable(self, fields, rows, use_full_timestamp: bool):
    formatter = utils_flags.get_formatter_from_flags(secondary_format='pretty')
    self._ValidateFields(fields, formatter)
    formatter.AddFields(fields)
    formatter.AddRows(
        TablePrinter.FormatRow(fields, row, formatter, use_full_timestamp)
        for row in rows
    )
    formatter.Print()


def CreateExternalTableDefinition(
    source_format,
    source_uris,
    schema,
    autodetect,
    connection_id=None,
    ignore_unknown_values=False,
    hive_partitioning_mode=None,
    hive_partitioning_source_uri_prefix=None,
    require_hive_partition_filter=None,
    use_avro_logical_types=False,
    parquet_enum_as_string=False,
    parquet_enable_list_inference=False,
    metadata_cache_mode=None,
    object_metadata=None,
    preserve_ascii_control_characters=False,
    reference_file_schema_uri=None,
    encoding=None,
    file_set_spec_type=None,
    null_marker=None,
    null_markers=None,
    time_zone=None,
    date_format=None,
    datetime_format=None,
    time_format=None,
    timestamp_format=None,
    source_column_match=None,
    parquet_map_target_type=None,
    timestamp_target_precision=None,
):
  """Creates an external table definition with the given URIs and the schema.

  Arguments:
    source_format: Format of source data. For CSV files, specify 'CSV'. For
      Google spreadsheet files, specify 'GOOGLE_SHEETS'. For newline-delimited
      JSON, specify 'NEWLINE_DELIMITED_JSON'. For Cloud Datastore backup,
      specify 'DATASTORE_BACKUP'. For Avro files, specify 'AVRO'. For Orc files,
      specify 'ORC'. For Parquet files, specify 'PARQUET'. For Iceberg tables,
      specify 'ICEBERG'.
    source_uris: Comma separated list of URIs that contain data for this table.
    schema: Either an inline schema or path to a schema file.
    autodetect: Indicates if format options, compression mode and schema be auto
      detected from the source data. True - means that autodetect is on, False
      means that it is off. None means format specific default: - For CSV it
      means autodetect is OFF - For JSON it means that autodetect is ON. For
      JSON, defaulting to autodetection is safer because the only option
      autodetected is compression. If a schema is passed, then the user-supplied
      schema is used.
    connection_id: The user flag with the same name defined for the _Load
      BigqueryCmd
    ignore_unknown_values:  Indicates if BigQuery should allow extra values that
      are not represented in the table schema. If true, the extra values are
      ignored. If false, records with extra columns are treated as bad records,
      and if there are too many bad records, an invalid error is returned in the
      job result. The default value is false. The sourceFormat property
      determines what BigQuery treats as an extra value: - CSV: Trailing columns
      - JSON: Named values that don't match any column names.
    hive_partitioning_mode: Enables hive partitioning.  AUTO indicates to
      perform automatic type inference.  STRINGS indicates to treat all hive
      partition keys as STRING typed.  No other values are accepted.
    hive_partitioning_source_uri_prefix: Shared prefix for all files until hive
      partitioning encoding begins.
    require_hive_partition_filter: The user flag with the same name defined for
      the _Load BigqueryCmd
    use_avro_logical_types: The user flag with the same name defined for the
      _Load BigqueryCmd
    parquet_enum_as_string: The user flag with the same name defined for the
      _Load BigqueryCmd
    parquet_enable_list_inference: The user flag with the same name defined for
      the _Load BigqueryCmd
    metadata_cache_mode: Enables metadata cache for an external table with a
      connection. Specify 'AUTOMATIC' to automatically refresh the cached
      metadata. Specify 'MANUAL' to stop the automatic refresh.
    object_metadata: Object Metadata Type.
    preserve_ascii_control_characters: The user flag with the same name defined
      for the _Load BigqueryCmd
    reference_file_schema_uri: The user flag with the same name defined for the
      _Load BigqueryCmd
    encoding: Encoding types for CSV files. Available options are: 'UTF-8',
      'ISO-8859-1', 'UTF-16BE', 'UTF-16LE', 'UTF-32BE', and 'UTF-32LE'. The
      default value is 'UTF-8'.
    file_set_spec_type: Set how to discover files given source URIs. Specify
      'FILE_SYSTEM_MATCH' (default behavior) to expand source URIs by listing
      files from the underlying object store. Specify
      'NEW_LINE_DELIMITED_MANIFEST' to parse the URIs as new line delimited
      manifest files, where each line contains a URI (No wild-card URIs are
      supported).
    null_marker: Specifies a string that represents a null value in a CSV file.
    null_markers: Specifies a list of strings that represent null values in a
      CSV file.
    time_zone: Specifies the time zone for a CSV or JSON file.
    date_format: Specifies the date format for a CSV or JSON file.
    datetime_format: Specifies the datetime format for a CSV or JSON file.
    time_format: Specifies the time format for a CSV or JSON file.
    timestamp_format: Specifies the timestamp format for a CSV or JSON file.
    source_column_match: Controls the strategy used to match loaded columns to
      the schema.
    parquet_map_target_type: Indicate the target type for parquet maps. If
      unspecified, we represent parquet maps as map {repeated key_value {key,
      value}}. This option can simplify this by omiting the key_value record if
      it's equal to ARRAY_OF_STRUCT.
    timestamp_target_precision: Precision (maximum number of total digits in
      base 10) for seconds of TIMESTAMP type.

  Returns:
    A python dictionary that contains a external table definition for the given
    format with the most common options set.
  """
  try:
    supported_formats = [
        'CSV',
        'NEWLINE_DELIMITED_JSON',
        'DATASTORE_BACKUP',
        'DELTA_LAKE',
        'AVRO',
        'ORC',
        'PARQUET',
        'GOOGLE_SHEETS',
        'ICEBERG',
    ]

    if source_format not in supported_formats:
      raise app.UsageError('%s is not a supported format.' % source_format)

    external_table_def = {'sourceFormat': source_format}
    if file_set_spec_type is not None:
      external_table_def['fileSetSpecType'] = file_set_spec_type
    if metadata_cache_mode is not None:
      external_table_def['metadataCacheMode'] = metadata_cache_mode
    if time_zone is not None:
      external_table_def['timeZone'] = time_zone
    if date_format is not None:
      external_table_def['dateFormat'] = date_format
    if datetime_format is not None:
      external_table_def['datetimeFormat'] = datetime_format
    if time_format is not None:
      external_table_def['timeFormat'] = time_format
    if timestamp_format is not None:
      external_table_def['timestampFormat'] = timestamp_format
    if object_metadata is not None:
      supported_obj_metadata_types = ['DIRECTORY', 'SIMPLE']

      if object_metadata not in supported_obj_metadata_types:
        raise app.UsageError(
            '%s is not a supported Object Metadata Type.' % object_metadata
        )

      external_table_def['sourceFormat'] = None
      external_table_def['objectMetadata'] = object_metadata
    if timestamp_target_precision is not None:
      external_table_def['timestampTargetPrecision'] = timestamp_target_precision

    if external_table_def['sourceFormat'] == 'CSV':
      if autodetect:
        external_table_def['autodetect'] = True
        external_table_def['csvOptions'] = yaml.safe_load("""
            {
                "quote": '"',
                "encoding": "UTF-8"
            }
        """)
      else:
        external_table_def['csvOptions'] = yaml.safe_load("""
            {
                "allowJaggedRows": false,
                "fieldDelimiter": ",",
                "allowQuotedNewlines": false,
                "quote": '"',
                "skipLeadingRows": 0,
                "encoding": "UTF-8"
            }
        """)
      external_table_def['csvOptions'][
          'preserveAsciiControlCharacters'
      ] = preserve_ascii_control_characters
      external_table_def['csvOptions']['encoding'] = encoding or 'UTF-8'
      if null_marker is not None:
        external_table_def['csvOptions']['nullMarker'] = null_marker
      if null_markers is not None:
        external_table_def['csvOptions']['nullMarkers'] = null_markers
      if source_column_match is not None:
        external_table_def['csvOptions'][
            'sourceColumnMatch'
        ] = source_column_match
    elif external_table_def['sourceFormat'] == 'NEWLINE_DELIMITED_JSON':
      if autodetect is None or autodetect:
        external_table_def['autodetect'] = True
      external_table_def['jsonOptions'] = {'encoding': encoding or 'UTF-8'}
    elif external_table_def['sourceFormat'] == 'GOOGLE_SHEETS':
      if autodetect is None or autodetect:
        external_table_def['autodetect'] = True
      else:
        external_table_def['googleSheetsOptions'] = yaml.safe_load("""
            {
                "skipLeadingRows": 0
            }
        """)
    elif external_table_def['sourceFormat'] == 'AVRO':
      external_table_def['avroOptions'] = {
          'useAvroLogicalTypes': use_avro_logical_types
      }
      if reference_file_schema_uri is not None:
        external_table_def['referenceFileSchemaUri'] = reference_file_schema_uri
    elif external_table_def['sourceFormat'] == 'PARQUET':
      external_table_def['parquetOptions'] = {
          'enumAsString': parquet_enum_as_string,
          'enableListInference': parquet_enable_list_inference,
          'mapTargetType': parquet_map_target_type,
      }
      if reference_file_schema_uri is not None:
        external_table_def['referenceFileSchemaUri'] = reference_file_schema_uri
    elif external_table_def['sourceFormat'] == 'ORC':
      if reference_file_schema_uri is not None:
        external_table_def['referenceFileSchemaUri'] = reference_file_schema_uri
    elif (
        external_table_def['sourceFormat'] == 'ICEBERG'
        or external_table_def['sourceFormat'] == 'DELTA_LAKE'
    ):
      source_format = (
          'Iceberg'
          if external_table_def['sourceFormat'] == 'ICEBERG'
          else 'Delta Lake'
      )
      if autodetect is not None and not autodetect or schema:
        raise app.UsageError(
            'Cannot create %s table from user-specified schema.'
            % (source_format,)
        )
      # Always autodetect schema for ICEBERG from manifest
      external_table_def['autodetect'] = True
      if len(source_uris.split(',')) != 1:
        raise app.UsageError(
            'Must provide only one source_uri for %s table.' % (source_format,)
        )


    if ignore_unknown_values:
      external_table_def['ignoreUnknownValues'] = True


    if hive_partitioning_mode is not None:
      ValidateHivePartitioningOptions(hive_partitioning_mode)
      hive_partitioning_options = {}
      hive_partitioning_options['mode'] = hive_partitioning_mode
      if hive_partitioning_source_uri_prefix is not None:
        hive_partitioning_options['sourceUriPrefix'] = (
            hive_partitioning_source_uri_prefix
        )
      external_table_def['hivePartitioningOptions'] = hive_partitioning_options
      if require_hive_partition_filter:
        hive_partitioning_options['requirePartitionFilter'] = True

    if schema:
      fields = bq_client_utils.ReadSchema(schema)
      external_table_def['schema'] = {'fields': fields}

    if connection_id:
      external_table_def['connectionId'] = connection_id

    external_table_def['sourceUris'] = source_uris.split(',')

    return external_table_def

  except ValueError as e:
    raise app.UsageError(
        'Error occurred while creating table definition: %s' % e
    )


def GetExternalDataConfig(
    file_path_or_simple_spec,
    use_avro_logical_types=False,
    parquet_enum_as_string=False,
    parquet_enable_list_inference=False,
    metadata_cache_mode=None,
    object_metadata=None,
    preserve_ascii_control_characters=None,
    reference_file_schema_uri=None,
    file_set_spec_type=None,
    null_marker=None,
    null_markers=None,
    time_zone=None,
    date_format=None,
    datetime_format=None,
    time_format=None,
    timestamp_format=None,
    source_column_match=None,
    parquet_map_target_type=None,
    timestamp_target_precision=None,
):
  """Returns a ExternalDataConfiguration from the file or specification string.

  Determines if the input string is a file path or a string,
  then returns either the parsed file contents, or the parsed configuration from
  string. The file content is expected to be JSON representation of
  ExternalDataConfiguration. The specification is expected to be of the form
  schema@format=uri i.e. schema is separated from format and uri by '@'. If the
  uri itself contains '@' or '=' then the JSON file option should be used.
  "format=" can be omitted for CSV files.

  Raises:
    UsageError: when incorrect usage or invalid args are used.
  """
  maybe_filepath = os.path.expanduser(file_path_or_simple_spec)
  if os.path.isfile(maybe_filepath):
    try:
      with open(maybe_filepath) as external_config_file:
        return yaml.safe_load(external_config_file)
    except yaml.error.YAMLError as e:
      raise app.UsageError(
          'Error decoding YAML external table definition from file %s: %s'
          % (maybe_filepath, e)
      )
  else:
    source_format = 'CSV'
    schema = None
    connection_id = None
    error_msg = (
        'Error decoding external_table_definition. '
        'external_table_definition should either be the name of a '
        'JSON file or the text representation of an external table '
        'definition. Given:%s'
    ) % (file_path_or_simple_spec)

    parts = file_path_or_simple_spec.split('@')
    if len(parts) == 1:
      # Schema and connection are not specified.
      format_and_uri = parts[0]
    elif len(parts) == 2:
      # when there are 2 components, it can be:
      # 1. format=uri@connection_id.e.g csv=gs://bucket/file@us.conn1
      # 2. schema@format=uri        e.g.col1::INTEGER@csv=gs://bucket/file
      # if the first element is format=uri, then second element is connnection.
      # Else, the first is schema, second is format=uri.
      if parts[0].find('://') >= 0:
        # format=uri and connection specified.
        format_and_uri = parts[0]
        connection_id = parts[1]
      else:
        # Schema and format=uri are specified.
        schema = parts[0]
        format_and_uri = parts[1]
    elif len(parts) == 3:
      # Schema and connection both are specified
      schema = parts[0]
      format_and_uri = parts[1]
      connection_id = parts[2]
    else:
      raise app.UsageError(error_msg)

    separator_pos = format_and_uri.find('=')
    if separator_pos < 0:
      # Format is not specified
      uri = format_and_uri
    else:
      source_format = format_and_uri[0:separator_pos]
      uri = format_and_uri[separator_pos + 1 :]

    if not uri:
      raise app.UsageError(error_msg)
    # When using short notation for external table definition
    # autodetect is always performed.

    return CreateExternalTableDefinition(
        source_format,
        uri,
        schema,
        True,
        connection_id,
        use_avro_logical_types=use_avro_logical_types,
        parquet_enum_as_string=parquet_enum_as_string,
        parquet_enable_list_inference=parquet_enable_list_inference,
        metadata_cache_mode=metadata_cache_mode,
        object_metadata=object_metadata,
        preserve_ascii_control_characters=preserve_ascii_control_characters,
        reference_file_schema_uri=reference_file_schema_uri,
        file_set_spec_type=file_set_spec_type,
        null_marker=null_marker,
        null_markers=null_markers,
        time_zone=time_zone,
        date_format=date_format,
        datetime_format=datetime_format,
        time_format=time_format,
        timestamp_format=timestamp_format,
        source_column_match=source_column_match,
        parquet_map_target_type=parquet_map_target_type,
        timestamp_target_precision=timestamp_target_precision,
    )


def GetJson(
    file_path_or_json_string: str,
) -> Optional[Dict[str, Any]]:
  """Returns a JSON object from the file or a JSON string.

  Determines if the input string is a file path or a string,
  then returns either the parsed file contents, or the parsed JSON from
  string. The file content is expected to be a JSON string.

  Args:
    file_path_or_json_string: Path to the JSON file or a JSON string.

  Raises:
    UsageError: when incorrect usage or invalid args are used.
  """
  maybe_filepath = os.path.expanduser(file_path_or_json_string)
  if os.path.isfile(maybe_filepath):
    try:
      with open(maybe_filepath) as json_file:
        return json.load(json_file)
    except json.decoder.JSONDecodeError as e:
      raise app.UsageError(
          'Error decoding JSON from file %s: %s' % (maybe_filepath, e)
      )
  else:
    try:
      return json.loads(file_path_or_json_string)
    except json.decoder.JSONDecodeError as e:
      raise app.UsageError(
          'Error decoding JSON from string %s: %s'
          % (file_path_or_json_string, e)
      )


def UpdateExternalCatalogTableOptions(
    current_options: Dict[str, Any],
    external_options_str: str,
) -> Dict[str, Any]:
  """Updates the external catalog table options.

  Args:
    current_options: The current external catalog table options.
    external_options_str: The new external catalog table options as a JSON
      string or a file path.

  Returns:
    The updated external catalog table options.
  """
  # Clear the parameters if they are present in the existing options but not
  # in the new external catalog table options.
  if current_options.get(_PARAMETERS_KEY) is not None:
    current_options[_PARAMETERS_KEY] = {
        k: None for k in current_options[_PARAMETERS_KEY]
    }
  # Clear the storage descriptor if they are present in the existing options
  # but not in the new external catalog table options.
  if current_options.get(_STORAGE_DESCRIPTOR_KEY) is not None:
    current_options[_STORAGE_DESCRIPTOR_KEY] = {
        k: None for k in current_options[_STORAGE_DESCRIPTOR_KEY]
    }

  external_catalog_table_options_dict = GetJson(external_options_str)
  if _PARAMETERS_KEY in external_catalog_table_options_dict:
    current_options.setdefault(_PARAMETERS_KEY, {})
    current_options[_PARAMETERS_KEY].update(
        external_catalog_table_options_dict[_PARAMETERS_KEY]
    )
  else:
    current_options[_PARAMETERS_KEY] = None
  if _STORAGE_DESCRIPTOR_KEY in external_catalog_table_options_dict:
    current_options[_STORAGE_DESCRIPTOR_KEY] = (
        external_catalog_table_options_dict[_STORAGE_DESCRIPTOR_KEY]
    )
  else:
    current_options[_STORAGE_DESCRIPTOR_KEY] = None
  if _CONNECTION_ID_KEY in external_catalog_table_options_dict:
    current_options[_CONNECTION_ID_KEY] = external_catalog_table_options_dict[
        _CONNECTION_ID_KEY
    ]
  else:
    current_options[_CONNECTION_ID_KEY] = None
  return current_options


def UpdateExternalCatalogDatasetOptions(
    current_options: Dict[str, Any],
    external_options_str: str,
) -> Dict[str, Any]:
  """Updates the external catalog dataset options.

  Args:
    current_options: The current external catalog dataset options.
    external_options_str: The new external catalog dataset options as a JSON
      string or a file path.

  Returns:
    The updated external catalog dataset options.
  """
  # Clear the parameters if they are present in the existing dataset but not
  # in the new external catalog dataset options.
  if current_options.get(_PARAMETERS_KEY) is not None:
    current_options[_PARAMETERS_KEY] = {
        k: None for k in current_options[_PARAMETERS_KEY]
    }
  external_catalog_dataset_options_dict = GetJson(external_options_str)
  if _PARAMETERS_KEY in external_catalog_dataset_options_dict:
    current_options.setdefault(_PARAMETERS_KEY, {})
    for key, value in external_catalog_dataset_options_dict[
        _PARAMETERS_KEY
    ].items():
      current_options[_PARAMETERS_KEY][key] = value
  else:
    current_options[_PARAMETERS_KEY] = None
  if _DEFAULT_STORAGE_LOCATION_URI_KEY in external_catalog_dataset_options_dict:
    current_options[_DEFAULT_STORAGE_LOCATION_URI_KEY] = (
        external_catalog_dataset_options_dict[_DEFAULT_STORAGE_LOCATION_URI_KEY]
    )
  else:
    current_options[_DEFAULT_STORAGE_LOCATION_URI_KEY] = None
  return current_options


def PrintPageToken(page_token):
  """Prints the page token in the pretty format.

  Args:
    page_token: The dictionary mapping of pageToken with string 'nextPageToken'.
  """
  formatter = utils_flags.get_formatter_from_flags(secondary_format='pretty')
  utils_formatting.configure_formatter(
      formatter, bq_id_utils.ApiClientHelper.NextPageTokenReference
  )
  formatter.AddDict(page_token)
  formatter.Print()


def ParseTimePartitioning(
    partitioning_type=None,
    partitioning_expiration=None,
    partitioning_field=None,
    partitioning_minimum_partition_date=None,
    partitioning_require_partition_filter=None,
):
  """Parses time partitioning from the arguments.

  Args:
    partitioning_type: type for the time partitioning. Supported types are HOUR,
      DAY, MONTH, and YEAR. The default value is DAY when other arguments are
      specified, which generates one partition per day.
    partitioning_expiration: number of seconds to keep the storage for a
      partition. A negative value clears this setting.
    partitioning_field: if not set, the table is partitioned based on the
      loading time; if set, the table is partitioned based on the value of this
      field.
    partitioning_minimum_partition_date: lower boundary of partition date for
      field based partitioning table.
    partitioning_require_partition_filter: if true, queries on the table must
      have a partition filter so not all partitions are scanned.

  Returns:
    Time partitioning if any of the arguments is not None, otherwise None.

  Raises:
    UsageError: when failed to parse.
  """

  time_partitioning = {}
  key_type = 'type'
  key_expiration = 'expirationMs'
  key_field = 'field'
  key_minimum_partition_date = 'minimumPartitionDate'
  key_require_partition_filter = 'requirePartitionFilter'
  if partitioning_type is not None:
    time_partitioning[key_type] = partitioning_type
  if partitioning_expiration is not None:
    time_partitioning[key_expiration] = partitioning_expiration * 1000
  if partitioning_field is not None:
    time_partitioning[key_field] = partitioning_field
  if partitioning_minimum_partition_date is not None:
    if partitioning_field is not None:
      time_partitioning[key_minimum_partition_date] = (
          partitioning_minimum_partition_date
      )
    else:
      raise app.UsageError(
          'Need to specify --time_partitioning_field for '
          '--time_partitioning_minimum_partition_date.'
      )
  if partitioning_require_partition_filter is not None:
    if time_partitioning:
      time_partitioning[key_require_partition_filter] = (
          partitioning_require_partition_filter
      )

  if time_partitioning:
    if key_type not in time_partitioning:
      time_partitioning[key_type] = 'DAY'
    if (
        key_expiration in time_partitioning
        and time_partitioning[key_expiration] <= 0
    ):
      time_partitioning[key_expiration] = None
    return time_partitioning
  else:
    return None


def ParseFileSetSpecType(file_set_spec_type=None):
  """Parses the file set specification type from the arguments.

  Args:
    file_set_spec_type: specifies how to discover files given source URIs.

  Returns:
    file set specification type.
  Raises:
    UsageError: when an illegal value is passed.
  """
  if file_set_spec_type is None:
    return None
  valid_spec_types = ['FILE_SYSTEM_MATCH', 'NEW_LINE_DELIMITED_MANIFEST']
  if file_set_spec_type not in valid_spec_types:
    raise app.UsageError(
        'Error parsing file_set_spec_type, only FILE_SYSTEM_MATCH, '
        'NEW_LINE_DELIMITED_MANIFEST or no value are accepted'
    )
  return 'FILE_SET_SPEC_TYPE_' + file_set_spec_type


def ParseClustering(
    clustering_fields: Optional[str] = None,
) -> Optional[Dict[str, List[str]]]:
  """Parses clustering from the arguments.

  Args:
    clustering_fields: Comma-separated field names.

  Returns:
    Clustering if any of the arguments is not None, otherwise None. Special
    case if clustering_fields is passed in as an empty string instead of None,
    in which case we'll return {}, to support the scenario where user wants to
    update a table and remove the clustering spec.
  """

  if clustering_fields == '':  # pylint: disable=g-explicit-bool-comparison
    return {}
  elif clustering_fields is not None:
    return {'fields': clustering_fields.split(',')}
  else:
    return None


def ParseNumericTypeConversionMode(
    numeric_type_conversion_mode: Optional[str] = None,
) -> Optional[str]:
  """Parses the numeric type conversion mode from the arguments.

  Args:
    numeric_type_conversion_mode: specifies how the numeric values are handled
      when the value is out of scale.

  Returns:
    The conversion mode.

  Raises:
    UsageError: when an illegal value is passed.
  """

  if numeric_type_conversion_mode is None:
    return None
  elif numeric_type_conversion_mode == 'ROUND':
    return 'NUMERIC_TYPE_VALUE_ROUND'
  else:
    raise app.UsageError(
        'Error parsing numeric_type_conversion_mode, only ROUND or no value '
        'are accepted'
    )


def ParseRangePartitioning(range_partitioning_spec=None):
  """Parses range partitioning from the arguments.

  Args:
    range_partitioning_spec: specification for range partitioning in the format
      of field,start,end,interval.

  Returns:
    Range partitioning if range_partitioning_spec is not None, otherwise None.
  Raises:
    UsageError: when the spec fails to parse.
  """

  range_partitioning = {}
  key_field = 'field'
  key_range = 'range'
  key_range_start = 'start'
  key_range_end = 'end'
  key_range_interval = 'interval'

  if range_partitioning_spec is not None:
    parts = range_partitioning_spec.split(',')
    if len(parts) != 4:
      raise app.UsageError(
          'Error parsing range_partitioning. range_partitioning should be in '
          'the format of "field,start,end,interval"'
      )
    range_partitioning[key_field] = parts[0]
    range_spec = {}
    range_spec[key_range_start] = parts[1]
    range_spec[key_range_end] = parts[2]
    range_spec[key_range_interval] = parts[3]
    range_partitioning[key_range] = range_spec

  if range_partitioning:
    return range_partitioning
  else:
    return None


def IsSuccessfulDmlOrDdlJob(printable_job_info: str) -> bool:
  """Returns True iff the job is successful and is a DML/DDL query job."""
  return (
      'Affected Rows' in printable_job_info
      or 'DDL Operation Performed' in printable_job_info
  )


def MaybeGetSessionTempObjectName(
    dataset_id: str, object_id: str
) -> Optional[str]:
  """If we have a session temporary object, returns the user name of the object.

  Args:
    dataset_id: Dataset of object
    object_id: Id of object

  Returns:
    If the object is a session temp object, the name of the object after
    stripping out internal stuff such as session prefix and signature encodings.

    If the object is not a session temp object, the return value is None.
  """
  if not re.fullmatch('_[0-9a-f]{40}', dataset_id):
    return None  # Not an anonymous dataset

  session_prefix_regexp = (
      '_[0-9a-f]{8}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{12}_'
  )
  opt_signature_encoding_regexp = '(?:_b0a98f6_.*)?'
  match = re.fullmatch(
      session_prefix_regexp + '(.*?)' + opt_signature_encoding_regexp, object_id
  )
  if not match:
    return None  # No session prefix
  return match.group(1)


def PrintJobMessages(printable_job_info):
  """Prints additional info from a job formatted for printing.

  If the job had a fatal error, non-fatal warnings are not shown.

  If any error/warning does not have a 'message' key, printable_job_info must
  have 'jobReference' identifying the job.

  For DML queries prints number of affected rows.
  For DDL queries prints the performed operation and the target.
  """
  messages = GetJobMessagesForPrinting(printable_job_info)
  if messages:
    print(messages)


def GetJobMessagesForPrinting(printable_job_info):
  """Similar to _PrintJobMessages(), but returns a string, rather than printing."""
  result_lines = []

  job_ref = '(unknown)'  # Should never be seen, but beats a weird crash.
  if 'jobReference' in printable_job_info:
    job_ref = printable_job_info['jobReference']

  # For failing jobs, display the error but not any warnings, because those
  # may be more distracting than helpful.
  if printable_job_info['State'] == 'FAILURE':
    error_result = printable_job_info['status']['errorResult']
    error_ls = printable_job_info['status'].get('errors', [])
    error = bq_error.CreateBigqueryError(error_result, error_result, error_ls)
    result_lines.append(
        'Error encountered during job execution:\n%s\n' % (error,)
    )
  elif 'errors' in printable_job_info['status']:
    warnings = printable_job_info['status']['errors']
    result_lines.append((
        'Warning%s encountered during job execution:\n'
        % ('' if len(warnings) == 1 else 's')
    ))
    recommend_show = False
    for w in warnings:
      # Some warnings include detailed error messages, and some just
      # include programmatic error codes.  Some have a 'location'
      # separately, and some put it in the 'message' text.
      if 'message' not in w:
        recommend_show = True
      else:
        if 'location' in w:
          message = '[%s] %s' % (w['location'], w['message'])
        else:
          message = w['message']
        if message is not None:
          message = message.encode('utf-8')
        result_lines.append('%s\n' % message)
    if recommend_show:
      result_lines.append('Use "bq show -j %s" to view job warnings.' % job_ref)
  elif 'Affected Rows' in printable_job_info:
    result_lines.append(
        'Number of affected rows: %s\n' % printable_job_info['Affected Rows']
    )
  elif 'DDL Target Table' in printable_job_info:
    ddl_target_table = printable_job_info['DDL Target Table']
    project_id = ddl_target_table.get('projectId')
    dataset_id = ddl_target_table.get('datasetId')
    table_id = ddl_target_table.get('tableId')
    op = _DDL_OPERATION_MAP.get(
        printable_job_info.get('DDL Operation Performed')
    )
    # DDL Target Table is returned for both TABLE DDL and DROP ALL ROW ACCESS
    # POLICIES DDL statements.
    if project_id and dataset_id and table_id and op:
      if 'DDL Affected Row Access Policy Count' in printable_job_info:
        ddl_affected_row_access_policy_count = printable_job_info[
            'DDL Affected Row Access Policy Count'
        ]
        result_lines.append(
            '{op} {count} row access policies on table '
            '{project}.{dataset}.{table}\n'.format(
                op=op,
                count=ddl_affected_row_access_policy_count,
                project=project_id,
                dataset=dataset_id,
                table=table_id,
            )
        )
      elif (
          'Statement Type' in printable_job_info
          and 'INDEX' in printable_job_info['Statement Type']
      ):
        if 'SEARCH_INDEX' in printable_job_info['Statement Type']:
          result_lines.append(
              '%s search index on table %s.%s.%s\n'
              % (
                  stringutil.ensure_str(op),
                  stringutil.ensure_str(project_id),
                  stringutil.ensure_str(dataset_id),
                  stringutil.ensure_str(table_id),
              )
          )
        elif 'VECTOR_INDEX' in printable_job_info['Statement Type']:
          index_progress_instruction = ''
          if printable_job_info.get('DDL Operation Performed') in (
              'CREATE',
              'REPLACE',
          ):
            index_progress_instruction = (
                'Please query %s.%s.INFORMATION_SCHEMA to check the progress '
                ' of the index.\n'
            ) % (project_id, dataset_id)
          result_lines.append(
              '%s vector index on table %s.%s.%s\n%s'
              % (
                  stringutil.ensure_str(op),
                  stringutil.ensure_str(project_id),
                  stringutil.ensure_str(dataset_id),
                  stringutil.ensure_str(table_id),
                  stringutil.ensure_str(index_progress_instruction),
              )
          )
      else:
        result_lines.append(
            '%s %s.%s.%s\n'
            % (
                stringutil.ensure_str(op),
                stringutil.ensure_str(project_id),
                stringutil.ensure_str(dataset_id),
                stringutil.ensure_str(table_id),
            )
        )
        if 'Default Connection Stats' in printable_job_info:
          default_connection_stats = printable_job_info[
              'Default Connection Stats'
          ]
          location_id = job_ref['location']
          if 'provisioned' in default_connection_stats:
            if printable_job_info['Statement Type'] == 'CREATE_MODEL':
              target_type = 'model'
            else:
              target_type = 'table'
            result_lines.append(
                'Default connection created for %s [%s] in project [%s] in'
                ' region [%s]\n'
                % (
                    stringutil.ensure_str(target_type),
                    stringutil.ensure_str(table_id),
                    stringutil.ensure_str(project_id),
                    stringutil.ensure_str(location_id),
                )
            )
          if 'permissionUpdated' in default_connection_stats:
            result_lines.append(
                'Your IAM policy has been updated for the default connection\n'
            )
  elif 'DDL Target Routine' in printable_job_info:
    ddl_target_routine = printable_job_info['DDL Target Routine']
    project_id = ddl_target_routine.get('projectId')
    dataset_id = ddl_target_routine.get('datasetId')
    routine_id = ddl_target_routine.get('routineId')
    op = _DDL_OPERATION_MAP.get(
        printable_job_info.get('DDL Operation Performed')
    )
    temp_object_name = MaybeGetSessionTempObjectName(dataset_id, routine_id)
    if temp_object_name is not None:
      result_lines.append('%s temporary routine %s' % (op, temp_object_name))
    else:
      result_lines.append(
          '%s %s.%s.%s' % (op, project_id, dataset_id, routine_id)
      )
  elif 'DDL Target Row Access Policy' in printable_job_info:
    ddl_target_row_access_policy = printable_job_info[
        'DDL Target Row Access Policy'
    ]
    project_id = ddl_target_row_access_policy.get('projectId')
    dataset_id = ddl_target_row_access_policy.get('datasetId')
    table_id = ddl_target_row_access_policy.get('tableId')
    row_access_policy_id = ddl_target_row_access_policy.get('policyId')
    op = _DDL_OPERATION_MAP.get(
        printable_job_info.get('DDL Operation Performed')
    )
    if project_id and dataset_id and table_id and row_access_policy_id and op:
      result_lines.append(
          '{op} row access policy {policy} on table {project}.{dataset}.{table}'
          .format(
              op=op,
              policy=row_access_policy_id,
              project=project_id,
              dataset=dataset_id,
              table=table_id,
          )
      )
  elif 'Assertion' in printable_job_info:
    result_lines.append('Assertion successful')

  if 'Session Id' in printable_job_info:
    result_lines.append('In session: %s' % printable_job_info['Session Id'])

  return '\n'.join(result_lines)


def PrintObjectInfo(
    object_info,
    reference: bq_id_utils.ApiClientHelper.Reference,
    custom_format: bq_consts.CustomPrintFormat,
    print_reference: bool = True,
) -> None:
  """Prints the object with various formats."""
  # The JSON formats are handled separately so that they don't print
  # the record as a list of one record.
  if custom_format == 'schema':
    if 'schema' not in object_info or 'fields' not in object_info['schema']:
      raise app.UsageError('Unable to retrieve schema from specified table.')
    bq_utils.PrintFormattedJsonObject(object_info['schema']['fields'])
  elif FLAGS.format in ['prettyjson', 'json']:
    bq_utils.PrintFormattedJsonObject(object_info)
  elif FLAGS.format in [None, 'sparse', 'pretty']:
    formatter = utils_flags.get_formatter_from_flags()
    utils_formatting.configure_formatter(
        formatter,
        type(reference),
        print_format=custom_format,
        object_info=object_info,
    )
    object_info = utils_formatting.format_info_by_type(
        object_info, type(reference)
    )
    if object_info:
      formatter.AddDict(object_info)
    if reference.typename and print_reference:
      print('%s %s\n' % (reference.typename.capitalize(), reference))
    formatter.Print()
    print()
    if isinstance(reference, bq_id_utils.ApiClientHelper.JobReference):
      PrintJobMessages(object_info)
  else:
    formatter = utils_flags.get_formatter_from_flags()
    formatter.AddColumns(list(object_info.keys()))
    formatter.AddDict(object_info)
    formatter.Print()


def PrintObjectsArray(object_infos, objects_type):
  if FLAGS.format in ['prettyjson', 'json']:
    bq_utils.PrintFormattedJsonObject(object_infos)
  elif FLAGS.format in [None, 'sparse', 'pretty']:
    if not object_infos:
      return
    formatter = utils_flags.get_formatter_from_flags()
    utils_formatting.configure_formatter(
        formatter, objects_type, print_format='list'
    )
    formatted_infos = list(
        map(
            functools.partial(
                utils_formatting.format_info_by_type,
                object_type=objects_type,
            ),
            object_infos,
        )
    )
    for info in formatted_infos:
      formatter.AddDict(info)
    formatter.Print()
  elif object_infos:
    formatter = utils_flags.get_formatter_from_flags()
    formatter.AddColumns(list(object_infos[0].keys()))
    for info in object_infos:
      formatter.AddDict(info)
    formatter.Print()


class ResourceMetadata(TypedDict):
  token: NotRequired[str] = None
  unreachable: NotRequired[List[str]] = None


def PrintObjectsArrayWithMetadata(
    objects_list: List[Any],
    objects_type: Type[bq_id_utils.ApiClientHelper.Reference],
    passed_flags: NamedTuple(
        'PassedFlags',
        [
            ('print_last_token', bool),
            ('print_unreachable', bool),
        ],
    ),
    objects_metadata: Optional[ResourceMetadata],
) -> None:
  """Prints the objects array with metadata configured to print using flags.

  If there is no `objects_metadata` passed in, then this function has the same
  behaviour as `PrintObjectsArray`.

  Different metadata can be printed by setting flags in `passed_flags`. With
  a `format` of 'sparse' or 'pretty' then nothing will be printed if no flags
  are set. With a format of 'prettyjson' or 'json' then the `objects_list`
  will be printed as a `results` value even if no flags are set to print
  metadata but if some are set, these will also be printed.

  Arguments:
    objects_list: The list of resources to print.
    objects_type: The type of the resources to be printed.
    passed_flags: Flags used to configure the printing behaviour.
    objects_metadata: Optional metadata to be printed.
  """
  if FLAGS.format in ['prettyjson', 'json']:
    if passed_flags.print_last_token or passed_flags.print_unreachable:
      json_object = {'results': objects_list}
      if passed_flags.print_last_token and 'token' in objects_metadata:
        json_object['token'] = objects_metadata['token']
      if passed_flags.print_unreachable and 'unreachable' in objects_metadata:
        json_object['unreachable'] = objects_metadata['unreachable']
    else:
      json_object = objects_list
    bq_utils.PrintFormattedJsonObject(json_object)
  elif FLAGS.format in [None, 'sparse', 'pretty']:
    PrintObjectsArray(objects_list, objects_type)
    if objects_metadata is None:
      return
    if passed_flags.print_last_token and 'token' in objects_metadata:
      print('\nNext token: ' + objects_metadata['token'])
    if passed_flags.print_unreachable and 'unreachable' in objects_metadata:
      print('\nUnreachable: ' + ', '.join(objects_metadata['unreachable']))


def ParseUdfResources(udf_resources):
  """Parses UDF resources from an array of resource URIs.

  Arguments:
    udf_resources: Array of udf resource URIs.

  Returns:
    Array of UDF resources parsed into the format expected by the BigQuery API
    client.
  """

  if udf_resources is None:
    return None
  inline_udf_resources = []
  external_udf_resources = []
  for uris in udf_resources:
    for uri in uris.split(','):
      if os.path.isfile(uri):
        with open(uri) as udf_file:
          inline_udf_resources.append(udf_file.read())
      else:
        if not uri.startswith('gs://'):
          raise app.UsageError(
              'Non-inline resources must be Google Cloud Storage (gs://) URIs'
          )
        external_udf_resources.append(uri)
  udfs = []
  if inline_udf_resources:
    for udf_code in inline_udf_resources:
      udfs.append({'inlineCode': udf_code})
  if external_udf_resources:
    for uri in external_udf_resources:
      udfs.append({'resourceUri': uri})
  return udfs


def ValidateDatasetName(dataset_name: str) -> None:
  """A regex to ensure the dataset name is valid.


  Arguments:
    dataset_name: string name of the dataset to be validated.

  Raises:
    UsageError: An error occurred due to invalid dataset string.
  """
  is_valid = re.fullmatch(r'[a-zA-Z0-9\_]{1,1024}', dataset_name)
  if not is_valid:
    raise app.UsageError(
        'Dataset name: %s is invalid, must be letters '
        '(uppercase or lowercase), numbers, and underscores up to '
        '1024 characters.' % dataset_name
    )


def ParseParameters(parameters):
  """Parses query parameters from an array of name:type:value.

  Arguments:
    parameters: An iterable of string-form query parameters: name:type:value.
      Name may be omitted to indicate a positional parameter: :type:value. Type
      may be omitted to indicate a string: name::value, or ::value.

  Returns:
    A list of query parameters in the form for the BigQuery API client.
  """
  if not parameters:
    return None
  results = []
  for param_string in parameters:
    if os.path.isfile(param_string):
      with open(param_string) as f:
        results += json.load(f)
    else:
      results.append(ParseParameter(param_string))
  return results


def SplitParam(param_string):
  split = param_string.split(':', 1)
  if len(split) != 2:
    raise app.UsageError(
        'Query parameters must be of the form: '
        '"name:type:value", ":type:value", or "name::value". '
        'An empty name produces a positional parameter. '
        'An empty type produces a STRING parameter.'
    )
  return split


def ParseParameter(param_string):
  """Parse a string of the form <name><type>:<value> into each part."""
  name, param_string = SplitParam(param_string)
  try:
    type_dict, value_dict = ParseParameterTypeAndValue(param_string)
  except app.UsageError as e:
    print('Error parsing parameter %s: %s' % (name, e))
    sys.exit(1)
  result = {'parameterType': type_dict, 'parameterValue': value_dict}
  if name:
    result['name'] = name
  return result


def ParseParameterTypeAndValue(param_string):
  """Parse a string of the form <recursive_type>:<value> into each part."""
  type_string, value_string = SplitParam(param_string)
  if not type_string:
    type_string = 'STRING'
  type_dict = ParseParameterType(type_string)
  return type_dict, ParseParameterValue(type_dict, value_string)


def ParseParameterType(type_string):
  """Parse a parameter type string into a JSON dict for the BigQuery API."""
  type_dict = {'type': type_string.upper()}
  if type_string.upper().startswith('ARRAY<') and type_string.endswith('>'):
    type_dict = {
        'type': 'ARRAY',
        'arrayType': ParseParameterType(type_string[6:-1]),
    }
  if type_string.startswith('STRUCT<') and type_string.endswith('>'):
    type_dict = {
        'type': 'STRUCT',
        'structTypes': ParseStructType(type_string[7:-1]),
    }
  if type_string.startswith('RANGE<') and type_string.endswith('>'):
    type_dict = {
        'type': 'RANGE',
        'rangeElementType': ParseParameterType(type_string[6:-1]),
    }
  if not type_string:
    raise app.UsageError('Query parameter missing type')
  return type_dict


def ParseStructType(type_string):
  """Parse a Struct QueryParameter type into a JSON dict form."""
  subtypes = []
  for name, sub_type in StructTypeSplit(type_string):
    subtypes.append({'type': ParseParameterType(sub_type), 'name': name})
  return subtypes


def StructTypeSplit(type_string):
  """Yields single field-name, sub-types tuple from a StructType string.

  Raises:
    UsageError: When a field name is missing.
  """
  while type_string:
    next_span = type_string.split(',', 1)[0]
    if '<' in next_span:
      angle_count = 0
      i = 0
      for i in range(next_span.find('<'), len(type_string)):
        if type_string[i] == '<':
          angle_count += 1
        if type_string[i] == '>':
          angle_count -= 1
        if angle_count == 0:
          break
      if angle_count != 0:
        raise app.UsageError('Malformatted struct type')
      next_span = type_string[: i + 1]
    type_string = type_string[len(next_span) + 1 :]
    splits = next_span.split(None, 1)
    if len(splits) != 2:
      raise app.UsageError('Struct parameter missing name for field')
    yield splits


def FormatRfc3339(datetime_obj: datetime.datetime) -> str:
  """Formats a datetime.datetime object (UTC) in RFC3339.

  https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#timestamp

  Args:
    datetime_obj: A datetime.datetime object representing a datetime in UTC.

  Returns:
    The string representation of the date in RFC3339.
  """
  return datetime_obj.isoformat('T') + 'Z'


def ParseRangeParameterValue(range_value: str) -> Tuple[str, str]:
  """Parse a range parameter value string into its components.

  Args:
    range_value: A range value string of the form "[<start>, <end>)".

  Returns:
    A tuple (<start>, <end>).

  Raises:
    app.UsageError: if the input range value string was not formatted correctly.
  """
  parsed = ParseRangeString(range_value)
  if parsed is None:
    raise app.UsageError(
        f'Invalid range parameter value: {range_value}. Expected format:'
        ' "[<start>, <end>)"'
    )
  return parsed


def ParseParameterValue(type_dict, value_input):
  """Parse a parameter value of type `type_dict` from value_input.

  Arguments:
    type_dict: The JSON-dict type as which to parse `value_input`.
    value_input: Either a string representing the value, or a JSON dict for
      array and value types.
  """
  if 'structTypes' in type_dict:
    if isinstance(value_input, str):
      if value_input == 'NULL':
        return {'structValues': None}
      value_input = json.loads(value_input)
    type_map = dict([(x['name'], x['type']) for x in type_dict['structTypes']])
    values = {}
    for field_name, value in value_input.items():
      values[field_name] = ParseParameterValue(type_map[field_name], value)
    return {'structValues': values}
  if 'arrayType' in type_dict:
    if isinstance(value_input, str):
      if value_input == 'NULL':
        return {'arrayValues': None}
      try:
        value_input = json.loads(value_input)
      except json.decoder.JSONDecodeError:
        tb = sys.exc_info()[2]
        # pylint: disable=raise-missing-from
        raise app.UsageError(
            'Error parsing string as JSON: %s' % value_input
        ).with_traceback(tb)
    values = [
        ParseParameterValue(type_dict['arrayType'], x) for x in value_input
    ]
    if not values:  # Workaround to pass empty array parameter.
      return {'value': {}}  # An empty arrayValues list is the same as NULL.
    return {'arrayValues': values}
  if 'rangeElementType' in type_dict:
    if value_input == 'NULL':
      return {'rangeValue': None}
    start, end = ParseRangeParameterValue(value_input)
    return {
        'rangeValue': {
            'start': ParseParameterValue(type_dict['rangeElementType'], start),
            'end': ParseParameterValue(type_dict['rangeElementType'], end),
        }
    }
  return {'value': value_input if value_input != 'NULL' else None}