HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/ai/model_garden_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the model garden command group."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import datetime

from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.ai import operations
from googlecloudsdk.api_lib.ai.models import client as client_models
from googlecloudsdk.api_lib.monitoring import metric
from googlecloudsdk.api_lib.quotas import quota_info
from googlecloudsdk.command_lib.ai import endpoints_util
from googlecloudsdk.command_lib.ai import models_util
from googlecloudsdk.command_lib.ai import operations_util
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import requests
from googlecloudsdk.core import resources
from googlecloudsdk.core.console import console_io


_MAX_LABEL_VALUE_LENGTH = 63


_ACCELERATOR_TYPE_TO_QUOTA_ID_MAP = {
    'NVIDIA_TESLA_P4': 'CustomModelServingP4GPUsPerProjectPerRegion',
    'NVIDIA_TESLA_T4': 'CustomModelServingT4GPUsPerProjectPerRegion',
    'NVIDIA_L4': 'CustomModelServingL4GPUsPerProjectPerRegion',
    'NVIDIA_TESLA_K80': 'CustomModelServingK80GPUsPerProjectPerRegion',
    'NVIDIA_TESLA_V100': 'CustomModelServingV100GPUsPerProjectPerRegion',
    'NVIDIA_TESLA_P100': 'CustomModelServingP100GPUsPerProjectPerRegion',
    'NVIDIA_TESLA_A100': 'CustomModelServingA100GPUsPerProjectPerRegion',
    'NVIDIA_A100_80GB': 'CustomModelServingA10080GBGPUsPerProjectPerRegion',
    'NVIDIA_H100_80GB': 'CustomModelServingH100GPUsPerProjectPerRegion',
    'TPU_V5_LITEPOD': 'CustomModelServingV5ETPUPerProjectPerRegion',
}


_ACCELERATOR_TYPE_TP_QUOTA_METRIC_MAP = {
    'NVIDIA_TESLA_P4': 'custom_model_serving_nvidia_p4_gpus',
    'NVIDIA_TESLA_T4': 'custom_model_serving_nvidia_t4_gpus',
    'NVIDIA_L4': 'custom_model_serving_nvidia_l4_gpus',
    'NVIDIA_TESLA_K80': 'custom_model_serving_nvidia_k80_gpus',
    'NVIDIA_TESLA_V100': 'custom_model_serving_nvidia_v100_gpus',
    'NVIDIA_TESLA_P100': 'custom_model_serving_nvidia_p100_gpus',
    'NVIDIA_TESLA_A100': 'custom_model_serving_nvidia_a100_gpus',
    'NVIDIA_A100_80GB': 'custom_model_serving_nvidia_a100_80gb_gpus',
    'NVIDIA_H100_80GB': 'custom_model_serving_nvidia_h100_gpus',
    'TPU_V5_LITEPOD': 'custom_model_serving_tpu_v5e',
}
_TIME_SERIES_FILTER = (
    # gcloud-disable-gdu-domain
    'metric.type="serviceruntime.googleapis.com/quota/allocation/usage" AND'
    ' resource.type="consumer_quota" AND'
    # gcloud-disable-gdu-domain
    ' metric.label.quota_metric="aiplatform.googleapis.com/{}"'
    ' AND resource.label.project_id="{}" AND resource.label.location="{}" AND'
    # gcloud-disable-gdu-domain
    ' resource.label.service="aiplatform.googleapis.com"'
)


def _ParseEndpoint(endpoint_id, location_id):
  """Parses a Vertex Endpoint ID into a endpoint resource object."""
  return resources.REGISTRY.Parse(
      endpoint_id,
      params={
          'locationsId': location_id,
          'projectsId': properties.VALUES.core.project.GetOrFail,
      },
      collection='aiplatform.projects.locations.endpoints',
  )


def _GetQuotaLimit(region, project, accelerator_type):
  """Gets the quota limit for the accelerator type in the region."""
  accelerator_quota = quota_info.GetQuotaInfo(
      project,
      None,
      None,
      # gcloud-disable-gdu-domain
      'aiplatform.googleapis.com',
      _ACCELERATOR_TYPE_TO_QUOTA_ID_MAP[accelerator_type],
  )
  for region_info in accelerator_quota.dimensionsInfos:
    if region_info.applicableLocations[0] == region:
      return region_info.details.value or 0
  return 0


def _GetQuotaUsage(region, project, accelerator_type):
  """Gets the quota usage for the accelerator type in the region using the monitoring AP."""
  # Format the time in RFC3339 UTC Zulu format
  current_time_utc = datetime.datetime.now(datetime.timezone.utc)
  # Need to go back at least 24 hours to reliably get a data point
  twenty_five_hours_ago_time_utc = current_time_utc - datetime.timedelta(
      hours=25
  )

  rfc3339_time = current_time_utc.isoformat(timespec='seconds').replace(
      '+00:00', 'Z'
  )
  rfc3339_time_twenty_five_hours_ago = twenty_five_hours_ago_time_utc.isoformat(
      timespec='seconds'
  ).replace('+00:00', 'Z')

  quota_usage_time_series = metric.MetricClient().ListTimeSeriesByProject(
      project=project,
      aggregation_alignment_period='60s',
      aggregation_per_series_aligner=metric.GetMessagesModule().MonitoringProjectsTimeSeriesListRequest.AggregationPerSeriesAlignerValueValuesEnum.ALIGN_NEXT_OLDER,
      interval_start_time=rfc3339_time_twenty_five_hours_ago,
      interval_end_time=rfc3339_time,
      filter_str=_TIME_SERIES_FILTER.format(
          _ACCELERATOR_TYPE_TP_QUOTA_METRIC_MAP[accelerator_type],
          project,
          region,
      ),
  )
  try:
    current_usage = (
        quota_usage_time_series.timeSeries[0].points[0].value.int64Value
    )
  except IndexError:
    # If no data point is found, the usage is 0.
    current_usage = 0
  return current_usage


def GetCLIEndpointLabelValue(
    is_hf_model, publisher_name, model_name='', model_version_name=''
):
  if is_hf_model:
    return f'hf-{publisher_name}-{model_name}'.replace('.', '_')[
        :_MAX_LABEL_VALUE_LENGTH
    ]
  else:
    return f'mg-{publisher_name}-{model_version_name}'.replace('.', '_')[
        :_MAX_LABEL_VALUE_LENGTH
    ]


def GetOneClickEndpointLabelValue(
    is_hf_model, publisher_name, model_name='', model_version_name=''
):
  if is_hf_model:
    return f'hf-{publisher_name}-{model_name}'.replace('.', '_')[
        :_MAX_LABEL_VALUE_LENGTH
    ]
  else:
    return (
        f'publishers-{publisher_name}-models-{model_name}-{model_version_name}'
        .replace(
            '.', '_'
        )[
            :_MAX_LABEL_VALUE_LENGTH
        ]
    )


def IsHFModelGated(publisher_name, model_name):
  """Checks if the HF model is gated or not by calling HF API."""
  hf_response = requests.GetSession().get(
      f'https://huggingface.co/api/models/{publisher_name}/{model_name}?blobs=true'
  )
  if hf_response.status_code != 200:
    raise core_exceptions.InternalError(
        "Something went wrong when we call HuggingFace's API to get the"
        ' model metadata. Please try again later.'
    )
  return bool(hf_response.json()['gated'])


def VerifyHFTokenPermission(hf_token, publisher_name, model_name):
  hf_response = requests.GetSession().request(
      'GET',
      f'https://huggingface.co/api/models/{publisher_name}/{model_name}/auth-check',
      headers={'Authorization': f'Bearer {hf_token}'},
  )
  if hf_response.status_code != 200:
    raise core_exceptions.Error(
        'The Hugging Face access token is not valid or does not have permission'
        ' to access the gated model.'
    )
  return


def GetDeployConfig(args, publisher_model):
  """Returns a best suited deployment configuration for the publisher model."""
  try:
    multi_deploy = (
        publisher_model.supportedActions.multiDeployVertex.multiDeployVertex
    )
  except AttributeError:
    raise core_exceptions.Error(
        'Model does not support deployment, please use a deploy-able model'
        ' instead. You can use the `gcloud ai model-garden models list`'
        ' command to find out which ones are currently supported by the'
        ' `deploy` command.'
    )

  deploy_config = None
  if args.machine_type or args.accelerator_type or args.container_image_uri:
    for deploy in multi_deploy:
      if (
          (
              args.machine_type
              and deploy.dedicatedResources.machineSpec.machineType
              != args.machine_type
          )
          or (
              args.accelerator_type
              and str(deploy.dedicatedResources.machineSpec.acceleratorType)
              != args.accelerator_type.upper()
          )
          or (
              args.container_image_uri
              and deploy.containerSpec.imageUri != args.container_image_uri
          )
      ):
        continue
      deploy_config = deploy
      break

    if not deploy_config:
      raise core_exceptions.Error(
          'The machine type, accelerator type and/or container image URI is not'
          ' supported by the model. You can use `gcloud alpha/beta ai'
          ' model-garden models list-deployment-config` command to find the'
          ' supported configurations'
      )
    log.status.Print('Using the selected deployment configuration:')
  else:
    # Default to use the first config.
    deploy_config = multi_deploy[0]
    log.status.Print('Using the default deployment configuration:')

  machine_spec = deploy_config.dedicatedResources.machineSpec
  container_image_uri = deploy_config.containerSpec.imageUri
  if machine_spec.machineType:
    log.status.Print(f' Machine type: {machine_spec.machineType}')
  if machine_spec.acceleratorType:
    log.status.Print(f' Accelerator type: {machine_spec.acceleratorType}')
  if machine_spec.acceleratorCount:
    log.status.Print(f' Accelerator count: {machine_spec.acceleratorCount}')
  if container_image_uri:
    log.status.Print(f' Container image URI: {container_image_uri}')
  return deploy_config


def CheckAcceleratorQuota(
    args, machine_type, accelerator_type, accelerator_count
):
  """Checks the accelerator quota for the project and region."""
  # In the machine spec, TPUs don't have accelerator type and count, but they
  # have machine type.
  if machine_type == 'ct5lp-hightpu-1t':
    accelerator_type = 'TPU_V5_LITEPOD'
    accelerator_count = 1
  elif machine_type == 'ct5lp-hightpu-4t':
    accelerator_type = 'TPU_V5_LITEPOD'
    accelerator_count = 4
  project = properties.VALUES.core.project.GetOrFail()
  quota_limit = _GetQuotaLimit(args.region, project, accelerator_type)
  if quota_limit < accelerator_count:
    raise core_exceptions.Error(
        'The project does not have enough quota for'
        f' {_ACCELERATOR_TYPE_TP_QUOTA_METRIC_MAP[accelerator_type]} in'
        f' {args.region} to'
        f' deploy the model. The quota limit is {quota_limit} and you are'
        f' requesting for {accelerator_count}. Please'
        ' use a different region or request more quota by following'
        ' https://cloud.google.com/vertex-ai/docs/quotas#requesting_additional_quota.'
    )

  current_usage = _GetQuotaUsage(args.region, project, accelerator_type)
  if current_usage + accelerator_count > quota_limit:
    raise core_exceptions.Error(
        'The project does not have enough quota for'
        f' {_ACCELERATOR_TYPE_TP_QUOTA_METRIC_MAP[accelerator_type]} in'
        f' {args.region} to'
        f' deploy the model. The current usage is {current_usage} out of'
        f' {quota_limit} and you are'
        f' requesting for {accelerator_count}. Please'
        ' use a different region or request more quota by following'
        ' https://cloud.google.com/vertex-ai/docs/quotas#requesting_additional_quota.'
    )
  log.status.Print(
      'The project has enough quota. The current usage of quota for'
      f' accelerator type {accelerator_type} in region {args.region} is'
      f' {current_usage} out of {quota_limit}.'
  )


def CreateEndpoint(
    endpoint_name,
    label_value,
    region_ref,
    operation_client,
    endpoints_client,
):
  """Creates a Vertex endpoint for deployment."""
  create_endpoint_op = endpoints_client.CreateBeta(
      region_ref,
      endpoint_name,
      labels=endpoints_client.messages.GoogleCloudAiplatformV1beta1Endpoint.LabelsValue(
          additionalProperties=[
              endpoints_client.messages.GoogleCloudAiplatformV1beta1Endpoint.LabelsValue.AdditionalProperty(
                  key='mg-cli-deploy', value=label_value
              )
          ]
      ),
  )
  create_endpoint_response_msg = operations_util.WaitForOpMaybe(
      operation_client,
      create_endpoint_op,
      endpoints_util.ParseOperation(create_endpoint_op.name),
  )
  if create_endpoint_response_msg is None:
    raise core_exceptions.InternalError(
        'Internal error: Failed to create a Vertex endpoint. Please try again.'
    )
  response = encoding.MessageToPyValue(create_endpoint_response_msg)
  if 'name' not in response:
    raise core_exceptions.InternalError(
        'Internal error: Failed to create a Vertex endpoint. Please try again.'
    )
  log.status.Print(
      (
          'Created Vertex AI endpoint: {}.\nStarting to upload the model'
          ' to Model Registry.'
      ).format(response['name'])
  )
  return response['name'].split('/')[-1]


def UploadModel(
    deploy_config,
    args,
    requires_hf_token,
    is_hf_model,
    uploaded_model_name,
    publisher_name,
    publisher_model_name,
):
  """Uploads the Model Garden model to Model Registry."""
  container_env_vars, container_args, container_commands = None, None, None
  if deploy_config.containerSpec.env:
    container_env_vars = {
        var.name: var.value for var in deploy_config.containerSpec.env
    }
    if requires_hf_token and 'HUGGING_FACE_HUB_TOKEN' in container_env_vars:
      container_env_vars['HUGGING_FACE_HUB_TOKEN'] = (
          args.hugging_face_access_token
      )
  if deploy_config.containerSpec.args:
    container_args = list(deploy_config.containerSpec.args)
  if deploy_config.containerSpec.command:
    container_commands = list(deploy_config.containerSpec.command)

  models_client = client_models.ModelsClient()
  upload_model_op = models_client.UploadV1Beta1(
      args.CONCEPTS.region.Parse(),
      uploaded_model_name,  # Re-use endpoint_name as the uploaded model name.
      None,
      None,
      deploy_config.artifactUri,
      deploy_config.containerSpec.imageUri,
      container_commands,
      container_args,
      container_env_vars,
      [deploy_config.containerSpec.ports[0].containerPort],
      None,
      deploy_config.containerSpec.predictRoute,
      deploy_config.containerSpec.healthRoute,
      base_model_source=models_client.messages.GoogleCloudAiplatformV1beta1ModelBaseModelSource(
          modelGardenSource=models_client.messages.GoogleCloudAiplatformV1beta1ModelGardenSource(
              # The value is consistent with one-click deploy.
              publicModelName='publishers/{}/models/{}'.format(
                  'hf-' + publisher_name if is_hf_model else publisher_name,
                  publisher_model_name,
              )
          )
      ),
  )

  upload_model_response_msg = operations_util.WaitForOpMaybe(
      operations_client=operations.OperationsClient(),
      op=upload_model_op,
      op_ref=models_util.ParseModelOperation(upload_model_op.name),
  )
  if upload_model_response_msg is None:
    raise core_exceptions.InternalError(
        'Internal error: Failed to upload a Model Garden model to Model'
        ' Registry. Please try again later.'
    )
  upload_model_response = encoding.MessageToPyValue(upload_model_response_msg)
  if 'model' not in upload_model_response:
    raise core_exceptions.InternalError(
        'Internal error: Failed to upload a Model Garden model to Model'
        ' Registry. Please try again later.'
    )
  log.status.Print(
      (
          'Uploaded model to Model Registry at {}.\nStarting to deploy the'
          ' model.'
      ).format(upload_model_response['model'])
  )
  return upload_model_response['model'].split('/')[-1]


def DeployModel(
    args,
    deploy_config,
    endpoint_id,
    endpoint_name,
    model_id,
    endpoints_client,
    operation_client,
):
  """Deploys the Model Registry model to the Vertex endpoint."""
  accelerator_type = (
      deploy_config.dedicatedResources.machineSpec.acceleratorType
  )
  accelerator_count = (
      deploy_config.dedicatedResources.machineSpec.acceleratorCount
  )

  accelerator_dict = None
  if accelerator_type is not None or accelerator_count is not None:
    accelerator_dict = {}
    if accelerator_type is not None:
      accelerator_dict['type'] = str(accelerator_type).lower().replace('_', '-')
    if accelerator_count is not None:
      accelerator_dict['count'] = accelerator_count

  deploy_model_op = endpoints_client.DeployModelBeta(
      _ParseEndpoint(endpoint_id, args.region),
      model_id,
      args.region,
      endpoint_name,  # Use the endpoint_name as the deployed model name.
      machine_type=deploy_config.dedicatedResources.machineSpec.machineType,
      accelerator_dict=accelerator_dict,
      enable_access_logging=True,
      enable_container_logging=True,
  )
  operations_util.WaitForOpMaybe(
      operation_client,
      deploy_model_op,
      endpoints_util.ParseOperation(deploy_model_op.name),
      asynchronous=True,  # Deploy the model asynchronously.
  )
  deploy_op_id = deploy_model_op.name.split('/')[-1]
  print(
      'Deploying the model to the endpoint. To check the deployment'
      ' status, you can try one of the following methods:\n1) Look for'
      f' endpoint `{endpoint_name}` at the [Vertex AI] -> [Online'
      ' prediction] tab in Cloud Console\n2) Use `gcloud ai operations'
      f' describe {deploy_op_id} --region={args.region}` to find the status'
      ' of the deployment long-running operation\n3) Use `gcloud ai'
      f' endpoints describe {endpoint_id} --region={args.region}` command'
      " to check the endpoint's metadata."
  )


def Deploy(
    args, machine_spec, endpoint_name, model, operation_client, mg_client
):
  """Deploys the publisher model to a Vertex endpoint."""
  try:
    if machine_spec is not None:
      machine_type = machine_spec.machineType
      accelerator_type = machine_spec.acceleratorType
      accelerator_count = machine_spec.acceleratorCount
    else:
      machine_type = None
      accelerator_type = None
      accelerator_count = None
    deploy_op = mg_client.Deploy(
        project=properties.VALUES.core.project.GetOrFail(),
        location=args.region,
        model=model,
        accept_eula=args.accept_eula,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        machine_type=machine_type,
        endpoint_display_name=endpoint_name,
        hugging_face_access_token=args.hugging_face_access_token,
        spot=args.spot,
        reservation_affinity=args.reservation_affinity,
        use_dedicated_endpoint=args.use_dedicated_endpoint,
        enable_fast_tryout=args.enable_fast_tryout,
        container_image_uri=args.container_image_uri,
        container_command=args.container_command,
        container_args=args.container_args,
        container_env_vars=args.container_env_vars,
        container_ports=args.container_ports,
        container_grpc_ports=args.container_grpc_ports,
        container_predict_route=args.container_predict_route,
        container_health_route=args.container_health_route,
        container_deployment_timeout_seconds=args.container_deployment_timeout_seconds,
        container_shared_memory_size_mb=args.container_shared_memory_size_mb,
        container_startup_probe_exec=args.container_startup_probe_exec,
        container_startup_probe_period_seconds=args.container_startup_probe_period_seconds,
        container_startup_probe_timeout_seconds=args.container_startup_probe_timeout_seconds,
        container_health_probe_exec=args.container_health_probe_exec,
        container_health_probe_period_seconds=args.container_health_probe_period_seconds,
        container_health_probe_timeout_seconds=args.container_health_probe_timeout_seconds,
    )
  except apitools_exceptions.HttpError as e:
    # Keep prompting for HF token if the error is due to missing HF token.
    if (
        e.status_code == 400
        and 'provide a valid Hugging Face access token' in e.content
        and args.hugging_face_access_token is None
    ):
      while not args.hugging_face_access_token:
        args.hugging_face_access_token = console_io.PromptPassword(
            'Please enter your Hugging Face read access token: '
        )
      Deploy(
          args,
          machine_spec,
          endpoint_name,
          model,
          operation_client,
          mg_client,
      )
      return
    elif e.status_code == 403 and 'EULA' in e.content:
      log.status.Print(
          'The End User License Agreement'
          ' (EULA) of the model has not been accepted.'
      )
      publisher, model_id = args.model.split('@')[0].split('/')
      try:
        args.accept_eula = console_io.PromptContinue(
            message=(
                'The model can be deployed only if the EULA of the model has'
                ' been'
                ' accepted. You can view it at'
                f' https://console.cloud.google.com/vertex-ai/publishers/{publisher}/model-garden/{model_id}):'
            ),
            prompt_string='Do you want to accept the EULA?',
            default=False,
            cancel_on_no=True,
            cancel_string='EULA is not accepted.',
            throw_if_unattended=True,
        )
      except console_io.Error:
        raise core_exceptions.Error(
            'Please accept the EULA using the `--accept-eula` flag.'
        )
      Deploy(
          args,
          machine_spec,
          endpoint_name,
          model,
          operation_client,
          mg_client,
      )
      return
    else:
      raise e

  deploy_op_id = deploy_op.name.split('/')[-1]
  log.status.Print(
      'Deploying the model to the endpoint. To check the deployment'
      ' status, you can try one of the following methods:\n1) Look for'
      f' endpoint `{endpoint_name}` at the [Vertex AI] -> [Online'
      ' prediction] tab in Cloud Console\n2) Use `gcloud ai operations'
      f' describe {deploy_op_id} --region={args.region}` to find the status'
      ' of the deployment long-running operation\n'
  )
  operations_util.WaitForOpMaybe(
      operation_client,
      deploy_op,
      ParseOperation(deploy_op.name),
      asynchronous=args.asynchronous,
      max_wait_ms=3600000,  # 60 minutes
  )


def ParseOperation(operation_name):
  """Parse operation resource to the operation reference object.

  Args:
    operation_name: The operation resource to wait on

  Returns:
    The operation reference object
  """
  return resources.REGISTRY.ParseRelativeName(
      operation_name,
      collection='aiplatform.projects.locations.operations',
  )