HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/dataplex/entry.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for interaction with Entries API CRUD DATAPLEX."""

from __future__ import annotations

from typing import Any, Dict, List

from googlecloudsdk.api_lib.dataplex import util as dataplex_api
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.command_lib.dataplex import parsers as dataplex_parsers
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log


dataplex_message = dataplex_api.GetMessageModule()


def _GetFieldsForUpdateMask(args: parser_extensions.Namespace) -> List[str]:
  """Create a sorted list of fields to be used in update_mask for Entry based on arguments provided to the command."""

  # Map command arguments to the API proto fields.
  arg_name_to_field = {
      '--fully-qualified-name': 'fully_qualified_name',
      '--update-aspects': 'aspects',  # For update command
      '--remove-aspects': 'aspects',  # For update command
      '--aspects': 'aspects',  # For update-aspects command
      '--keys': 'aspects',  # For remove-aspects command
      '--entry-source-resource': 'entry_source.resource',
      '--entry-source-system': 'entry_source.system',
      '--entry-source-platform': 'entry_source.platform',
      '--entry-source-display-name': 'entry_source.display_name',
      '--entry-source-description': 'entry_source.description',
      '--entry-source-labels': 'entry_source.labels',
      '--entry-source-create-time': 'entry_source.create_time',
      '--entry-source-update-time': 'entry_source.update_time',
  }

  # Remove `--clear-` part from command argument names, as those args are meant
  # to clear the respective API proto fields. Select only those that were
  # supplied to the command by performing a set intersection.
  args_cleaned = set(
      map(
          lambda arg: arg.replace('--clear-', '--'), args.GetSpecifiedArgNames()
      )
  )
  updatable_args = args_cleaned.intersection(arg_name_to_field)
  return sorted(
      set(map(lambda arg_name: arg_name_to_field[arg_name], updatable_args))
  )


def _GenerateAspectKeys(
    args: parser_extensions.Namespace,
    *,
    remove_aspects_arg_name: str,
    update_aspects_arg_name: str,
) -> List[str]:
  """Generate a list of unique aspect keys to be updated or removed.

  This will be used along with the update_mask for updating an Entry. This list
  is populated based on `--update-aspects` and `--remove-aspects` arguments
  (or `--aspects` in case of specialized command like `update-aspects`).

  Args:
    args: The arguments provided to the command.
    remove_aspects_arg_name: The name of the argument that contains the aspect
      keys to be removed.
    update_aspects_arg_name: The name of the argument that contains aspect
      contents to be added or updated.

  Returns:
    A sorted list of unique aspect keys to be updated or removed. Or empty list
    if neither `--update-aspects`, `--remove-aspects` or `--aspects` are
    provided to the command.
  """
  keys = set()

  if args.IsKnownAndSpecified(update_aspects_arg_name):
    keys.update(
        map(
            lambda aspect: aspect.key,
            args.GetValue(update_aspects_arg_name).additionalProperties,
        )
    )

  if args.IsKnownAndSpecified(remove_aspects_arg_name):
    keys.update(args.GetValue(remove_aspects_arg_name))

  return sorted(keys)


def _GetArgValueOrNone(
    args: parser_extensions.Namespace, arg_name: str
) -> Any | None:
  return args.GetValue(arg_name) if args.IsKnownAndSpecified(arg_name) else None


def _GetEntrySourceLabels(
    args: parser_extensions.Namespace,
) -> Dict[str, str] | None:
  """Parse EntrySource labels from the command arguments if defined."""

  if not args.IsKnownAndSpecified('entry_source_labels'):
    return None
  return labels_util.ParseCreateArgs(
      args,
      labels_cls=dataplex_message.GoogleCloudDataplexV1EntrySource.LabelsValue,
      labels_dest='entry_source_labels',
  )


def _GetEntrySourceAncestors(
    args: parser_extensions.Namespace,
) -> List[Any]:
  """Parse EntrySource ancestors from the command arguments if defined."""
  if not args.IsKnownAndSpecified('entry_source_ancestors'):
    return []
  return dataplex_parsers.ParseEntrySourceAncestors(args.entry_source_ancestors)


def _GetEntrySourceOrNone(
    args: parser_extensions.Namespace,
) -> dataplex_message.GoogleCloudDataplexV1EntrySource | None:
  """Parse EntrySource from the command arguments if defined."""
  entry_source = dataplex_message.GoogleCloudDataplexV1EntrySource(
      resource=_GetArgValueOrNone(args, 'entry_source_resource'),
      system=_GetArgValueOrNone(args, 'entry_source_system'),
      platform=_GetArgValueOrNone(args, 'entry_source_platform'),
      displayName=_GetArgValueOrNone(args, 'entry_source_display_name'),
      description=_GetArgValueOrNone(args, 'entry_source_description'),
      labels=_GetEntrySourceLabels(args),
      ancestors=_GetEntrySourceAncestors(args),
      createTime=_GetArgValueOrNone(args, 'entry_source_create_time'),
      updateTime=_GetArgValueOrNone(args, 'entry_source_update_time'),
  )
  return None if not entry_source else entry_source


def Create(args: parser_extensions.Namespace):
  """Create a CreateEntry request based on arguments provided."""
  entry_ref = args.CONCEPTS.entry.Parse()
  entry_type_ref = args.CONCEPTS.entry_type.Parse()
  parent_entry_ref = args.CONCEPTS.parent_entry.Parse()

  dataplex_client = dataplex_api.GetClientInstance()

  parent_entry_name = ''
  if parent_entry_ref is not None:
    parent_entry_name = parent_entry_ref.RelativeName()

  resource = dataplex_client.projects_locations_entryGroups_entries.Create(
      dataplex_message.DataplexProjectsLocationsEntryGroupsEntriesCreateRequest(
          entryId=entry_ref.Name(),
          googleCloudDataplexV1Entry=dataplex_message.GoogleCloudDataplexV1Entry(
              name=entry_ref.RelativeName(),
              entryType=entry_type_ref.RelativeName(),
              parentEntry=parent_entry_name,
              fullyQualifiedName=_GetArgValueOrNone(
                  args, 'fully_qualified_name'
              ),
              aspects=_GetArgValueOrNone(args, 'aspects'),
              entrySource=_GetEntrySourceOrNone(args),
          ),
          parent=entry_ref.Parent().RelativeName(),
      )
  )

  log.CreatedResource(
      entry_ref.Name(),
      details='in [{0}]'.format(entry_ref.Parent().RelativeName()),
  )
  return resource


def Update(
    args: parser_extensions.Namespace,
    remove_aspects_arg_name: str = 'remove_aspects',
    update_aspects_arg_name: str = 'update_aspects',
):
  """Create an UpdateEntry request based on arguments provided."""

  update_mask = _GetFieldsForUpdateMask(args)
  if len(update_mask) < 1:
    raise exceptions.HttpException(
        'Update commands must specify at least one additional parameter to'
        ' change.'
    )

  entry_ref = args.CONCEPTS.entry.Parse()
  dataplex_client = dataplex_api.GetClientInstance()

  resource = dataplex_client.projects_locations_entryGroups_entries.Patch(
      dataplex_message.DataplexProjectsLocationsEntryGroupsEntriesPatchRequest(
          name=entry_ref.RelativeName(),
          googleCloudDataplexV1Entry=dataplex_message.GoogleCloudDataplexV1Entry(
              name=entry_ref.RelativeName(),
              fullyQualifiedName=_GetArgValueOrNone(
                  args, 'fully_qualified_name'
              ),
              aspects=_GetArgValueOrNone(args, update_aspects_arg_name),
              entrySource=_GetEntrySourceOrNone(args),
          ),
          deleteMissingAspects=args.IsKnownAndSpecified(
              remove_aspects_arg_name
          ),
          updateMask=','.join(update_mask),
          aspectKeys=_GenerateAspectKeys(
              args,
              remove_aspects_arg_name=remove_aspects_arg_name,
              update_aspects_arg_name=update_aspects_arg_name,
          ),
      )
  )

  log.UpdatedResource(entry_ref.RelativeName(), kind='entry')
  return resource