HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/notebook_executor/schedules.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Notebook-executor schedules api helper."""

import types

from googlecloudsdk.api_lib.notebook_executor import executions as executions_util
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.core import resources
from googlecloudsdk.core.util import times
from googlecloudsdk.generated_clients.apis.aiplatform.v1beta1 import aiplatform_v1beta1_client


AiplatformV1beta1 = aiplatform_v1beta1_client.AiplatformV1beta1
Namespace = parser_extensions.Namespace


def GetScheduleResourceName(args):
  """Get the resource name for the schedule.

  Args:
    args: Argparse object from Command.Run

  Returns:
    The resource name in the form
    projects/{project}/locations/{location}/schedules/{schedule_id}.
  """
  return args.CONCEPTS.schedule.Parse().RelativeName()


def ParseScheduleOperation(operation_name):
  """Parse operation relative resource name to the operation reference object.

  Args:
    operation_name: The schedule operation resource name

  Returns:
    The operation reference object
  """
  if '/schedules/' in operation_name:
    try:
      return resources.REGISTRY.ParseRelativeName(
          operation_name,
          collection=(
              'aiplatform.projects.locations.schedules.operations'
          ),
      )
    except resources.WrongResourceCollectionException:
      pass
  return resources.REGISTRY.ParseRelativeName(
      operation_name, collection='aiplatform.projects.locations.operations'
  )


def GetScheduleUri(resource):
  """Get the URL for a schedule resource."""
  return resources.REGISTRY.ParseRelativeName(
      relative_name=resource.name,
      collection='aiplatform.projects.locations.schedules',
  ).SelfLink()


def GetStartTime(args):
  """Get the start time for the schedule."""
  return times.FormatDateTime(args.start_time) if args.start_time else None


def GetEndTime(args):
  """Get the end time for the schedule."""
  return times.FormatDateTime(args.end_time) if args.end_time else None


def CreateSchedule(
    args: Namespace,
    messages: types.ModuleType,
    for_update: bool = False,
    for_workbench: bool = False,
):
  """Builds a Schedule message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.
    for_update: Whether the schedule is to be used in an update request.
    for_workbench: Whether the schedule is for a Workbench execution.

  Returns:
    Instance of the Schedule message.
  """
  execution_create_request = None
  if not for_update:
    execution_create_request = (
        executions_util.CreateExecutionCreateRequestForSchedule(
            args, messages, for_workbench
        )
    )
  return messages.GoogleCloudAiplatformV1beta1Schedule(
      displayName=args.display_name,
      startTime=GetStartTime(args),
      endTime=GetEndTime(args),
      maxRunCount=args.max_runs,
      cron=args.cron_schedule,
      maxConcurrentRunCount=args.max_concurrent_runs,
      allowQueueing=args.enable_queueing,
      createNotebookExecutionJobRequest=execution_create_request,
  )


def FilterWorkbenchSchedule(schedule):
  """List filter for Workbench schedules.

  Args:
    schedule: The schedule item returned from List API to check.

  Returns:
    True if the schedule is for a Workbench notebook execution.
  """
  return executions_util.IsWorkbenchExecution(
      schedule.createNotebookExecutionJobRequest.notebookExecutionJob
  )


def ValidateAndGetColabSchedule(
    args: Namespace,
    messages: types.ModuleType,
    service: AiplatformV1beta1.ProjectsLocationsSchedulesService,
):
  """Checks if the schedule is for a Colab Enterprise notebook execution and returns the schedule if so.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.
    service: The service to use to make the request.

  Returns:
    The schedule if it is of Colab Enterprise type.

  Raises:
    InvalidArgumentException: If the schedule is not of notebook execution type
    or is of Workbench type.
  """
  schedule = service.Get(CreateScheduleGetRequest(args, messages))
  notebook_execution_job_request = schedule.createNotebookExecutionJobRequest
  if notebook_execution_job_request is None:
    raise exceptions.InvalidArgumentException(
        'SCHEDULE', 'Schedule is not of notebook execution type.'
    )
  if executions_util.IsWorkbenchExecution(
      notebook_execution_job_request.notebookExecutionJob
  ):
    raise exceptions.InvalidArgumentException(
        'SCHEDULE',
        'Schedule is not of Colab Enterprise type. To manage Workbench'
        ' schedules use `gcloud beta workbench` instead.',
    )
  return schedule


def ValidateAndGetWorkbenchSchedule(
    args: Namespace,
    messages: types.ModuleType,
    service: AiplatformV1beta1.ProjectsLocationsSchedulesService,
    skip_workbench_check: bool = False,
):
  """Checks if the schedule is for a Workbench notebook execution and returns the schedule if so.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.
    service: The service to use to make the request.
    skip_workbench_check: Whether to skip validation of the schedule type.

  Returns:
    The schedule if it is of Workbench type.

  Raises:
    InvalidArgumentException: If the schedule is not of notebook execution type.
  """
  schedule = service.Get(
      CreateScheduleGetRequest(args, messages)
  )
  notebook_execution_job_request = schedule.createNotebookExecutionJobRequest
  if notebook_execution_job_request is None:
    raise exceptions.InvalidArgumentException(
        'SCHEDULE', 'Schedule is not of notebook execution type.'
    )
  if (
      notebook_execution_job_request.notebookExecutionJob.kernelName is None
      and not skip_workbench_check
  ):
    raise exceptions.InvalidArgumentException(
        'SCHEDULE',
        'Schedule is not of Workbench type. To manage Colab Enterprise'
        ' schedules use `gcloud colab schedules` instead.',
    )
  return schedule


def CreateScheduleGetRequest(args, messages):
  """Builds a SchedulesGetRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesGetRequest message.
  """

  return (
      messages.AiplatformProjectsLocationsSchedulesGetRequest(
          name=GetScheduleResourceName(args),
      )
  )


def CreateScheduleDeleteRequest(args, messages):
  """Builds a SchedulesDeleteRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesDeleteRequest message.
  """

  return (
      messages.AiplatformProjectsLocationsSchedulesDeleteRequest(
          name=GetScheduleResourceName(args),
      )
  )


def CreateSchedulePauseRequest(args, messages):
  """Builds a SchedulesPauseRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesPauseRequest message.
  """

  return (
      messages.AiplatformProjectsLocationsSchedulesPauseRequest(
          name=GetScheduleResourceName(args),
      )
  )


def CreateScheduleResumeRequest(args, messages):
  """Builds a SchedulesResumeRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesResumeRequest message.
  """
  resume_schedule_request = (
      messages.GoogleCloudAiplatformV1beta1ResumeScheduleRequest(
          catchUp=args.enable_catch_up
      )
  )
  return (
      messages.AiplatformProjectsLocationsSchedulesResumeRequest(
          name=GetScheduleResourceName(args),
          googleCloudAiplatformV1beta1ResumeScheduleRequest=(
              resume_schedule_request
          ),
      )
  )


def CreateScheduleListRequest(args, messages):
  """Builds a SchedulesListRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesListRequest message.
  """
  return messages.AiplatformProjectsLocationsSchedulesListRequest(
      parent=executions_util.GetParentForExecutionOrSchedule(args),
      filter='create_notebook_execution_job_request:*',
  )


def CreateScheduleCreateRequest(
    args: Namespace, messages: types.ModuleType, for_workbench: bool = False
):
  """Builds a SchedulesCreateRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.
    for_workbench: Whether the schedule is for a Workbench execution.

  Returns:
    Instance of the SchedulesCreateRequest message.
  """
  return messages.AiplatformProjectsLocationsSchedulesCreateRequest(
      parent=executions_util.GetParentForExecutionOrSchedule(args),
      googleCloudAiplatformV1beta1Schedule=CreateSchedule(
          args, messages, for_update=False, for_workbench=for_workbench
      ),
  )


def CreateScheduleUpdateMask(args):
  """Builds a field mask for the schedule update request.

  Args:
    args: Argparse object from Command.Run

  Returns:
    Field mask for the schedule update request.
  """
  mask_fields = []
  args_to_field_map = {
      'display_name': 'display_name',
      'start_time': 'start_time',
      'end_time': 'end_time',
      'max_runs': 'max_run_count',
      'cron_schedule': 'cron',
      'max_concurrent_runs': 'max_concurrent_run_count',
      'enable_queueing': 'allow_queueing',
  }
  for key, value in args_to_field_map.items():
    if args.IsSpecified(key):
      mask_fields.append(value)
  return ','.join(map(str, mask_fields))


def CreateSchedulePatchRequest(args, messages):
  """Builds a SchedulesPatchRequest message.

  Args:
    args: Argparse object from Command.Run
    messages: Module containing messages definition for the specified API.

  Returns:
    Instance of the SchedulesPatchRequest message.
  """
  return messages.AiplatformProjectsLocationsSchedulesPatchRequest(
      name=GetScheduleResourceName(args),
      googleCloudAiplatformV1beta1Schedule=CreateSchedule(
          args, messages, for_update=True
      ),
      updateMask=CreateScheduleUpdateMask(args),
  )