HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/ai/index_endpoints/client.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for dealing with AI Platform index endpoints API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.ai import constants
from googlecloudsdk.command_lib.ai import errors
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources


def _ParseIndex(index_id, location_id):
  """Parses a index ID into a index resource object."""
  return resources.REGISTRY.Parse(
      index_id,
      params={
          'locationsId': location_id,
          'projectsId': properties.VALUES.core.project.GetOrFail
      },
      collection='aiplatform.projects.locations.indexes')


class IndexEndpointsClient(object):
  """High-level client for the AI Platform index endpoints surface."""

  def __init__(self, client=None, messages=None, version=constants.GA_VERSION):
    self.client = client or apis.GetClientInstance(
        constants.AI_PLATFORM_API_NAME,
        constants.AI_PLATFORM_API_VERSION[version])
    self.messages = messages or self.client.MESSAGES_MODULE
    self._service = self.client.projects_locations_indexEndpoints

  def CreateBeta(self, location_ref, args):
    """Create a new index endpoint."""
    labels = labels_util.ParseCreateArgs(
        args,
        self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint.LabelsValue)

    encryption_spec = None
    if args.encryption_kms_key_name is not None:
      encryption_spec = (
          self.messages.GoogleCloudAiplatformV1beta1EncryptionSpec(
              kmsKeyName=args.encryption_kms_key_name))

    private_service_connect_config = None
    if args.enable_private_service_connect:
      private_service_connect_config = (
          self.messages.GoogleCloudAiplatformV1beta1PrivateServiceConnectConfig(
              enablePrivateServiceConnect=args.enable_private_service_connect,
              projectAllowlist=(args.project_allowlist
                                if args.project_allowlist else [])
          )
      )

      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1beta1IndexEndpoint=self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              publicEndpointEnabled=args.public_endpoint_enabled,
              labels=labels,
              encryptionSpec=encryption_spec,
              privateServiceConnectConfig=private_service_connect_config,
          ),
      )
    elif args.network is not None:
      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1beta1IndexEndpoint=self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              network=args.network,
              labels=labels,
              encryptionSpec=encryption_spec,
          ),
      )
    else:
      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1beta1IndexEndpoint=self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              publicEndpointEnabled=True,
              labels=labels,
              encryptionSpec=encryption_spec,
              privateServiceConnectConfig=private_service_connect_config,
          ),
      )

    return self._service.Create(req)

  def Create(self, location_ref, args):
    """Create a new v1 index endpoint."""
    labels = labels_util.ParseCreateArgs(
        args, self.messages.GoogleCloudAiplatformV1IndexEndpoint.LabelsValue)

    encryption_spec = None
    if args.encryption_kms_key_name is not None:
      encryption_spec = (
          self.messages.GoogleCloudAiplatformV1EncryptionSpec(
              kmsKeyName=args.encryption_kms_key_name))

    private_service_connect_config = None
    if args.enable_private_service_connect:
      private_service_connect_config = (
          self.messages.GoogleCloudAiplatformV1PrivateServiceConnectConfig(
              enablePrivateServiceConnect=args.enable_private_service_connect,
              projectAllowlist=(args.project_allowlist
                                if args.project_allowlist else []),
          )
      )

      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1IndexEndpoint=self.messages.GoogleCloudAiplatformV1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              publicEndpointEnabled=args.public_endpoint_enabled,
              labels=labels,
              encryptionSpec=encryption_spec,
              privateServiceConnectConfig=private_service_connect_config,
          ),
      )
    elif args.network is not None:
      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1IndexEndpoint=self.messages.GoogleCloudAiplatformV1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              network=args.network,
              labels=labels,
              encryptionSpec=encryption_spec,
          ),
      )
    else:
      req = self.messages.AiplatformProjectsLocationsIndexEndpointsCreateRequest(
          parent=location_ref.RelativeName(),
          googleCloudAiplatformV1IndexEndpoint=self.messages.GoogleCloudAiplatformV1IndexEndpoint(
              displayName=args.display_name,
              description=args.description,
              publicEndpointEnabled=True,
              labels=labels,
              encryptionSpec=encryption_spec,
              privateServiceConnectConfig=private_service_connect_config,
          ),
      )

    return self._service.Create(req)

  def PatchBeta(self, index_endpoint_ref, args):
    """Update an index endpoint."""
    index_endpoint = self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint()
    update_mask = []

    if args.display_name is not None:
      index_endpoint.displayName = args.display_name
      update_mask.append('display_name')

    if args.description is not None:
      index_endpoint.description = args.description
      update_mask.append('description')

    def GetLabels():
      return self.Get(index_endpoint_ref).labels

    labels_update = labels_util.ProcessUpdateArgsLazy(
        args,
        self.messages.GoogleCloudAiplatformV1beta1IndexEndpoint.LabelsValue,
        GetLabels)
    if labels_update.needs_update:
      index_endpoint.labels = labels_update.labels
      update_mask.append('labels')

    if not update_mask:
      raise errors.NoFieldsSpecifiedError('No updates requested.')

    request = self.messages.AiplatformProjectsLocationsIndexEndpointsPatchRequest(
        name=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1beta1IndexEndpoint=index_endpoint,
        updateMask=','.join(update_mask))
    return self._service.Patch(request)

  def Patch(self, index_endpoint_ref, args):
    """Update an v1 index endpoint."""
    index_endpoint = self.messages.GoogleCloudAiplatformV1IndexEndpoint()
    update_mask = []

    if args.display_name is not None:
      index_endpoint.displayName = args.display_name
      update_mask.append('display_name')

    if args.description is not None:
      index_endpoint.description = args.description
      update_mask.append('description')

    def GetLabels():
      return self.Get(index_endpoint_ref).labels

    labels_update = labels_util.ProcessUpdateArgsLazy(
        args, self.messages.GoogleCloudAiplatformV1IndexEndpoint.LabelsValue,
        GetLabels)
    if labels_update.needs_update:
      index_endpoint.labels = labels_update.labels
      update_mask.append('labels')

    if not update_mask:
      raise errors.NoFieldsSpecifiedError('No updates requested.')

    request = self.messages.AiplatformProjectsLocationsIndexEndpointsPatchRequest(
        name=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1IndexEndpoint=index_endpoint,
        updateMask=','.join(update_mask))
    return self._service.Patch(request)

  def DeployIndexBeta(self, index_endpoint_ref, args):
    """Deploy an index to an index endpoint."""
    index_ref = _ParseIndex(args.index, args.region)
    deployed_index = self.messages.GoogleCloudAiplatformV1beta1DeployedIndex(
        displayName=args.display_name,
        id=args.deployed_index_id,
        index=index_ref.RelativeName(),
    )

    if args.reserved_ip_ranges is not None:
      deployed_index.reservedIpRanges.extend(args.reserved_ip_ranges)

    if args.deployment_group is not None:
      deployed_index.deploymentGroup = args.deployment_group

    if args.enable_access_logging is not None:
      deployed_index.enableAccessLogging = args.enable_access_logging

    if args.audiences is not None and args.allowed_issuers is not None:
      auth_provider = self.messages.GoogleCloudAiplatformV1beta1DeployedIndexAuthConfigAuthProvider()
      auth_provider.audiences.extend(args.audiences)
      auth_provider.allowedIssuers.extend(args.allowed_issuers)
      deployed_index.deployedIndexAuthConfig = (
          self.messages.GoogleCloudAiplatformV1beta1DeployedIndexAuthConfig(
              authProvider=auth_provider))

    if args.machine_type is not None:
      dedicated_resources = (
          self.messages.GoogleCloudAiplatformV1beta1DedicatedResources()
      )
      dedicated_resources.machineSpec = (
          self.messages.GoogleCloudAiplatformV1beta1MachineSpec(
              machineType=args.machine_type
          )
      )
      if args.min_replica_count is not None:
        dedicated_resources.minReplicaCount = args.min_replica_count
      if args.max_replica_count is not None:
        dedicated_resources.maxReplicaCount = args.max_replica_count
      deployed_index.dedicatedResources = dedicated_resources
    else:
      automatic_resources = (
          self.messages.GoogleCloudAiplatformV1beta1AutomaticResources()
      )
      if args.min_replica_count is not None:
        automatic_resources.minReplicaCount = args.min_replica_count
      if args.max_replica_count is not None:
        automatic_resources.maxReplicaCount = args.max_replica_count
      deployed_index.automaticResources = automatic_resources

    deploy_index_req = self.messages.GoogleCloudAiplatformV1beta1DeployIndexRequest(
        deployedIndex=deployed_index)
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsDeployIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1beta1DeployIndexRequest=deploy_index_req)
    return self._service.DeployIndex(request)

  def DeployIndex(self, index_endpoint_ref, args):
    """Deploy an v1 index to an index endpoint."""
    index_ref = _ParseIndex(args.index, args.region)
    deployed_index = self.messages.GoogleCloudAiplatformV1DeployedIndex(
        displayName=args.display_name,
        id=args.deployed_index_id,
        index=index_ref.RelativeName(),
        enableAccessLogging=args.enable_access_logging
    )

    if args.reserved_ip_ranges is not None:
      deployed_index.reservedIpRanges.extend(args.reserved_ip_ranges)

    if args.deployment_group is not None:
      deployed_index.deploymentGroup = args.deployment_group

    # JWT Authentication config
    if args.audiences is not None and args.allowed_issuers is not None:
      auth_provider = self.messages.GoogleCloudAiplatformV1DeployedIndexAuthConfigAuthProvider()
      auth_provider.audiences.extend(args.audiences)
      auth_provider.allowedIssuers.extend(args.allowed_issuers)
      deployed_index.deployedIndexAuthConfig = (
          self.messages.GoogleCloudAiplatformV1DeployedIndexAuthConfig(
              authProvider=auth_provider))

    # PSC automation configs
    if args.psc_automation_configs is not None:
      deployed_index.pscAutomationConfigs = []
      for psc_automation_config in args.psc_automation_configs:
        deployed_index.pscAutomationConfigs.append(
            self.messages.GoogleCloudAiplatformV1PSCAutomationConfig(
                projectId=psc_automation_config['project-id'],
                network=psc_automation_config['network'],
            )
        )

    if args.machine_type is not None:
      dedicated_resources = (
          self.messages.GoogleCloudAiplatformV1DedicatedResources()
      )
      dedicated_resources.machineSpec = (
          self.messages.GoogleCloudAiplatformV1MachineSpec(
              machineType=args.machine_type
          )
      )
      if args.min_replica_count is not None:
        dedicated_resources.minReplicaCount = args.min_replica_count
      if args.max_replica_count is not None:
        dedicated_resources.maxReplicaCount = args.max_replica_count
      deployed_index.dedicatedResources = dedicated_resources
    else:
      automatic_resources = (
          self.messages.GoogleCloudAiplatformV1AutomaticResources()
      )
      if args.min_replica_count is not None:
        automatic_resources.minReplicaCount = args.min_replica_count
      if args.max_replica_count is not None:
        automatic_resources.maxReplicaCount = args.max_replica_count
      deployed_index.automaticResources = automatic_resources

    deploy_index_req = self.messages.GoogleCloudAiplatformV1DeployIndexRequest(
        deployedIndex=deployed_index)
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsDeployIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1DeployIndexRequest=deploy_index_req)
    return self._service.DeployIndex(request)

  def UndeployIndexBeta(self, index_endpoint_ref, args):
    """Undeploy an index to an index endpoint."""
    undeploy_index_req = self.messages.GoogleCloudAiplatformV1beta1UndeployIndexRequest(
        deployedIndexId=args.deployed_index_id)
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsUndeployIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1beta1UndeployIndexRequest=undeploy_index_req)
    return self._service.UndeployIndex(request)

  def UndeployIndex(self, index_endpoint_ref, args):
    """Undeploy an v1 index to an index endpoint."""
    undeploy_index_req = self.messages.GoogleCloudAiplatformV1UndeployIndexRequest(
        deployedIndexId=args.deployed_index_id)
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsUndeployIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1UndeployIndexRequest=undeploy_index_req)
    return self._service.UndeployIndex(request)

  def MutateDeployedIndexBeta(self, index_endpoint_ref, args):
    """Mutate a deployed index from an index endpoint."""

    deployed_index = self.messages.GoogleCloudAiplatformV1beta1DeployedIndex(
        id=args.deployed_index_id,
        enableAccessLogging=args.enable_access_logging,
    )

    if args.machine_type is not None:
      deployed_index.dedicatedResources = self._GetDedicatedResourcesBeta(args)
    else:
      deployed_index.automaticResources = self._GetAutomaticResourcesBeta(args)

    if args.reserved_ip_ranges is not None:
      deployed_index.reservedIpRanges.extend(args.reserved_ip_ranges)

    if args.deployment_group is not None:
      deployed_index.deploymentGroup = args.deployment_group

    if args.audiences is not None and args.allowed_issuers is not None:
      auth_provider = self.messages.GoogleCloudAiplatformV1beta1DeployedIndexAuthConfigAuthProvider()
      auth_provider.audiences.extend(args.audiences)
      auth_provider.allowedIssuers.extend(args.allowed_issuers)
      deployed_index.deployedIndexAuthConfig = (
          self.messages.GoogleCloudAiplatformV1beta1DeployedIndexAuthConfig(
              authProvider=auth_provider))

    request = self.messages.AiplatformProjectsLocationsIndexEndpointsMutateDeployedIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1beta1DeployedIndex=deployed_index)
    return self._service.MutateDeployedIndex(request)

  def MutateDeployedIndex(self, index_endpoint_ref, args):
    """Mutate a deployed index from an index endpoint."""

    deployed_index = self.messages.GoogleCloudAiplatformV1DeployedIndex(
        id=args.deployed_index_id,
        enableAccessLogging=args.enable_access_logging,
    )

    if args.machine_type is not None:
      deployed_index.dedicatedResources = self._GetDedicatedResources(args)
    else:
      deployed_index.automaticResources = self._GetAutomaticResources(args)

    if args.reserved_ip_ranges is not None:
      deployed_index.reservedIpRanges.extend(args.reserved_ip_ranges)

    if args.deployment_group is not None:
      deployed_index.deploymentGroup = args.deployment_group

    if args.audiences is not None and args.allowed_issuers is not None:
      auth_provider = self.messages.GoogleCloudAiplatformV1DeployedIndexAuthConfigAuthProvider()
      auth_provider.audiences.extend(args.audiences)
      auth_provider.allowedIssuers.extend(args.allowed_issuers)
      deployed_index.deployedIndexAuthConfig = (
          self.messages.GoogleCloudAiplatformV1DeployedIndexAuthConfig(
              authProvider=auth_provider))

    request = self.messages.AiplatformProjectsLocationsIndexEndpointsMutateDeployedIndexRequest(
        indexEndpoint=index_endpoint_ref.RelativeName(),
        googleCloudAiplatformV1DeployedIndex=deployed_index)
    return self._service.MutateDeployedIndex(request)

  def Get(self, index_endpoint_ref):
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsGetRequest(
        name=index_endpoint_ref.RelativeName())
    return self._service.Get(request)

  def List(self, limit=None, region_ref=None):
    return list_pager.YieldFromList(
        self._service,
        self.messages.AiplatformProjectsLocationsIndexEndpointsListRequest(
            parent=region_ref.RelativeName()),
        field='indexEndpoints',
        batch_size_attribute='pageSize',
        limit=limit)

  def Delete(self, index_endpoint_ref):
    request = self.messages.AiplatformProjectsLocationsIndexEndpointsDeleteRequest(
        name=index_endpoint_ref.RelativeName())
    return self._service.Delete(request)

  def _GetDedicatedResourcesBeta(self, args):
    """Construct dedicated resources for beta API."""
    dedicated_resources = (
        self.messages.GoogleCloudAiplatformV1beta1DedicatedResources()
    )
    dedicated_resources.machineSpec = (
        self.messages.GoogleCloudAiplatformV1beta1MachineSpec(
            machineType=args.machine_type
        )
    )
    if args.min_replica_count is not None:
      dedicated_resources.minReplicaCount = args.min_replica_count
    if args.max_replica_count is not None:
      dedicated_resources.maxReplicaCount = args.max_replica_count
    return dedicated_resources

  def _GetAutomaticResourcesBeta(self, args):
    """Construct automatic resources for beta API."""
    automatic_resources = (
        self.messages.GoogleCloudAiplatformV1beta1AutomaticResources()
    )
    if args.min_replica_count is not None:
      automatic_resources.minReplicaCount = args.min_replica_count
    if args.max_replica_count is not None:
      automatic_resources.maxReplicaCount = args.max_replica_count
    return automatic_resources

  def _GetDedicatedResources(self, args):
    """Construct dedicated resources for GA API."""
    dedicated_resources = (
        self.messages.GoogleCloudAiplatformV1DedicatedResources()
    )
    dedicated_resources.machineSpec = (
        self.messages.GoogleCloudAiplatformV1MachineSpec(
            machineType=args.machine_type
        )
    )
    if args.min_replica_count is not None:
      dedicated_resources.minReplicaCount = args.min_replica_count
    if args.max_replica_count is not None:
      dedicated_resources.maxReplicaCount = args.max_replica_count
    return dedicated_resources

  def _GetAutomaticResources(self, args):
    """Construct automatic resources for GA API."""
    automatic_resources = (
        self.messages.GoogleCloudAiplatformV1AutomaticResources()
    )
    if args.min_replica_count is not None:
      automatic_resources.minReplicaCount = args.min_replica_count
    if args.max_replica_count is not None:
      automatic_resources.maxReplicaCount = args.max_replica_count
    return automatic_resources