File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/spanner/migration_backend.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Spanner migration library functions and utilities for the spanner-migration-tool binary."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy
import os

from googlecloudsdk.command_lib.util.anthos import binary_operations
from googlecloudsdk.core import exceptions as c_except


def GetEnvArgsForCommand(extra_vars=None, exclude_vars=None):
"""Return an env dict to be passed on command invocation."""
env = copy.deepcopy(os.environ)
if extra_vars:
env.update(extra_vars)
if exclude_vars:
for k in exclude_vars:
env.pop(k)
return env
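
# Illustrative usage (hypothetical values; not executed as part of this
# module). The returned dict is a detached copy, so mutating it leaves
# os.environ itself untouched:
#
#   env = GetEnvArgsForCommand(
#       extra_vars={'CLOUDSDK_CORE_PROJECT': 'my-project'},
#       exclude_vars=['NO_PROXY'])
#   # env == current environment plus CLOUDSDK_CORE_PROJECT, minus NO_PROXY.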


class SpannerMigrationException(c_except.Error):
  """Base exception for any errors raised by the gcloud spanner migration surface."""


class SpannerMigrationWrapper(binary_operations.StreamingBinaryBackedOperation):
  """Binary operation wrapper for spanner-migration-tool commands."""

  def __init__(self, **kwargs):
    super(SpannerMigrationWrapper, self).__init__(
        binary='spanner-migration-tool', install_if_missing=True, **kwargs)
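
  # Sketch of a typical invocation (hedged: the actual call sites live in the
  # surface command files, not here). StreamingBinaryBackedOperation forwards
  # the keyword arguments of the call to _ParseArgsForCommand, so a schema run
  # looks roughly like:
  #
  #   wrapper = SpannerMigrationWrapper()
  #   wrapper(command='schema', source='mysql',
  #           target_profile='instance=my-instance,dbName=my-db',
  #           env=GetEnvArgsForCommand())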

  def _ParseSchemaArgs(self,
                       source,
                       prefix=None,
                       source_profile=None,
                       target=None,
                       target_profile=None,
                       dry_run=False,
                       log_level=None,
                       project=None,
                       **kwargs):
    """Parse args for the schema command."""
    del kwargs
    exec_args = ['schema']
    if source:
      exec_args.extend(['--source', source])
    if prefix:
      exec_args.extend(['--prefix', prefix])
    if source_profile:
      exec_args.extend(['--source-profile', source_profile])
    if target:
      exec_args.extend(['--target', target])
    if target_profile:
      exec_args.extend(['--target-profile', target_profile])
    if dry_run:
      exec_args.append('--dry-run')
    if log_level:
      exec_args.extend(['--log-level', log_level])
    if project:
      exec_args.extend(['--project', project])
    return exec_args
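
  # Example trace (hypothetical inputs): the parser assembles argv-style flags
  # for the underlying binary, emitting only the flags whose values were
  # supplied:
  #
  #   self._ParseSchemaArgs(source='postgresql', target='Spanner',
  #                         dry_run=True)
  #   # -> ['schema', '--source', 'postgresql', '--target', 'Spanner',
  #   #     '--dry-run']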

  def _ParseDataArgs(self,
                     source,
                     session,
                     prefix=None,
                     skip_foreign_keys=False,
                     source_profile=None,
                     target=None,
                     target_profile=None,
                     write_limit=None,
                     dry_run=False,
                     log_level=None,
                     project=None,
                     dataflow_template=None,
                     **kwargs):
    """Parse args for the data command."""
    del kwargs
    exec_args = ['data']
    if source:
      exec_args.extend(['--source', source])
    if session:
      exec_args.extend(['--session', session])
    if prefix:
      exec_args.extend(['--prefix', prefix])
    if skip_foreign_keys:
      exec_args.append('--skip-foreign-keys')
    if source_profile:
      exec_args.extend(['--source-profile', source_profile])
    if target:
      exec_args.extend(['--target', target])
    if target_profile:
      exec_args.extend(['--target-profile', target_profile])
    if write_limit:
      exec_args.extend(['--write-limit', write_limit])
    if dry_run:
      exec_args.append('--dry-run')
    if log_level:
      exec_args.extend(['--log-level', log_level])
    if project:
      exec_args.extend(['--project', project])
    if dataflow_template:
      exec_args.extend(['--dataflow-template', dataflow_template])
    return exec_args
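
  # Example trace (hypothetical inputs; note that numeric flag values such as
  # write_limit are passed through as strings):
  #
  #   self._ParseDataArgs(source='mysql', session='session.json',
  #                       skip_foreign_keys=True, write_limit='40')
  #   # -> ['data', '--source', 'mysql', '--session', 'session.json',
  #   #     '--skip-foreign-keys', '--write-limit', '40']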

  def _ParseSchemaAndDataArgs(self,
                              source,
                              prefix=None,
                              skip_foreign_keys=False,
                              source_profile=None,
                              target=None,
                              target_profile=None,
                              write_limit=None,
                              dry_run=False,
                              log_level=None,
                              project=None,
                              dataflow_template=None,
                              **kwargs):
    """Parse args for the schema-and-data command."""
    del kwargs
    exec_args = ['schema-and-data']
    if source:
      exec_args.extend(['--source', source])
    if prefix:
      exec_args.extend(['--prefix', prefix])
    if skip_foreign_keys:
      exec_args.append('--skip-foreign-keys')
    if source_profile:
      exec_args.extend(['--source-profile', source_profile])
    if target:
      exec_args.extend(['--target', target])
    if target_profile:
      exec_args.extend(['--target-profile', target_profile])
    if write_limit:
      exec_args.extend(['--write-limit', write_limit])
    if dry_run:
      exec_args.append('--dry-run')
    if log_level:
      exec_args.extend(['--log-level', log_level])
    if project:
      exec_args.extend(['--project', project])
    if dataflow_template:
      exec_args.extend(['--dataflow-template', dataflow_template])
    return exec_args
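
  # Example trace (hypothetical inputs); the combined command accepts the
  # union of the schema and data flags:
  #
  #   self._ParseSchemaAndDataArgs(source='mysql', dry_run=True)
  #   # -> ['schema-and-data', '--source', 'mysql', '--dry-run']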

  def _ParseWebArgs(self,
                    open_flag=False,
                    port=None,
                    log_level=None,
                    dataflow_template=None,
                    **kwargs):
    """Parse args for the web command."""
    del kwargs
    exec_args = ['web']
    if open_flag:
      exec_args.append('--open')
    if port:
      exec_args.extend(['--port', port])
    if log_level:
      exec_args.extend(['--log-level', log_level])
    if dataflow_template:
      exec_args.extend(['--dataflow-template', dataflow_template])
    return exec_args
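
  # Example trace (hypothetical inputs):
  #
  #   self._ParseWebArgs(open_flag=True, port='8080')
  #   # -> ['web', '--open', '--port', '8080']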

  def ParseCleanupArgs(self,
                       job_id,
                       data_shard_ids=None,
                       target_profile=None,
                       datastream=False,
                       dataflow=False,
                       pub_sub=False,
                       monitoring=False,
                       log_level=None,
                       **kwargs):
    """Parse args for the cleanup command."""
    del kwargs
    exec_args = ['cleanup']
    if job_id:
      exec_args.extend(['--jobId', job_id])
    if data_shard_ids:
      exec_args.extend(['--dataShardIds', data_shard_ids])
    if target_profile:
      exec_args.extend(['--target-profile', target_profile])
    if datastream:
      exec_args.append('--datastream')
    if dataflow:
      exec_args.append('--dataflow')
    if pub_sub:
      exec_args.append('--pubsub')
    if monitoring:
      exec_args.append('--monitoring')
    if log_level:
      exec_args.extend(['--log-level', log_level])
    return exec_args
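
  # Example trace (hypothetical inputs). Unlike the other subcommands, the
  # binary's cleanup flags are camelCase (--jobId, --dataShardIds):
  #
  #   self.ParseCleanupArgs(job_id='job-1', datastream=True, dataflow=True)
  #   # -> ['cleanup', '--jobId', 'job-1', '--datastream', '--dataflow']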

  def ParseImportArgs(self,
                      instance,
                      database,
                      source_uri,
                      source_format,
                      table_name=None,
                      project=None,
                      schema_uri=None,
                      csv_line_delimiter=None,
                      csv_field_delimiter=None,
                      database_dialect=None,
                      **kwargs):
    """Parse args for the import command."""
    del kwargs
    exec_args = ['import']
    if instance:
      exec_args.extend(['--instance', instance])
    if database:
      exec_args.extend(['--database', database])
    if table_name:
      exec_args.extend(['--table-name', table_name])
    if source_uri:
      exec_args.extend(['--source-uri', source_uri])
    if source_format:
      exec_args.extend(['--source-format', source_format])
    if schema_uri:
      exec_args.extend(['--schema-uri', schema_uri])
    if csv_line_delimiter:
      exec_args.extend(['--csv-line-delimiter', csv_line_delimiter])
    if csv_field_delimiter:
      exec_args.extend(['--csv-field-delimiter', csv_field_delimiter])
    if project:
      exec_args.extend(['--project', project])
    if database_dialect:
      exec_args.extend(['--database-dialect', database_dialect])
    return exec_args
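
  # Example trace (hypothetical inputs):
  #
  #   self.ParseImportArgs(instance='my-instance', database='my-db',
  #                        source_uri='gs://bucket/data.csv',
  #                        source_format='csv')
  #   # -> ['import', '--instance', 'my-instance', '--database', 'my-db',
  #   #     '--source-uri', 'gs://bucket/data.csv', '--source-format', 'csv']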

  def _ParseArgsForCommand(self, command, **kwargs):
    """Call the parser corresponding to the command."""
    if command == 'schema':
      return self._ParseSchemaArgs(**kwargs)
    elif command == 'data':
      return self._ParseDataArgs(**kwargs)
    elif command == 'schema-and-data':
      return self._ParseSchemaAndDataArgs(**kwargs)
    elif command == 'web':
      return self._ParseWebArgs(**kwargs)
    elif command == 'cleanup':
      return self.ParseCleanupArgs(**kwargs)
    elif command == 'import':
      return self.ParseImportArgs(**kwargs)
    else:
      raise binary_operations.InvalidOperationForBinary(
          'Invalid Operation [{}] for spanner-migration-tool'.format(command))
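
# End-to-end sketch (hedged: the exact execution plumbing belongs to
# StreamingBinaryBackedOperation in binary_operations). Calling the wrapper
# dispatches through _ParseArgsForCommand on the 'command' kwarg and streams
# the binary's output; each parser's `del kwargs` swallows extras such as
# 'env':
#
#   wrapper = SpannerMigrationWrapper()
#   wrapper(command='web', open_flag=True, port='8080',
#           env=GetEnvArgsForCommand())
#   # runs: spanner-migration-tool web --open --port 8080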