HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/dataplex/util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for interaction with Dataplex."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse
import io

from googlecloudsdk.api_lib.storage import storage_api
from googlecloudsdk.api_lib.storage import storage_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml


def GetClientInstance():
  return apis.GetClientInstance('dataplex', 'v1')


def GetMessageModule():
  return apis.GetMessagesModule('dataplex', 'v1')


def WaitForOperation(
    operation, resource, sleep_ms=5000, pre_start_sleep_ms=1000
):
  """Waits for the given google.longrunning.Operation to complete."""
  operation_ref = resources.REGISTRY.Parse(
      operation.name, collection='dataplex.projects.locations.operations'
  )
  poller = waiter.CloudOperationPoller(
      resource, GetClientInstance().projects_locations_operations
  )
  return waiter.WaitFor(
      poller,
      operation_ref,
      'Waiting for [{0}] to finish'.format(operation_ref.RelativeName()),
      sleep_ms=sleep_ms,
      pre_start_sleep_ms=pre_start_sleep_ms,
  )


def CreateLabels(dataplex_resource, args):
  if getattr(args, 'labels', None):
    return dataplex_resource.LabelsValue(
        additionalProperties=[
            dataplex_resource.LabelsValue.AdditionalProperty(
                key=key, value=value
            )
            for key, value in sorted(args.labels.items())
        ]
    )
  return None


def ReadObject(object_url, storage_client=None):
  """Reads an object's content from GCS.

  Args:
    object_url: Can be a local file path or the URL of the object to be read
      from gcs bucket (must have "gs://" prefix).
    storage_client: Storage api client used to read files from gcs.

  Returns:
    A str for the content of the file.

  Raises:
    ObjectReadError:
      If the read of GCS object is not successful.
  """
  if not object_url.startswith('gs://'):
    return yaml.load_path(object_url)
  client = storage_client or storage_api.StorageClient()
  object_ref = storage_util.ObjectReference.FromUrl(object_url)
  try:
    bytes_io = client.ReadObject(object_ref)
    wrapper = io.TextIOWrapper(bytes_io, encoding='utf-8')
    return yaml.load(wrapper.read())
  except Exception as e:
    raise exceptions.BadFileException(
        'Unable to read file {0} due to incorrect file path or insufficient'
        ' read permissions'.format(object_url)
    ) from e


def SnakeToCamel(arg_str):
  """Converts snake case string to camel case."""
  return ''.join(
      word.capitalize() if ind > 0 else word
      for ind, word in enumerate(arg_str.split('_'))
  )


def SnakeToCamelDict(arg_type):
  """Reccursive method to convert all nested snake case dictionary keys to camel case."""
  if isinstance(arg_type, list):
    return [
        SnakeToCamelDict(list_val)
        if isinstance(list_val, (dict, list))
        else list_val
        for list_val in arg_type
    ]
  return {
      SnakeToCamel(key): (
          SnakeToCamelDict(value) if isinstance(value, (dict, list)) else value
      )
      for key, value in arg_type.items()
  }


def FetchExecutionSpecArgs(args_map_as_list):
  """Returns Dataplex task execution spec args as a map of key,value pairs from an input list of strings of format key=value."""
  execution_args_map = dict()
  for arg_entry in args_map_as_list:
    if '=' not in arg_entry:
      raise argparse.ArgumentTypeError(
          "Execution spec argument '{}' should be of the type argKey=argValue."
          .format(arg_entry)
      )
    arg_entry_split = arg_entry.split('=', 1)
    if (
        len(arg_entry_split) < 2
        or len(arg_entry_split[0].strip()) == 0
        or len(arg_entry_split[1]) == 0
    ):
      raise argparse.ArgumentTypeError(
          "Execution spec argument '{}' should be of the format"
          ' argKey=argValue.'.format(arg_entry)
      )
    execution_args_map[arg_entry_split[0]] = arg_entry_split[1]
  return execution_args_map