HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/dataflow/sql_util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for writing commands interacting with Cloud Dataflow SQL.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import json
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.command_lib.dataflow import dataflow_util
from googlecloudsdk.command_lib.dataflow import job_utils
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.command_lib.util.concepts import presentation_specs
from googlecloudsdk.core import properties


def ArgsForSqlQuery(parser):
  """Adds every flag and positional needed to launch a Dataflow SQL query.

  Registers, in order: the common job flags, the positional QUERY, the
  required `--job-name` and `--region` flags, a required output group holding
  the BigQuery table and Pub/Sub topic resources, the write/create
  disposition flags, the mutually exclusive query-parameter flags,
  `--dry-run`, and the hidden SQL launcher template flags.

  Args:
    parser: The argparse.ArgParser to configure with query arguments.
  """
  job_utils.CommonArgs(parser)

  parser.add_argument(
      'query', metavar='QUERY', help='The SQL query to execute.')

  parser.add_argument(
      '--job-name',
      required=True,
      help='The unique name to assign to the Cloud Dataflow job.')

  region_validator = arg_parsers.RegexpValidator(
      r'\w+-\w+\d', 'must provide a valid region')
  region_help = ('Region ID of the job\'s regional endpoint. '
                 + dataflow_util.DEFAULT_REGION_MESSAGE)
  parser.add_argument(
      '--region', type=region_validator, help=region_help, required=True)

  # At least one destination (BigQuery table and/or Pub/Sub topic) must be
  # supplied, hence the required group shared by both resource flags.
  output_group = parser.add_group(
      required=True, help='The destination(s) for the output of the query.')

  bigquery_resource = concepts.ResourceSpec(
      'bigquery.tables',
      resource_name='BigQuery table',
      tableId=concepts.ResourceParameterAttributeConfig(
          name='bigquery-table', help_text='The BigQuery table ID.'),
      projectId=concepts.ResourceParameterAttributeConfig(
          name='bigquery-project', help_text='The BigQuery project ID.'),
      datasetId=concepts.ResourceParameterAttributeConfig(
          name='bigquery-dataset', help_text='The BigQuery dataset ID.'))
  bigquery_presentation = presentation_specs.ResourcePresentationSpec(
      '--bigquery-table',
      bigquery_resource,
      'The BigQuery table to write query output to.',
      prefixes=False,
      group=output_group)

  pubsub_resource = concepts.ResourceSpec(
      'pubsub.projects.topics',
      resource_name='Pub/Sub topic',
      topicsId=concepts.ResourceParameterAttributeConfig(
          name='pubsub-topic', help_text='The Pub/Sub topic ID.'),
      projectsId=concepts.ResourceParameterAttributeConfig(
          name='pubsub-project', help_text='The Pub/Sub project ID.'))
  pubsub_presentation = presentation_specs.ResourcePresentationSpec(
      '--pubsub-topic',
      pubsub_resource,
      'The Cloud Pub/Sub topic to write query output to.',
      prefixes=False,
      group=output_group)

  concept_parsers.ConceptParser(
      [bigquery_presentation, pubsub_presentation]).AddToParser(parser)

  parser.add_argument(
      '--bigquery-write-disposition',
      choices=['write-empty', 'write-truncate', 'write-append'],
      default='write-empty',
      help='The behavior of the BigQuery write operation.')

  parser.add_argument(
      '--pubsub-create-disposition',
      choices=['create-if-not-found', 'fail-if-not-found'],
      default='create-if-not-found',
      help='The behavior of the Pub/Sub create operation.')

  # Query parameters come either inline (repeatable flag) or from a JSON
  # file, never both.
  parameter_group = parser.add_mutually_exclusive_group()

  inline_parameter_help = (
      'Parameters to pass to a query. Parameters must use the format '
      'name:type:value, for example min_word_count:INT64:250.')
  parameter_group.add_argument(
      '--parameter', action='append', help=inline_parameter_help)

  parameters_file_help = (
      'Path to a file containing query parameters in JSON format.'
      ' e.g. [{"parameterType": {"type": "STRING"}, "parameterValue":'
      ' {"value": "foo"}, "name": "x"}, {"parameterType": {"type":'
      ' "FLOAT64"}, "parameterValue": {"value": "1.0"}, "name": "y"}]')
  parameter_group.add_argument('--parameters-file', help=parameters_file_help)

  parser.add_argument(
      '--dry-run',
      action='store_true',
      help='Construct but do not run the SQL pipeline, for smoke testing.')

  # The launcher template flags below are hidden from the generated help.
  parser.add_argument(
      '--sql-launcher-template-engine',
      hidden=True,
      choices=['flex', 'dynamic'],
      default='flex',
      help='The template engine to use for the SQL launcher template.')

  launcher_template_help = (
      'The full GCS path to a SQL launcher template spec, e.g. '
      'gs://dataflow-sql-templates-us-west1/cloud_dataflow_sql_launcher_template_20201208_RC00/sql_launcher_flex_template. '
      'If None is specified, default to the latest release in the region. '
      'Note that older releases are not guaranteed to be compatible.')
  parser.add_argument(
      '--sql-launcher-template', hidden=True, help=launcher_template_help)


def _BuildBigQueryOutput(args):
  """Builds the BigQuery output config from the `--bigquery-*` flags.

  `args.bigquery_table` may be given as "table", "dataset.table" or
  "project.dataset.table". Missing pieces are filled in from
  `--bigquery-dataset`, `--bigquery-project` and, for the project only, the
  core/project property.

  Args:
    args: The parsed arguments; `args.bigquery_table` must be non-empty.

  Returns:
    An OrderedDict with 'type', 'table' and 'writeDisposition' entries.

  Raises:
    exceptions.InvalidArgumentException: If the table identifier is malformed
      (too many or empty components) or conflicts with `--bigquery-project`
      or `--bigquery-dataset`.
    exceptions.RequiredArgumentException: If the table identifier carries no
      dataset and `--bigquery-dataset` is not set.
  """
  table_parts = args.bigquery_table.split('.')
  # Reject empty components too ("project..table", ".table", "dataset."):
  # they would otherwise yield an empty projectId/datasetId/tableId in the
  # output, or a confusing "does not match" error further down.
  if len(table_parts) > 3 or not all(table_parts):
    raise exceptions.InvalidArgumentException(
        '--bigquery-table',
        'Malformed table identifier. Use format "project.dataset.table".')

  # Components are right-aligned: the table is always last, the dataset (if
  # present) precedes it and the project (if present) comes first.
  table = table_parts[-1]
  dataset = table_parts[-2] if len(table_parts) >= 2 else None
  bq_project = table_parts[0] if len(table_parts) == 3 else None

  if bq_project is None:
    bq_project = (
        args.bigquery_project or properties.VALUES.core.project.GetOrFail())
  elif args.bigquery_project and args.bigquery_project != bq_project:
    raise exceptions.InvalidArgumentException(
        '--bigquery-project',
        '"{}" does not match project "{}" set in qualified `--bigquery-table`.'
        .format(args.bigquery_project, bq_project))

  if dataset is None:
    if not args.bigquery_dataset:
      raise exceptions.RequiredArgumentException(
          '--bigquery-dataset',
          'Must be specified when `--bigquery-table` is unqualified.')
    dataset = args.bigquery_dataset
  elif args.bigquery_dataset and args.bigquery_dataset != dataset:
    raise exceptions.InvalidArgumentException(
        '--bigquery-dataset',
        '"{}" does not match dataset "{}" set in qualified `--bigquery-table`.'
        .format(args.bigquery_dataset, dataset))

  write_disposition = {
      'write-empty': 'WRITE_EMPTY',
      'write-truncate': 'WRITE_TRUNCATE',
      'write-append': 'WRITE_APPEND',
  }[args.bigquery_write_disposition]
  table_config = collections.OrderedDict([
      ('projectId', bq_project),
      ('datasetId', dataset),
      ('tableId', table),
  ])
  return collections.OrderedDict([
      ('type', 'bigquery'),
      ('table', table_config),
      ('writeDisposition', write_disposition),
  ])


def _BuildPubSubOutput(args):
  """Builds the Pub/Sub output config from the `--pubsub-*` flags.

  Args:
    args: The parsed arguments; `args.pubsub_topic` must be non-empty.

  Returns:
    An OrderedDict with 'type', 'projectId', 'topic' and 'createDisposition'
    entries. The project falls back to the core/project property when
    `--pubsub-project` is not set.
  """
  create_disposition = {
      'create-if-not-found': 'CREATE_IF_NOT_FOUND',
      'fail-if-not-found': 'FAIL_IF_NOT_FOUND',
  }[args.pubsub_create_disposition]
  return collections.OrderedDict([
      ('type', 'pubsub'),
      ('projectId',
       args.pubsub_project or properties.VALUES.core.project.GetOrFail()),
      ('topic', args.pubsub_topic),
      ('createDisposition', create_disposition),
  ])


def ExtractOutputs(args):
  """Parses outputs from args, returning a JSON string with the results.

  Args:
    args: The parsed arguments. Reads `bigquery_table`, `bigquery_project`,
      `bigquery_dataset`, `bigquery_write_disposition`, `pubsub_topic`,
      `pubsub_project` and `pubsub_create_disposition`.

  Returns:
    A JSON-encoded list holding one config object per requested destination:
    the BigQuery table first (if set), then the Pub/Sub topic (if set).

  Raises:
    exceptions.InvalidArgumentException: If `--bigquery-table` is malformed or
      disagrees with `--bigquery-project` / `--bigquery-dataset`.
    exceptions.RequiredArgumentException: If `--bigquery-table` has no dataset
      component and `--bigquery-dataset` is not set.
  """
  outputs = []
  if args.bigquery_table:
    outputs.append(_BuildBigQueryOutput(args))
  if args.pubsub_topic:
    outputs.append(_BuildPubSubOutput(args))
  return json.dumps(outputs)