HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/sql/import_util.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common command-agnostic utility functions for sql import commands."""


def ParseBakType(sql_messages, bak_type):
  if bak_type is None:
    return (
        sql_messages.ImportContext.BakImportOptionsValue.BakTypeValueValuesEnum.FULL
    )
  return sql_messages.ImportContext.BakImportOptionsValue.BakTypeValueValuesEnum.lookup_by_name(
      bak_type.upper()
  )


def SqlImportContext(
    sql_messages,
    uri,
    database=None,
    user=None,
    parallel=False,
    threads=None,
    clean=False,
    if_exists=False,
):
  """Generates the ImportContext for the given args, for importing from SQL.

  Args:
    sql_messages: module, The messages module that should be used.
    uri: The URI of the bucket to import from; the output of the 'uri' arg.
    database: The database to import to; the output of the '--database' flag.
    user: The Postgres user to import as; the output of the '--user' flag.
    parallel: Whether to use parallel import or not; the output of the
      '--parallel' flag.
    threads: The number of threads to use; the output of the '--threads' flag.
      Only applicable for parallel import.
    clean: Clean (DROP) database objects before recreating them. Corresponds to
      the --clean flag on pg_restore. Only applies if --parallel is set.
      PostgreSQL only.
    if_exists: Include SQL statement (IF EXISTS) with each
      DROP statement produced by --clean; Corresponds to the --if-exists  flag
      on pg_restore. Only applies if --parallel is set. PostgreSQL only.

  Returns:
    ImportContext, for use in InstancesImportRequest.importContext.
  """
  if parallel:
    postgres_import_options = None
    if clean or if_exists:
      postgres_import_options = (
          sql_messages.ImportContext.SqlImportOptionsValue
          .PostgresImportOptionsValue(
              clean=clean,
              ifExists=if_exists,
          )
      )

    return sql_messages.ImportContext(
        kind='sql#importContext',
        uri=uri,
        database=database,
        fileType=sql_messages.ImportContext.FileTypeValueValuesEnum.SQL,
        importUser=user,
        sqlImportOptions=sql_messages.ImportContext.SqlImportOptionsValue(
            parallel=parallel,
            threads=threads,
            postgresImportOptions=postgres_import_options,
        ),
    )
  else:
    return sql_messages.ImportContext(
        kind='sql#importContext',
        uri=uri,
        database=database,
        fileType=sql_messages.ImportContext.FileTypeValueValuesEnum.SQL,
        importUser=user,
        sqlImportOptions=sql_messages.ImportContext.SqlImportOptionsValue(
            threads=threads
        ),
    )


def CsvImportContext(sql_messages,
                     uri,
                     database,
                     table,
                     columns=None,
                     user=None,
                     quote=None,
                     escape=None,
                     fields_terminated_by=None,
                     lines_terminated_by=None):
  """Generates the ImportContext for the given args, for importing from CSV.

  Args:
    sql_messages: module, The messages module that should be used.
    uri: The URI of the bucket to import from; the output of the 'uri' arg.
    database: The database to import into; the output of the '--database' flag.
    table: The table to import into; the output of the '--table' flag.
    columns: The CSV columns to import form; the output of the '--columns' flag.
    user: The Postgres user to import as; the output of the '--user' flag.
    quote: character in Hex. The quote character for CSV format; the output of
      the '--quote' flag.
    escape: character in Hex. The escape character for CSV format; the output of
      the '--escape' flag.
    fields_terminated_by: character in Hex. The fields delimiter character for
      CSV format; the output of the '--fields-terminated-by' flag.
    lines_terminated_by: character in Hex. The lines delimiter character for CSV
      format; the output of the '--lines-terminated-by' flag.

  Returns:
    ImportContext, for use in InstancesImportRequest.importContext.
  """
  return sql_messages.ImportContext(
      kind='sql#importContext',
      csvImportOptions=sql_messages.ImportContext.CsvImportOptionsValue(
          columns=columns or [], table=table,
          quoteCharacter=quote,
          escapeCharacter=escape,
          fieldsTerminatedBy=fields_terminated_by,
          linesTerminatedBy=lines_terminated_by),
      uri=uri,
      database=database,
      fileType=sql_messages.ImportContext.FileTypeValueValuesEnum.CSV,
      importUser=user)


def BakImportContext(
    sql_messages,
    uri,
    database,
    cert_path,
    pvk_path,
    pvk_password,
    keep_encrypted,
    striped,
    no_recovery,
    recovery_only,
    bak_type,
    stop_at,
    stop_at_mark,
):
  """Generates the ImportContext for the given args, for importing from BAK.

  Args:
    sql_messages: module, The messages module that should be used.
    uri: The URI of the bucket to import from; the output of the `uri` arg.
    database: The database to import to; the output of the `--database` flag.
    cert_path: The certificate used for encrypted .bak; the output of the
      `--cert-path` flag.
    pvk_path: The private key used for encrypted .bak; the output of the
      `--pvk-path` flag.
    pvk_password: The private key password used for encrypted .bak; the output
      of the `--pvk-password` or `--prompt-for-pvk-password` flag.
    keep_encrypted: Whether or not to decrypt the imported encrypted BAK file.
    striped: Whether or not the import is striped.
    no_recovery: Whether the import executes with NORECOVERY keyword.
    recovery_only: Whether the import skip download and bring database online.
    bak_type: Type of the bak file.
    stop_at: Equivalent to SQL Server STOPAT keyword.
    stop_at_mark: Equivalent to SQL Server STOPATMARK keyword.

  Returns:
    ImportContext, for use in InstancesImportRequest.importContext.
  """
  bak_import_options = None
  if cert_path and pvk_path and pvk_password:
    bak_import_options = sql_messages.ImportContext.BakImportOptionsValue(
        encryptionOptions=sql_messages.ImportContext.BakImportOptionsValue
        .EncryptionOptionsValue(
            certPath=cert_path,
            pvkPath=pvk_path,
            pvkPassword=pvk_password,
        ))
    if keep_encrypted:
      bak_import_options.encryptionOptions.keepEncrypted = keep_encrypted
  else:
    bak_import_options = sql_messages.ImportContext.BakImportOptionsValue()

  if striped:
    bak_import_options.striped = striped

  bak_import_options.noRecovery = no_recovery
  bak_import_options.recoveryOnly = recovery_only
  bak_import_options.bakType = ParseBakType(sql_messages, bak_type)
  if stop_at is not None:
    bak_import_options.stopAt = stop_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
  bak_import_options.stopAtMark = stop_at_mark

  return sql_messages.ImportContext(
      kind='sql#importContext',
      uri=uri,
      database=database,
      fileType=sql_messages.ImportContext.FileTypeValueValuesEnum.BAK,
      bakImportOptions=bak_import_options)


def TdeImportContext(
    sql_messages,
    certificate,
    cert_path,
    pvk_path,
    pvk_password,
):
  """Generates the ImportContext for the given args, for importing from TDE.

  Args:
    sql_messages: module, The messages module that should be used.
    certificate: The certificate name; the output of the
      `--certificate` flag.
    cert_path: The certificate path in Google Cloud Storage; the output of the
      `--cert-path` flag.
    pvk_path: The private key path in Google Cloud Storage; the output of the
      `--pvk-path` flag.
    pvk_password: The password that encrypts the private key; the output
      of the `--pvk-password` or `--prompt-for-pvk-password` flag.

  Returns:
    ImportContext, for use in InstancesImportRequest.importContext.
  """
  tde_import_options = sql_messages.ImportContext.TdeImportOptionsValue(
      name=certificate,
      certificatePath=cert_path,
      privateKeyPath=pvk_path,
      privateKeyPassword=pvk_password)

  return sql_messages.ImportContext(
      kind='sql#importContext',
      fileType=sql_messages.ImportContext.FileTypeValueValuesEnum.TDE,
      tdeImportOptions=tde_import_options)