HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/datastream/resource_args.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared resource flags for Datastream commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import base
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.command_lib.util.concepts import presentation_specs

_MYSQL_SOURCE_CONFIG_HELP_TEXT_BETA = """\
  Path to a YAML (or JSON) file containing the configuration for MySQL Source Config.

  The JSON file is formatted as follows, with snake_case field naming:

  ```
    {
      "allowlist": {},
      "rejectlist":  {
        "mysql_databases": [
            {
              "database_name":"sample_database",
              "mysql_tables": [
                {
                  "table_name": "sample_table",
                  "mysql_columns": [
                    {
                      "column_name": "sample_column",
                    }
                   ]
                }
              ]
            }
          ]
        }
    }
  ```
"""
_MYSQL_SOURCE_CONFIG_HELP_TEXT = """\
  Path to a YAML (or JSON) file containing the configuration for MySQL Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects":  {
        "mysqlDatabases": [
            {
              "database":"sample_database",
              "mysqlTables": [
                {
                  "table": "sample_table",
                  "mysqlColumns": [
                    {
                      "column": "sample_column",
                    }
                   ]
                }
              ]
            }
          ]
        }
    }
  ```
"""
_ORACLE_SOURCE_CONFIG_HELP_TEXT_BETA = """\
  Path to a YAML (or JSON) file containing the configuration for Oracle Source Config.

  The JSON file is formatted as follows, with snake_case field naming:

  ```
    {
      "allowlist": {},
      "rejectlist": {
        "oracle_schemas": [
          {
            "schema_name": "SAMPLE",
            "oracle_tables": [
              {
                "table_name": "SAMPLE_TABLE",
                "oracle_columns": [
                  {
                    "column_name": "COL",
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ```
"""
_ORACLE_SOURCE_CONFIG_HELP_TEXT = """\
  Path to a YAML (or JSON) file containing the configuration for Oracle Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "oracleSchemas": [
          {
            "schema": "SAMPLE",
            "oracleTables": [
              {
                "table": "SAMPLE_TABLE",
                "oracleColumns": [
                  {
                    "column": "COL",
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ```
"""
_POSTGRESQL_CREATE_SOURCE_CONFIG_HELP_TEXT = """\
  Path to a YAML (or JSON) file containing the configuration for PostgreSQL Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "postgresqlSchemas": [
          {
            "schema": "SAMPLE",
            "postgresqlTables": [
              {
                "table": "SAMPLE_TABLE",
                "postgresqlColumns": [
                  {
                    "column": "COL",
                  }
                ]
              }
            ]
          }
        ]
      },
      "replicationSlot": "SAMPLE_REPLICATION_SLOT",
      "publication": "SAMPLE_PUBLICATION"
    }
  ```
"""

_POSTGRESQL_UPDATE_SOURCE_CONFIG_HELP_TEXT = """\
  Path to a YAML (or JSON) file containing the configuration for PostgreSQL Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "postgresqlSchemas": [
          {
            "schema": "SAMPLE",
            "postgresqlTables": [
              {
                "table": "SAMPLE_TABLE",
                "postgresqlColumns": [
                  {
                    "column": "COL",
                  }
                ]
              }
            ]
          }
        ]
      },
      "replicationSlot": "SAMPLE_REPLICATION_SLOT",
      "publication": "SAMPLE_PUBLICATION"
    }
  ```
"""

_SQLSERVER_CREATE_SOURCE_CONFIG_HELP_TEXT = """
  Path to a YAML (or JSON) file containing the configuration for SQL Server Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "schemas": [
          {
            "schema": "SAMPLE",
            "tables": [
              {
                "table": "SAMPLE_TABLE",
                "columns": [
                  {
                    "column": "COL",
                  }
                ]
              }
            ]
          }
        ]
      },
      "maxConcurrentCdcTasks": 2,
      "maxConcurrentBackfillTasks": 10,
      "transactionLogs": {}  # Or changeTables
    }
  ```
"""

_SQLSERVER_UPDATE_SOURCE_CONFIG_HELP_TEXT = """
  Path to a YAML (or JSON) file containing the configuration for SQL Server Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "schemas": [
          {
            "schema": "SAMPLE",
            "tables": [
              {
                "table": "SAMPLE_TABLE",
                "columns": [
                  {
                    "column": "COL",
                  }
                ]
              }
            ]
          }
        ]
      },
      "maxConcurrentCdcTasks": 2,
      "maxConcurrentBackfillTasks": 10,
      "transactionLogs": {}  # Or changeTables
    }
  ```
"""

_SALESFORCE_CREATE_SOURCE_CONFIG_HELP_TEXT = """
  Path to a YAML (or JSON) file containing the configuration for Salesforce Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "pollingInterval": "3000s",
      "includeObjects": {},
      "excludeObjects": {
        "objects": [
          {
            "objectName": "SAMPLE",
            "fields": [
              {
                "fieldName": "SAMPLE_FIELD",
              }
            ]
          }
        ]
      }
    }
  ```
"""

_SALESFORCE_UPDATE_SOURCE_CONFIG_HELP_TEXT = """
  Path to a YAML (or JSON) file containing the configuration for Salesforce Source Config.

  The JSON file is formatted as follows, with camelCase field naming:

  ```
    {
      "pollingInterval": "3000s",
      "includeObjects": {},
      "excludeObjects": {
        "objects": [
          {
            "objectName": "SAMPLE",
            "fields": [
              {
                "fieldName": "SAMPLE_FIELD",
              }
            ]
          }
        ]
      }
    }
  ```
"""

_MONGODB_SOURCE_CONFIG_HELP_TEXT = """\
  Path to a YAML (or JSON) file containing the configuration for MongoDB Source Config.

  The JSON file is formatted as follows, with snake_case field naming:

  ```
    {
      "includeObjects": {},
      "excludeObjects": {
        "databases": [
          {
            "database": "sampleDb",
            "collections": [
              {
                "collection": "sampleCollection",
                "fields": [
                  {
                    "field": "SAMPLE_FIELD",
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ```
"""


def ConnectionProfileAttributeConfig(name='connection_profile'):
  return concepts.ResourceParameterAttributeConfig(
      name=name,
      help_text='The connection profile of the {resource}.',
      completion_request_params={'fieldMask': 'name'},
      completion_id_field='id')


def PrivateConnectionAttributeConfig(name='private_connection'):
  return concepts.ResourceParameterAttributeConfig(
      name=name,
      help_text='The private connection of the {resource}.',
      completion_request_params={'fieldMask': 'name'},
      completion_id_field='id')


def StreamAttributeConfig(name='stream'):
  return concepts.ResourceParameterAttributeConfig(
      name=name,
      help_text='The stream of the {resource}.',
      completion_request_params={'fieldMask': 'name'},
      completion_id_field='id')


def RouteAttributeConfig(name='route'):
  return concepts.ResourceParameterAttributeConfig(
      name=name,
      help_text='The route of the {resource}.',
      completion_request_params={'fieldMask': 'name'},
      completion_id_field='id')


def LocationAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='location', help_text='The Cloud location for the {resource}.')


def GetLocationResourceSpec(resource_name='location'):
  return concepts.ResourceSpec(
      'datastream.projects.locations',
      resource_name=resource_name,
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True)


def GetConnectionProfileResourceSpec(resource_name='connection_profile'):
  return concepts.ResourceSpec(
      'datastream.projects.locations.connectionProfiles',
      resource_name=resource_name,
      connectionProfilesId=ConnectionProfileAttributeConfig(name=resource_name),
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True)


def GetPrivateConnectionResourceSpec(resource_name='private_connection'):
  return concepts.ResourceSpec(
      'datastream.projects.locations.privateConnections',
      resource_name=resource_name,
      privateConnectionsId=PrivateConnectionAttributeConfig(name=resource_name),
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True)


def GetStreamResourceSpec(resource_name='stream'):
  return concepts.ResourceSpec(
      'datastream.projects.locations.streams',
      resource_name=resource_name,
      streamsId=StreamAttributeConfig(name=resource_name),
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True)


def GetRouteResourceSpec(resource_name='route'):
  return concepts.ResourceSpec(
      'datastream.projects.locations.privateConnections.routes',
      resource_name=resource_name,
      routesId=RouteAttributeConfig(name=resource_name),
      privateConnectionsId=PrivateConnectionAttributeConfig(
          'private-connection'),
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True)


def AddConnectionProfileResourceArg(parser,
                                    verb,
                                    release_track,
                                    positional=True,
                                    required=True):
  """Add a resource argument for a Datastream connection profile.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
    release_track: Some arguments are added based on the command release
        track.
    positional: bool, if True, means that the resource is a positional rather
      than a flag.
    required: bool, if True, means that a flag is required.
  """
  if positional:
    name = 'connection_profile'
  else:
    name = '--connection-profile'

  connectivity_parser = parser.add_group(mutex=True)
  connectivity_parser.add_argument(
      '--static-ip-connectivity',
      action='store_true',
      help="""use static ip connectivity""")

  if release_track == base.ReleaseTrack.BETA:
    connectivity_parser.add_argument(
        '--no-connectivity', action='store_true', help="""no connectivity""")

  forward_ssh_parser = connectivity_parser.add_group()
  forward_ssh_parser.add_argument(
      '--forward-ssh-hostname',
      help="""Hostname for the SSH tunnel.""",
      required=required)
  forward_ssh_parser.add_argument(
      '--forward-ssh-username',
      help="""Username for the SSH tunnel.""",
      required=required)
  forward_ssh_parser.add_argument(
      '--forward-ssh-port',
      help="""Port for the SSH tunnel, default value is 22.""",
      type=int,
      default=22)
  password_group = forward_ssh_parser.add_group(required=required, mutex=True)
  password_group.add_argument(
      '--forward-ssh-password', help="""\
          SSH password.
          """)
  password_group.add_argument(
      '--forward-ssh-private-key', help='SSH private key..')

  # TODO(b/207467120): deprecate BETA client.
  private_connection_flag_name = 'private-connection'
  if release_track == base.ReleaseTrack.BETA:
    private_connection_flag_name = 'private-connection-name'

  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          name,
          GetConnectionProfileResourceSpec(),
          'The connection profile {}.'.format(verb),
          required=True),
      presentation_specs.ResourcePresentationSpec(
          '--%s' % private_connection_flag_name,
          GetPrivateConnectionResourceSpec(),
          'Resource ID of the private connection.',
          flag_name_overrides={'location': ''},
          group=connectivity_parser)
  ]
  concept_parsers.ConceptParser(
      resource_specs,
      command_level_fallthroughs={
          '--%s.location' % private_connection_flag_name: ['--location'],
      }).AddToParser(parser)


def AddConnectionProfileDiscoverResourceArg(parser):
  """Add a resource argument for a Datastream connection profile discover command.

  Args:
    parser: the parser for the command.
  """
  connection_profile_parser = parser.add_group(mutex=True, required=True)
  connection_profile_parser.add_argument(
      '--connection-profile-object-file',
      help="""Path to a YAML (or JSON) file containing the configuration
      for a connection profile object. If you pass - as the value of the
      flag the file content will be read from stdin."""
  )

  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          '--connection-profile-name',
          GetConnectionProfileResourceSpec(),
          'Resource ID of the connection profile.',
          flag_name_overrides={'location': ''},
          group=connection_profile_parser)
  ]
  concept_parsers.ConceptParser(
      resource_specs,
      command_level_fallthroughs={
          '--connection-profile-name.location': ['--location'],
      }).AddToParser(parser)


def GetVpcResourceSpec():
  """Constructs and returns the Resource specification for VPC."""

  def VpcAttributeConfig():
    return concepts.ResourceParameterAttributeConfig(
        name='vpc',
        help_text="""fully qualified name of the VPC Datastream will peer to."""
    )

  return concepts.ResourceSpec(
      'compute.networks',
      resource_name='vpc',
      network=VpcAttributeConfig(),
      project=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG)


def AddPrivateConnectionResourceArg(parser,
                                    verb,
                                    positional=True):
  """Add a resource argument for a Datastream private connection.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
    positional: bool, if True, means that the resource is a positional rather
      than a flag.
  """
  if positional:
    name = 'private_connection'
  else:
    name = '--private-connection'

  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          name,
          GetPrivateConnectionResourceSpec(),
          'The private connection {}.'.format(verb),
          required=True),
  ]
  concept_parsers.ConceptParser(
      resource_specs).AddToParser(parser)


def AddStreamResourceArg(parser, verb, release_track, required=True):
  """Add resource arguments for creating/updating a stream.

  Args:
    parser: argparse.ArgumentParser, the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
    release_track: base.ReleaseTrack, some arguments are added based on the
        command release track.
    required: bool, if True, means that a flag is required.
  """
  source_parser = parser.add_group(required=required)
  source_config_parser_group = source_parser.add_group(
      required=required, mutex=True)
  source_config_parser_group.add_argument(
      '--oracle-source-config',
      help=_ORACLE_SOURCE_CONFIG_HELP_TEXT_BETA if release_track
      == base.ReleaseTrack.BETA else _ORACLE_SOURCE_CONFIG_HELP_TEXT)
  source_config_parser_group.add_argument(
      '--mysql-source-config',
      help=_MYSQL_SOURCE_CONFIG_HELP_TEXT_BETA if release_track
      == base.ReleaseTrack.BETA else _MYSQL_SOURCE_CONFIG_HELP_TEXT)
  source_config_parser_group.add_argument(
      '--postgresql-source-config',
      help=_POSTGRESQL_UPDATE_SOURCE_CONFIG_HELP_TEXT
      if verb == 'update'
      else _POSTGRESQL_CREATE_SOURCE_CONFIG_HELP_TEXT,
  )
  source_config_parser_group.add_argument(
      '--sqlserver-source-config',
      help=_SQLSERVER_UPDATE_SOURCE_CONFIG_HELP_TEXT
      if verb == 'update'
      else _SQLSERVER_CREATE_SOURCE_CONFIG_HELP_TEXT,
  )
  source_config_parser_group.add_argument(
      '--salesforce-source-config',
      help=_SALESFORCE_UPDATE_SOURCE_CONFIG_HELP_TEXT
      if verb == 'update'
      else _SALESFORCE_CREATE_SOURCE_CONFIG_HELP_TEXT,
  )
  source_config_parser_group.add_argument(
      '--mongodb-source-config',
      help=_MONGODB_SOURCE_CONFIG_HELP_TEXT,
  )

  destination_parser = parser.add_group(required=required)
  destination_config_parser_group = destination_parser.add_group(
      required=required, mutex=True)
  destination_config_parser_group.add_argument(
      '--gcs-destination-config',
      help="""\
      Path to a YAML (or JSON) file containing the configuration for Google Cloud Storage Destination Config.

      The JSON file is formatted as follows:

      ```
       {
       "path": "some/path",
       "fileRotationMb":5,
       "fileRotationInterval":"15s",
       "avroFileFormat": {}
       }
      ```
        """,
  )
  destination_config_parser_group.add_argument(
      '--bigquery-destination-config',
      help="""\
      Path to a YAML (or JSON) file containing the configuration for Google BigQuery Destination Config.

      The YAML (or JSON) file should be formatted as follows:

      BigQuery configuration with source hierarchy datasets and merge mode (merge mode is by default):

      ```
      {
        "sourceHierarchyDatasets": {
          "datasetTemplate": {
            "location": "us-central1",
            "datasetIdPrefix": "my_prefix",
            "kmsKeyName": "projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{cryptoKey}"
          }
        },
        "merge": {}
        "dataFreshness": "3600s"
      }
      ```

      BigQuery configuration with source hierarchy datasets and append only mode:
      ```
      {
        "sourceHierarchyDatasets": {
          "datasetTemplate": {
            "location": "us-central1",
            "datasetIdPrefix": "my_prefix",
            "kmsKeyName": "projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{cryptoKey}"
          }
        },
        "appendOnly": {}
      }
      ```

      BigQuery configuration with single target dataset and merge mode:

      ```
      {
        "singleTargetDataset": {
          "datasetId": "projectId:my_dataset"
        },
        "merge": {}
        "dataFreshness": "3600s"
      }
      ```

      BigQuery configuration with Big Lake table configuration:
      ```
      {
        "singleTargetDataset": {
          "datasetId": "projectId:datasetId"
        },
        "appendOnly": {},
        "blmtConfig": {
          "bucket": "bucketName",
          "tableFormat": "ICEBERG",
          "fileFormat": "PARQUET",
          "connectionName": "projectId.region.connectionName",
          "rootPath": "/root"
        }
      }
      ```
      """,
  )

  source_field = 'source'
  destination_field = 'destination'
  if release_track == base.ReleaseTrack.BETA:
    source_field = 'source-name'
    destination_field = 'destination-name'

  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          'stream',
          GetStreamResourceSpec(),
          'The stream to {}.'.format(verb),
          required=True),
      presentation_specs.ResourcePresentationSpec(
          '--%s' % source_field,
          GetConnectionProfileResourceSpec(),
          'Resource ID of the source connection profile.',
          required=required,
          flag_name_overrides={'location': ''},
          group=source_parser),
      presentation_specs.ResourcePresentationSpec(
          '--%s' % destination_field,
          GetConnectionProfileResourceSpec(),
          'Resource ID of the destination connection profile.',
          required=required,
          flag_name_overrides={'location': ''},
          group=destination_parser)
  ]
  concept_parsers.ConceptParser(
      resource_specs,
      command_level_fallthroughs={
          '--%s.location' % source_field: ['--location'],
          '--%s.location' % destination_field: ['--location']
      }).AddToParser(parser)


def AddStreamObjectResourceArg(parser):
  """Add a resource argument for a Datastream stream object.

  Args:
    parser: the parser for the command.
  """
  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          '--stream',
          GetStreamResourceSpec(),
          'The stream to list objects for.',
          required=True),
  ]
  concept_parsers.ConceptParser(
      resource_specs,
      command_level_fallthroughs={
          '--stream.location': ['--location'],
      }).AddToParser(parser)


def AddRouteResourceArg(parser, verb, positional=True):
  """Add a resource argument for a Datastream route.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to create'.
    positional: bool, if True, means that the resource is a positional rather
      than a flag.
  """
  if positional:
    name = 'route'
  else:
    name = '--route'

  resource_specs = [
      presentation_specs.ResourcePresentationSpec(
          name,
          GetRouteResourceSpec(),
          'The route {}.'.format(verb),
          required=True)
  ]
  concept_parsers.ConceptParser(
      resource_specs).AddToParser(parser)