HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/platform/bq/frontend/command_load.py
#!/usr/bin/env python
"""All the BigQuery CLI commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import time
from typing import Optional

from absl import flags

import bq_flags
from clients import client_job
from clients import utils as bq_client_utils
from frontend import bigquery_command
from frontend import bq_cached_client
from frontend import flags as frontend_flags
from frontend import utils as frontend_utils
from frontend import utils_flags
from frontend import utils_formatting

# These aren't relevant for user-facing docstrings:
# pylint: disable=g-doc-return-or-yield
# pylint: disable=g-doc-args


# Implements the `bq load` command: registers every load-specific flag on the
# command's FlagValues and translates the parsed flags into keyword options
# for client_job.Load.
#
# NOTE(review): no class docstring is added on purpose. The user-facing help
# text appears to come from the RunWithArgs docstring, and a class docstring
# could interfere with that -- confirm against bigquery_command.BigqueryCmd.
class Load(bigquery_command.BigqueryCmd):
  usage = """load <destination_table> <source> <schema>"""

  def __init__(self, name: str, fv: flags.FlagValues):
    """Registers all `load` flags on `fv` and processes the command rc.

    Args:
      name: Name under which the command is registered.
      fv: FlagValues instance that receives the command-specific flags.
    """
    super(Load, self).__init__(name, fv)
    flags.DEFINE_string(
        'field_delimiter',
        None,
        'The character that indicates the boundary between columns in the '
        'input file. "\\t" and "tab" are accepted names for tab.',
        short_name='F',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'encoding',
        None,
        ['UTF-8', 'ISO-8859-1', 'UTF-16BE', 'UTF-16LE', 'UTF-32BE', 'UTF-32LE'],
        (
            'The character encoding used by the input file.  Options include:'
            '\n ISO-8859-1 (also known as Latin-1)'
            '\n UTF-8'
            '\n UTF-16BE (UTF-16 BigEndian)'
            '\n UTF-16LE (UTF-16 LittleEndian)'
            '\n UTF-32BE (UTF-32 BigEndian)'
            '\n UTF-32LE (UTF-32 LittleEndian)'
        ),
        short_name='E',
        flag_values=fv,
    )
    flags.DEFINE_integer(
        'skip_leading_rows',
        None,
        'The number of rows at the beginning of the source file to skip.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'schema',
        None,
        'Either a filename or a comma-separated list of fields in the form '
        'name[:type].',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'replace',
        False,
        'If true existing data is erased when new data is loaded.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'replace_data',
        False,
        'If true, erase existing contents but not other table metadata like'
        ' schema before loading new data.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'quote',
        None,
        'Quote character to use to enclose records. Default is ". '
        'To indicate no quote character at all, use an empty string.',
        flag_values=fv,
    )
    flags.DEFINE_integer(
        'max_bad_records',
        None,
        'Maximum number of bad records allowed before the entire job fails. '
        'Only supported for CSV and NEWLINE_DELIMITED_JSON file formats.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'allow_quoted_newlines',
        None,
        'Whether to allow quoted newlines in CSV import data.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'allow_jagged_rows',
        None,
        'Whether to allow missing trailing optional columns '
        'in CSV import data.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'preserve_ascii_control_characters',
        None,
        'Whether to preserve embedded Ascii Control characters in CSV import '
        'data.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'ignore_unknown_values',
        None,
        'Whether to allow and ignore extra, unrecognized values in CSV or JSON '
        'import data.',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'source_format',
        None,
        [
            'CSV',
            'NEWLINE_DELIMITED_JSON',
            'DATASTORE_BACKUP',
            'AVRO',
            'PARQUET',
            'ORC',
            'THRIFT',
        ],
        'Format of source data. Options include:'
        '\n CSV'
        '\n NEWLINE_DELIMITED_JSON'
        '\n DATASTORE_BACKUP'
        '\n AVRO'
        '\n PARQUET'
        '\n ORC'
        '\n THRIFT',
        flag_values=fv,
    )
    flags.DEFINE_list(
        'projection_fields',
        [],
        'If sourceFormat is set to "DATASTORE_BACKUP", indicates which entity '
        'properties to load into BigQuery from a Cloud Datastore backup. '
        'Property names are case sensitive and must refer to top-level '
        'properties.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'autodetect',
        None,
        'Enable auto detection of schema and options for formats that are not '
        'self describing like CSV and JSON.',
        flag_values=fv,
    )
    flags.DEFINE_multi_string(
        'schema_update_option',
        None,
        'Can be specified when append to a table, or replace a table partition.'
        ' When specified, the schema of the destination table will be updated '
        'with the schema of the new data. One or more of the following options '
        'can be specified:'
        '\n ALLOW_FIELD_ADDITION: allow new fields to be added'
        '\n ALLOW_FIELD_RELAXATION: allow relaxing required fields to nullable',
        flag_values=fv,
    )
    # The two null-marker help strings previously concatenated without a
    # separating space ("NULL valuein CSV"); the leading space fixes that.
    flags.DEFINE_string(
        'null_marker',
        None,
        'An optional custom string that will represent a NULL value'
        ' in CSV import data.',
        flag_values=fv,
    )
    flags.DEFINE_list(
        'null_markers',
        [],
        'An optional list of custom strings that will represent a NULL value'
        ' in CSV import data.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'time_partitioning_type',
        None,
        'Enables time based partitioning on the table and set the type. The '
        'default value is DAY, which will generate one partition per day. '
        'Other supported values are HOUR, MONTH, and YEAR.',
        flag_values=fv,
    )
    flags.DEFINE_integer(
        'time_partitioning_expiration',
        None,
        'Enables time based partitioning on the table and sets the number of '
        'seconds for which to keep the storage for the partitions in the table.'
        ' The storage in a partition will have an expiration time of its '
        'partition time plus this value.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'range_partitioning',
        None,
        'Enables range partitioning on the table. The format should be '
        '"field,start,end,interval". The table will be partitioned based on the'
        ' value of the field. Field must be a top-level, non-repeated INT64 '
        'field. Start, end, and interval are INT64 values defining the ranges.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'time_partitioning_field',
        None,
        'Enables time based partitioning on the table and the table will be '
        'partitioned based on the value of this field. If time based '
        'partitioning is enabled without this value, the table will be '
        'partitioned based on the loading time.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'destination_kms_key',
        None,
        'Cloud KMS key for encryption of the destination table data.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'require_partition_filter',
        None,
        'Whether to require partition filter for queries over this table. '
        'Only apply to partitioned table.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'clustering_fields',
        None,
        'Comma-separated list of field names that specifies the columns on '
        'which a table is clustered. To remove the clustering, set an empty '
        'value.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'use_avro_logical_types',
        None,
        'If sourceFormat is set to "AVRO", indicates whether to enable '
        'interpreting logical types into their corresponding types '
        '(ie. TIMESTAMP), instead of only using their raw types (ie. INTEGER).',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'reference_file_schema_uri',
        None,
        'provide a reference file with the reader schema, currently '
        'enabled for the format: AVRO, PARQUET, ORC.',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'parquet_enum_as_string',
        None,
        'Infer Parquet ENUM logical type as STRING '
        '(instead of BYTES by default).',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'parquet_enable_list_inference',
        None,
        frontend_utils.PARQUET_LIST_INFERENCE_DESCRIPTION,
        flag_values=fv,
    )
    flags.DEFINE_string(
        'hive_partitioning_mode',
        None,
        'Enables hive partitioning.  AUTO indicates to perform '
        'automatic type inference.  STRINGS indicates to treat all hive '
        'partition keys as STRING typed.  No other values are accepted',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'hive_partitioning_source_uri_prefix',
        None,
        'Prefix after which hive partition '
        'encoding begins.  For URIs like gs://bucket/path/key1=value/file, '
        'the value should be gs://bucket/path.',
        flag_values=fv,
    )
    flags.DEFINE_multi_enum(
        'decimal_target_types',
        None,
        ['NUMERIC', 'BIGNUMERIC', 'STRING'],
        'Specifies the list of possible BigQuery data types to '
        'which the source decimal values are converted. This list and the '
        'precision and the scale parameters of the decimal field determine the '
        'target type in the following preference order, and '
        'one or more of the following options could be specified: '
        '\n NUMERIC: decimal values could be converted to NUMERIC type, '
        'depending on the precision and scale of the decimal schema.'
        '\n BIGNUMERIC: decimal values could be converted to BIGNUMERIC type, '
        'depending on the precision and scale of the decimal schema.'
        '\n STRING: decimal values could be converted to STRING type, '
        'depending on the precision and scale of the decimal schema.',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'file_set_spec_type',
        None,
        ['FILE_SYSTEM_MATCH', 'NEW_LINE_DELIMITED_MANIFEST'],
        'Specifies how to discover files given source URIs. '
        'Options include: '
        '\n FILE_SYSTEM_MATCH: expand source URIs by listing files from the '
        'underlying object store. This is the default behavior.'
        '\n NEW_LINE_DELIMITED_MANIFEST: indicate the source URIs provided are '
        'new line delimited manifest files, where each line contains a URI '
        'with no wild-card.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'thrift_schema_idl_root_dir',
        None,
        'If "source_format" is set to "THRIFT", indicates the root directory '
        'of the Thrift IDL bundle containing all Thrift files that should be '
        'used to parse the schema of the serialized Thrift records.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'thrift_schema_idl_uri',
        None,
        'If "source_format" is set to "THRIFT", indicates the file uri that '
        'contains the Thrift IDL struct to be parsed as schema. This file will '
        'be used as the entry point to parse the schema and all its included '
        'Thrift IDL files should be in "thrift_schema_idl_root_dir".',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'thrift_schema_struct',
        None,
        'If "source_format" is set to "THRIFT", indicates the root Thrift '
        'struct that should be used as the schema. This struct should be '
        'defined in the "thrift_schema_idl_uri" file.',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'thrift_deserialization',
        None,
        ['T_BINARY_PROTOCOL'],
        'If "source_format" is set to "THRIFT", configures how serialized '
        'Thrift record should be deserialized (using TProtocol). '
        'Options include: '
        '\n T_BINARY_PROTOCOL',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'thrift_framing',
        None,
        ['NOT_FRAMED', 'FRAMED_WITH_BIG_ENDIAN', 'FRAMED_WITH_LITTLE_ENDIAN'],
        'If "source_format" is set to "THRIFT", configures how Thrift records '
        'or data blocks are framed (e.g. using TFramedTransport). '
        'Options include: '
        '\n NOT_FRAMED, '
        '\n FRAMED_WITH_BIG_ENDIAN, '
        '\n FRAMED_WITH_LITTLE_ENDIAN',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'boundary_bytes_base64',
        None,
        'If "source_format" is set to "THRIFT", indicates the sequence of '
        'boundary bytes (encoded in base64) that are added in front of the '
        'serialized Thrift records, or data blocks, or the frame when used '
        'with `thrift_framing`.',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'json_extension',
        None,
        ['GEOJSON'],
        '(experimental) Allowed values: '
        'GEOJSON: only allowed when source_format is specified as '
        'NEWLINE_DELIMITED_JSON. When specified, the input is loaded as '
        'newline-delimited GeoJSON.',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'column_name_character_map',
        None,
        ['STRICT', 'V1', 'V2'],
        'Indicates the character map used for column names: '
        '\n STRICT: The latest character map and reject invalid column names.'
        '\n V1: Supports alphanumeric + underscore and name must start with a '
        'letter or underscore. Invalid column names will be normalized.'
        '\n V2: Supports flexible column name. Invalid column names will be '
        'normalized.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'session_id',
        None,
        'If loading to a temporary table, specifies the session ID of the '
        'temporary table',
        flag_values=fv,
    )
    flags.DEFINE_boolean(
        'copy_files_only',
        None,
        '[Experimental] Configures the load job to only copy files to the '
        'destination BigLake managed table, without reading file content and '
        'writing them to new files.',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'time_zone',
        None,
        'Default time zone that will apply when parsing timestamp values that'
        ' have no specific time zone. For example, "America/Los_Angeles".',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'date_format',
        None,
        'Format elements that define how the DATE values are formatted in the'
        ' input files. For example, "MM/DD/YYYY".',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'datetime_format',
        None,
        'Format elements that define how the DATETIME values are formatted in'
        ' the input files. For example, "MM/DD/YYYY HH24:MI:SS.FF3".',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'time_format',
        None,
        'Format elements that define how the TIME values are formatted in the'
        ' input files. For example, "HH24:MI:SS.FF3".',
        flag_values=fv,
    )
    flags.DEFINE_string(
        'timestamp_format',
        None,
        'Format elements that define how the TIMESTAMP values are formatted in'
        ' the input files. For example, "MM/DD/YYYY HH24:MI:SS.FF3".',
        flag_values=fv,
    )
    flags.DEFINE_enum(
        'source_column_match',
        None,
        ['POSITION', 'NAME'],
        'Controls the strategy used to match loaded columns to the schema.'
        ' Options include:'
        '\n POSITION: matches by position. This option assumes that the columns'
        ' are ordered the same way as the schema.'
        '\n NAME: matches by name. This option reads the header row as column'
        ' names and reorders columns to match the field names in the schema.',
        flag_values=fv,
    )
    # These flags are defined through shared helpers; the returned flag
    # holders are kept so RunWithArgs can read `.value` / `.present`.
    self.timestamp_target_precision_flag = (
        frontend_flags.define_timestamp_target_precision(flag_values=fv)
    )
    self.parquet_map_target_type_flag = (
        frontend_flags.define_parquet_map_target_type(flag_values=fv)
    )
    self.reservation_id_for_a_job_flag = (
        frontend_flags.define_reservation_id_for_a_job(flag_values=fv)
    )
    self._ProcessCommandRc(fv)

  def _BuildThriftOptions(self) -> dict:
    """Collects the --thrift_* flags into the Thrift options dict."""
    thrift_options = {}
    if self.thrift_schema_idl_root_dir is not None:
      thrift_options['schema_idl_root_dir'] = self.thrift_schema_idl_root_dir
    if self.thrift_schema_idl_uri is not None:
      thrift_options['schema_idl_uri'] = self.thrift_schema_idl_uri
    if self.thrift_schema_struct is not None:
      thrift_options['schema_struct'] = self.thrift_schema_struct
    # --thrift_deserialization only admits T_BINARY_PROTOCOL, which maps to
    # the same option that is used when the flag is unset, so the value is
    # unconditional.
    thrift_options['deserialization_option'] = 'THRIFT_BINARY_PROTOCOL_OPTION'
    # Default to 'NOT_FRAMED' when --thrift_framing is unset.
    thrift_options['framing_option'] = (
        self.thrift_framing if self.thrift_framing is not None else 'NOT_FRAMED'
    )
    if self.boundary_bytes_base64 is not None:
      thrift_options['boundary_bytes'] = self.boundary_bytes_base64
    return thrift_options

  def _BuildParquetOptions(self) -> dict:
    """Collects the Parquet flags that were set; empty dict if none were."""
    parquet_options = {}
    if self.parquet_enum_as_string is not None:
      parquet_options['enum_as_string'] = self.parquet_enum_as_string
    if self.parquet_enable_list_inference is not None:
      parquet_options['enable_list_inference'] = (
          self.parquet_enable_list_inference
      )
    if self.parquet_map_target_type_flag.value is not None:
      parquet_options['mapTargetType'] = (
          self.parquet_map_target_type_flag.value
      )
    return parquet_options

  def RunWithArgs(
      self, destination_table: str, source: str, schema: Optional[str] = None
  ) -> Optional[int]:
    """Perform a load operation of source into destination_table.

    Usage:
      load <destination_table> <source> [<schema>] [--session_id=[session]]

    The <destination_table> is the fully-qualified table name of table to
    create, or append to if the table already exists.

    To load to a temporary table, specify the table name in <destination_table>
    without a dataset and specify the session id with --session_id.

    The <source> argument can be a path to a single local file, or a
    comma-separated list of URIs.

    The <schema> argument should be either the name of a JSON file or a text
    schema. This schema should be omitted if the table already has one.

    In the case that the schema is provided in text form, it should be a
    comma-separated list of entries of the form name[:type], where type will
    default to string if not specified.

    In the case that <schema> is a filename, it should be a JSON file
    containing a single array, each entry of which should be an object with
    properties 'name', 'type', and (optionally) 'mode'. For more detail:
    https://cloud.google.com/bigquery/docs/schemas#specifying_a_json_schema_file

    Note: the case of a single-entry schema with no type specified is
    ambiguous; one can use name:string to force interpretation as a
    text schema.

    Examples:
      bq load ds.new_tbl ./info.csv ./info_schema.json
      bq load ds.new_tbl gs://mybucket/info.csv ./info_schema.json
      bq load ds.small gs://mybucket/small.csv name:integer,value:string
      bq load ds.small gs://mybucket/small.csv field1,field2,field3
      bq load temp_tbl --session_id=my_session ./info.csv ./info_schema.json

    Arguments:
      destination_table: Destination table name.
      source: Name of local file to import, or a comma-separated list of URI
        paths to data to import.
      schema: Either a text schema or JSON file, as above.
    """
    # The --schema flag used to be defined but never read, so a schema passed
    # that way was silently dropped. Honor it when the positional argument is
    # omitted; an explicit positional schema still takes precedence.
    if schema is None:
      schema = self.schema

    client = bq_cached_client.Client.Get()
    # Temporary (session) tables are addressed without a dataset, so fall back
    # to the '_SESSION' dataset when a session id is given.
    default_dataset_id = ''
    if self.session_id:
      default_dataset_id = '_SESSION'
    table_reference = bq_client_utils.GetTableReference(
        id_fallbacks=client,
        identifier=destination_table,
        default_dataset_id=default_dataset_id,
    )

    # Options that are always forwarded, even when unset (None / empty).
    opts = {
        'encoding': self.encoding,
        'skip_leading_rows': self.skip_leading_rows,
        'allow_quoted_newlines': self.allow_quoted_newlines,
        'job_id': utils_flags.get_job_id_from_flags(),
        'source_format': self.source_format,
        'projection_fields': self.projection_fields,
    }

    # Options forwarded only when the corresponding flag carries a value.
    if self.max_bad_records:
      opts['max_bad_records'] = self.max_bad_records
    if bq_flags.LOCATION.value:
      opts['location'] = bq_flags.LOCATION.value
    if self.session_id:
      opts['connection_properties'] = [
          {'key': 'session_id', 'value': self.session_id}
      ]
    if self.copy_files_only:
      opts['copy_files_only'] = self.copy_files_only
    # --replace takes precedence over --replace_data when both are set.
    if self.replace:
      opts['write_disposition'] = 'WRITE_TRUNCATE'
    elif self.replace_data:
      opts['write_disposition'] = 'WRITE_TRUNCATE_DATA'
    if self.field_delimiter is not None:
      opts['field_delimiter'] = frontend_utils.NormalizeFieldDelimiter(
          self.field_delimiter
      )
    if self.quote is not None:
      # The quote character goes through the same normalizer as the delimiter.
      opts['quote'] = frontend_utils.NormalizeFieldDelimiter(self.quote)
    if self.allow_jagged_rows is not None:
      opts['allow_jagged_rows'] = self.allow_jagged_rows
    if self.preserve_ascii_control_characters is not None:
      opts['preserve_ascii_control_characters'] = (
          self.preserve_ascii_control_characters
      )
    if self.ignore_unknown_values is not None:
      opts['ignore_unknown_values'] = self.ignore_unknown_values
    if self.autodetect is not None:
      opts['autodetect'] = self.autodetect
    if self.schema_update_option:
      opts['schema_update_options'] = self.schema_update_option
    if self.null_marker:
      opts['null_marker'] = self.null_marker
    if self.null_markers is not None:
      opts['null_markers'] = self.null_markers

    # Partitioning and clustering.
    time_partitioning = frontend_utils.ParseTimePartitioning(
        self.time_partitioning_type,
        self.time_partitioning_expiration,
        self.time_partitioning_field,
        None,
        self.require_partition_filter,
    )
    if time_partitioning is not None:
      opts['time_partitioning'] = time_partitioning
    range_partitioning = frontend_utils.ParseRangePartitioning(
        self.range_partitioning
    )
    if range_partitioning:
      opts['range_partitioning'] = range_partitioning
    clustering = frontend_utils.ParseClustering(self.clustering_fields)
    if clustering:
      opts['clustering'] = clustering

    if self.destination_kms_key is not None:
      opts['destination_encryption_configuration'] = {
          'kmsKeyName': self.destination_kms_key
      }
    if self.use_avro_logical_types is not None:
      opts['use_avro_logical_types'] = self.use_avro_logical_types
    if self.reference_file_schema_uri is not None:
      opts['reference_file_schema_uri'] = self.reference_file_schema_uri
    if self.hive_partitioning_mode is not None:
      frontend_utils.ValidateHivePartitioningOptions(
          self.hive_partitioning_mode
      )
      hive_partitioning_options = {'mode': self.hive_partitioning_mode}
      if self.hive_partitioning_source_uri_prefix is not None:
        hive_partitioning_options['sourceUriPrefix'] = (
            self.hive_partitioning_source_uri_prefix
        )
      opts['hive_partitioning_options'] = hive_partitioning_options
    if self.json_extension is not None:
      opts['json_extension'] = self.json_extension
    if self.column_name_character_map is not None:
      opts['column_name_character_map'] = self.column_name_character_map
    # Forwarded unconditionally (None when the flag was not given).
    opts['decimal_target_types'] = self.decimal_target_types
    if self.file_set_spec_type is not None:
      opts['file_set_spec_type'] = frontend_utils.ParseFileSetSpecType(
          self.file_set_spec_type
      )
    if self.time_zone is not None:
      opts['time_zone'] = self.time_zone
    if self.date_format is not None:
      opts['date_format'] = self.date_format
    if self.datetime_format is not None:
      opts['datetime_format'] = self.datetime_format
    if self.time_format is not None:
      opts['time_format'] = self.time_format
    if self.timestamp_format is not None:
      opts['timestamp_format'] = self.timestamp_format
    if self.source_column_match is not None:
      opts['source_column_match'] = self.source_column_match
    if self.timestamp_target_precision_flag.value is not None:
      opts['timestamp_target_precision'] = (
          self.timestamp_target_precision_flag.value
      )

    # Format-specific option groups.
    if opts['source_format'] == 'THRIFT':
      opts['thrift_options'] = self._BuildThriftOptions()
    if opts['source_format'] == 'PARQUET':
      parquet_options = self._BuildParquetOptions()
      if parquet_options:
        opts['parquet_options'] = parquet_options

    if self.reservation_id_for_a_job_flag.present:
      opts['reservation_id'] = self.reservation_id_for_a_job_flag.value

    job = client_job.Load(
        client, table_reference, source, schema=schema, **opts
    )
    if bq_flags.SYNCHRONOUS_MODE.value:
      frontend_utils.PrintJobMessages(utils_formatting.format_job_info(job))
    else:
      self.PrintJobStartInfo(job)