HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/ai/indexes/client.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for dealing with AI Platform indexes API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import extra_types
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import messages as messages_util
from googlecloudsdk.calliope import exceptions as gcloud_exceptions
from googlecloudsdk.command_lib.ai import constants
from googlecloudsdk.command_lib.ai import errors
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import yaml


class IndexesClient(object):
  """High-level client for the AI Platform indexes surface."""

  def __init__(self, client=None, messages=None, version=None):
    self.client = client or apis.GetClientInstance(
        constants.AI_PLATFORM_API_NAME,
        constants.AI_PLATFORM_API_VERSION[version])
    self.messages = messages or self.client.MESSAGES_MODULE
    self._service = self.client.projects_locations_indexes

  def _ReadIndexMetadata(self, metadata_file):
    """Parse json metadata file."""
    if not metadata_file:
      raise gcloud_exceptions.BadArgumentException(
          '--metadata-file', 'Index metadata file must be specified.')
    index_metadata = None
    # Yaml is a superset of json, so parse json file as yaml.
    data = yaml.load_path(metadata_file)
    if data:
      index_metadata = messages_util.DictToMessageWithErrorCheck(
          data, extra_types.JsonValue)
    return index_metadata

  def Get(self, index_ref):
    request = self.messages.AiplatformProjectsLocationsIndexesGetRequest(
        name=index_ref.RelativeName())
    return self._service.Get(request)

  def List(self, limit=None, region_ref=None):
    return list_pager.YieldFromList(
        self._service,
        self.messages.AiplatformProjectsLocationsIndexesListRequest(
            parent=region_ref.RelativeName()),
        field='indexes',
        batch_size_attribute='pageSize',
        limit=limit)

  def CreateBeta(self, location_ref, args):
    """Create a new index."""
    labels = labels_util.ParseCreateArgs(
        args, self.messages.GoogleCloudAiplatformV1beta1Index.LabelsValue)

    index_update_method = None
    if args.index_update_method:
      if args.index_update_method == 'stream-update':
        index_update_method = (
            self.messages.GoogleCloudAiplatformV1beta1Index.
            IndexUpdateMethodValueValuesEnum.STREAM_UPDATE)
      elif args.index_update_method == 'batch-update':
        index_update_method = (
            self.messages.GoogleCloudAiplatformV1beta1Index.
            IndexUpdateMethodValueValuesEnum.BATCH_UPDATE)
      else:
        raise gcloud_exceptions.BadArgumentException(
            '--index-update-method',
            'Invalid index update method: {}'.format(args.index_update_method),
        )

    encryption_spec = None
    if args.encryption_kms_key_name is not None:
      encryption_spec = (
          self.messages.GoogleCloudAiplatformV1beta1EncryptionSpec(
              kmsKeyName=args.encryption_kms_key_name))

    req = self.messages.AiplatformProjectsLocationsIndexesCreateRequest(
        parent=location_ref.RelativeName(),
        googleCloudAiplatformV1beta1Index=self.messages
        .GoogleCloudAiplatformV1beta1Index(
            displayName=args.display_name,
            description=args.description,
            metadata=self._ReadIndexMetadata(args.metadata_file),
            labels=labels,
            indexUpdateMethod=index_update_method,
            encryptionSpec=encryption_spec
            ))
    return self._service.Create(req)

  def Create(self, location_ref, args):
    """Create a new v1 index."""
    labels = labels_util.ParseCreateArgs(
        args, self.messages.GoogleCloudAiplatformV1Index.LabelsValue)

    index_update_method = None
    if args.index_update_method:
      if args.index_update_method == 'stream-update':
        index_update_method = (
            self.messages.GoogleCloudAiplatformV1Index
            .IndexUpdateMethodValueValuesEnum.STREAM_UPDATE)
      elif args.index_update_method == 'batch-update':
        index_update_method = (
            self.messages.GoogleCloudAiplatformV1Index.IndexUpdateMethodValueValuesEnum.BATCH_UPDATE
        )
      else:
        raise gcloud_exceptions.BadArgumentException(
            '--index-update-method',
            'Invalid index update method: {}'.format(args.index_update_method),
        )

    encryption_spec = None
    if args.encryption_kms_key_name is not None:
      encryption_spec = (
          self.messages.GoogleCloudAiplatformV1EncryptionSpec(
              kmsKeyName=args.encryption_kms_key_name))

    req = self.messages.AiplatformProjectsLocationsIndexesCreateRequest(
        parent=location_ref.RelativeName(),
        googleCloudAiplatformV1Index=self.messages.GoogleCloudAiplatformV1Index(
            displayName=args.display_name,
            description=args.description,
            metadata=self._ReadIndexMetadata(args.metadata_file),
            labels=labels,
            indexUpdateMethod=index_update_method,
            encryptionSpec=encryption_spec
            ))
    return self._service.Create(req)

  def PatchBeta(self, index_ref, args):
    """Update an index."""
    index = self.messages.GoogleCloudAiplatformV1beta1Index()
    update_mask = []

    if args.metadata_file is not None:
      index.metadata = self._ReadIndexMetadata(args.metadata_file)
      update_mask.append('metadata')
    else:
      if args.display_name is not None:
        index.displayName = args.display_name
        update_mask.append('display_name')

      if args.description is not None:
        index.description = args.description
        update_mask.append('description')

      def GetLabels():
        return self.Get(index_ref).labels

      labels_update = labels_util.ProcessUpdateArgsLazy(
          args, self.messages.GoogleCloudAiplatformV1beta1Index.LabelsValue,
          GetLabels)
      if labels_update.needs_update:
        index.labels = labels_update.labels
        update_mask.append('labels')

    if not update_mask:
      raise errors.NoFieldsSpecifiedError('No updates requested.')

    request = self.messages.AiplatformProjectsLocationsIndexesPatchRequest(
        name=index_ref.RelativeName(),
        googleCloudAiplatformV1beta1Index=index,
        updateMask=','.join(update_mask))
    return self._service.Patch(request)

  def Patch(self, index_ref, args):
    """Update an v1 index."""
    index = self.messages.GoogleCloudAiplatformV1Index()
    update_mask = []

    if args.metadata_file is not None:
      index.metadata = self._ReadIndexMetadata(args.metadata_file)
      update_mask.append('metadata')
    else:
      if args.display_name is not None:
        index.displayName = args.display_name
        update_mask.append('display_name')

      if args.description is not None:
        index.description = args.description
        update_mask.append('description')

      def GetLabels():
        return self.Get(index_ref).labels

      labels_update = labels_util.ProcessUpdateArgsLazy(
          args, self.messages.GoogleCloudAiplatformV1Index.LabelsValue,
          GetLabels)
      if labels_update.needs_update:
        index.labels = labels_update.labels
        update_mask.append('labels')

    if not update_mask:
      raise errors.NoFieldsSpecifiedError('No updates requested.')

    request = self.messages.AiplatformProjectsLocationsIndexesPatchRequest(
        name=index_ref.RelativeName(),
        googleCloudAiplatformV1Index=index,
        updateMask=','.join(update_mask))
    return self._service.Patch(request)

  def Delete(self, index_ref):
    request = self.messages.AiplatformProjectsLocationsIndexesDeleteRequest(
        name=index_ref.RelativeName())
    return self._service.Delete(request)

  def RemoveDatapointsBeta(self, index_ref, args):
    """Remove data points from a v1beta1 index."""
    if args.datapoint_ids and args.datapoints_from_file:
      raise errors.ArgumentError(
          'datapoint_ids and datapoints_from_file can not be set'
          ' at the same time.'
      )

    if args.datapoint_ids:
      req = self.messages.AiplatformProjectsLocationsIndexesRemoveDatapointsRequest(
          index=index_ref.RelativeName(),
          googleCloudAiplatformV1beta1RemoveDatapointsRequest=self.messages
          .GoogleCloudAiplatformV1beta1RemoveDatapointsRequest(
              datapointIds=args.datapoint_ids))
    if args.datapoints_from_file:
      data = yaml.load_path(args.datapoints_from_file)
      req = self.messages.AiplatformProjectsLocationsIndexesRemoveDatapointsRequest(
          index=index_ref.RelativeName(),
          googleCloudAiplatformV1beta1RemoveDatapointsRequest=self.messages
          .GoogleCloudAiplatformV1beta1RemoveDatapointsRequest(
              datapointIds=data))
    return self._service.RemoveDatapoints(req)

  def RemoveDatapoints(self, index_ref, args):
    """Remove data points from a v1 index."""
    if args.datapoint_ids and args.datapoints_from_file:
      raise errors.ArgumentError(
          '`--datapoint_ids` and `--datapoints_from_file` can not be set at the'
          ' same time.'
      )

    if args.datapoint_ids:
      req = self.messages.AiplatformProjectsLocationsIndexesRemoveDatapointsRequest(
          index=index_ref.RelativeName(),
          googleCloudAiplatformV1RemoveDatapointsRequest=self.messages
          .GoogleCloudAiplatformV1RemoveDatapointsRequest(
              datapointIds=args.datapoint_ids))
    if args.datapoints_from_file:
      data = yaml.load_path(args.datapoints_from_file)
      req = self.messages.AiplatformProjectsLocationsIndexesRemoveDatapointsRequest(
          index=index_ref.RelativeName(),
          googleCloudAiplatformV1RemoveDatapointsRequest=self.messages
          .GoogleCloudAiplatformV1RemoveDatapointsRequest(
              datapointIds=data))
    return self._service.RemoveDatapoints(req)

  def UpsertDatapointsBeta(self, index_ref, args):
    """Upsert data points from a v1beta1 index."""
    datapoints = []
    if args.datapoints_from_file:
      data = yaml.load_path(args.datapoints_from_file)
      for datapoint_json in data:
        datapoint = messages_util.DictToMessageWithErrorCheck(
            datapoint_json,
            self.messages.GoogleCloudAiplatformV1beta1IndexDatapoint)
        datapoints.append(datapoint)
    update_mask = None
    if args.update_mask:
      update_mask = ','.join(args.update_mask)

    req = self.messages.AiplatformProjectsLocationsIndexesUpsertDatapointsRequest(
        index=index_ref.RelativeName(),
        googleCloudAiplatformV1beta1UpsertDatapointsRequest=self.messages
        .GoogleCloudAiplatformV1beta1UpsertDatapointsRequest(
            datapoints=datapoints,
            updateMask=update_mask))
    return self._service.UpsertDatapoints(req)

  def UpsertDatapoints(self, index_ref, args):
    """Upsert data points from a v1 index."""
    datapoints = []
    if args.datapoints_from_file:
      data = yaml.load_path(args.datapoints_from_file)
      for datapoint_json in data:
        datapoint = messages_util.DictToMessageWithErrorCheck(
            datapoint_json,
            self.messages.GoogleCloudAiplatformV1IndexDatapoint)
        datapoints.append(datapoint)
    update_mask = None
    if args.update_mask:
      update_mask = ','.join(args.update_mask)

    req = self.messages.AiplatformProjectsLocationsIndexesUpsertDatapointsRequest(
        index=index_ref.RelativeName(),
        googleCloudAiplatformV1UpsertDatapointsRequest=self.messages
        .GoogleCloudAiplatformV1UpsertDatapointsRequest(
            datapoints=datapoints,
            updateMask=update_mask))
    return self._service.UpsertDatapoints(req)