HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/compute/resource_policies/util.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for Google Compute Engine resource policies."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.compute import utils
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.compute import scope as compute_scopes
from googlecloudsdk.command_lib.compute.resource_policies import flags
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import times

# Timezone that start times and weekdays are converted to before they are
# formatted for the API.
_API_TIMEZONE = times.UTC


def _ParseWeeklyDayAndTime(start_time, weekday):
  """Returns the API forms of a weekly schedule's weekday and start time.

  Both values are expressed in _API_TIMEZONE.

  Args:
    start_time: The datetime object which represents a start time.
    weekday: The times.Weekday value which corresponds to the weekday.

  Returns:
    A (weekday name, formatted start time) pair for use by the API clients.
  """
  api_weekday = times.GetWeekdayInTimezone(start_time, weekday, _API_TIMEZONE)
  api_start_time = _FormatStartTime(start_time)
  return (api_weekday.name, api_start_time)


def _FormatStartTime(dt):
  """Formats dt with the '%H:%M' pattern in _API_TIMEZONE."""
  hour_minute_format = '%H:%M'
  return times.FormatDateTime(dt, hour_minute_format, _API_TIMEZONE)


def MakeVmMaintenancePolicy(policy_ref, args, messages):
  """Creates a VM Maintenance Window Resource Policy message from args.

  When the daily_cycle flag is specified the policy carries a daily maintenance
  window; otherwise it carries a concurrency control group built from
  args.concurrency_limit_percent.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object with vmMaintenancePolicy populated.

  Raises:
    ValueError: if daily_cycle is not specified and
      args.concurrency_limit_percent is missing or outside [1, 100].
  """
  vm_policy = messages.ResourcePolicyVmMaintenancePolicy()
  if args.IsSpecified('daily_cycle'):
    _, daily_cycle, _ = _ParseCycleFrequencyArgs(args, messages)
    vm_policy.maintenanceWindow = (
        messages.ResourcePolicyVmMaintenancePolicyMaintenanceWindow(
            dailyMaintenanceWindow=daily_cycle))
  else:
    limit = args.concurrency_limit_percent
    # A missing value must be rejected explicitly: on Python 3 comparing None
    # with an int raises TypeError instead of reaching the range error below.
    if limit is None or not 1 <= limit <= 100:
      raise ValueError('--concurrency-limit-percent must be greater or equal to'
                       ' 1 and less or equal to 100')
    vm_policy.concurrencyControlGroup = (
        messages.ResourcePolicyVmMaintenancePolicyConcurrencyControl(
            concurrencyLimit=limit))
  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      vmMaintenancePolicy=vm_policy)


def MakeVmMaintenanceMaintenanceWindow(policy_ref, args, messages):
  """Builds a ResourcePolicy message carrying a daily VM maintenance window.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object whose vmMaintenancePolicy holds the
    maintenance window.
  """
  # Only the daily cycle of the (hourly, daily, weekly) result is used here.
  daily_window = _ParseCycleFrequencyArgs(args, messages)[1]
  maintenance_policy = messages.ResourcePolicyVmMaintenancePolicy()
  maintenance_policy.maintenanceWindow = (
      messages.ResourcePolicyVmMaintenancePolicyMaintenanceWindow(
          dailyMaintenanceWindow=daily_window))
  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      vmMaintenancePolicy=maintenance_policy)


def MakeVmMaintenanceConcurrentPolicy(policy_ref, args, messages):
  """Builds a ResourcePolicy message limiting VM maintenance concurrency.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace; max_percent becomes the concurrency
      limit.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object whose vmMaintenancePolicy holds the
    concurrency control group.
  """
  control_cls = messages.ResourcePolicyVmMaintenancePolicyConcurrencyControl
  concurrency_control = control_cls(concurrencyLimit=args.max_percent)
  maintenance_policy = messages.ResourcePolicyVmMaintenancePolicy(
      concurrencyControlGroup=concurrency_control)
  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      vmMaintenancePolicy=maintenance_policy)


def MakeDiskSnapshotSchedulePolicy(policy_ref, args, messages,
                                   support_snapshot_region=False):
  """Builds a ResourcePolicy message holding a disk snapshot schedule.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace.
    messages: message classes.
    support_snapshot_region: when True, args.snapshot_region is also read and
      passed as the snapshot properties' region.

  Returns:
    A messages.ResourcePolicy object with snapshotSchedulePolicy populated.
  """
  hourly_cycle, daily_cycle, weekly_cycle = _ParseCycleFrequencyArgs(
      args, messages, supports_hourly=True, supports_weekly=True)

  properties_cls = (
      messages.ResourcePolicySnapshotSchedulePolicySnapshotProperties)
  snapshot_labels = labels_util.ParseCreateArgs(
      args, properties_cls.LabelsValue, labels_dest='snapshot_labels')
  storage_location = []
  if args.storage_location:
    storage_location = [args.storage_location]

  # `region` is passed to the message only when support_snapshot_region is
  # set; args.snapshot_region is not read otherwise.
  region_kwargs = {}
  if support_snapshot_region:
    region_kwargs['region'] = args.snapshot_region or None

  # Snapshot properties are attached only if at least one of them was given.
  wants_properties = (
      args.IsSpecified('guest_flush') or snapshot_labels or storage_location
      or region_kwargs.get('region'))
  snapshot_properties = None
  if wants_properties:
    snapshot_properties = properties_cls(
        guestFlush=args.guest_flush,
        labels=snapshot_labels,
        storageLocations=storage_location,
        **region_kwargs)

  on_source_disk_delete = flags.GetOnSourceDiskDeleteFlagMapper(
      messages).GetEnumForChoice(args.on_source_disk_delete)
  retention_policy = (
      messages.ResourcePolicySnapshotSchedulePolicyRetentionPolicy(
          maxRetentionDays=args.max_retention_days,
          onSourceDiskDelete=on_source_disk_delete))
  schedule = messages.ResourcePolicySnapshotSchedulePolicySchedule(
      hourlySchedule=hourly_cycle,
      dailySchedule=daily_cycle,
      weeklySchedule=weekly_cycle)
  snapshot_policy = messages.ResourcePolicySnapshotSchedulePolicy(
      retentionPolicy=retention_policy,
      schedule=schedule,
      snapshotProperties=snapshot_properties)
  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      snapshotSchedulePolicy=snapshot_policy)


def MakeDiskSnapshotSchedulePolicyForUpdate(policy_ref, args, messages):
  """Builds the snapshot schedule ResourcePolicy used in ResourcePolicy.Patch.

  Only the parts of the policy that args provide are populated; the schedule,
  snapshot properties, retention policy, description and the snapshot policy
  itself are left as None when nothing was given for them.

  Args:
    policy_ref: resource reference of the policy; its Name() is copied into
      the message.
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object for the patch request.
  """
  hourly_cycle, daily_cycle, weekly_cycle = _ParseCycleFrequencyArgs(
      args, messages, supports_hourly=True, supports_weekly=True)

  properties_cls = (
      messages.ResourcePolicySnapshotSchedulePolicySnapshotProperties)
  snapshot_labels = labels_util.ParseCreateArgs(
      args, properties_cls.LabelsValue, labels_dest='snapshot_labels')
  snapshot_properties = None
  if snapshot_labels:
    snapshot_properties = properties_cls(labels=snapshot_labels)

  description = args.description if args.IsSpecified('description') else None

  retention_policy = None
  if args.max_retention_days or args.on_source_disk_delete:
    on_source_disk_delete = flags.GetOnSourceDiskDeleteFlagMapper(
        messages).GetEnumForChoice(args.on_source_disk_delete)
    retention_policy = (
        messages.ResourcePolicySnapshotSchedulePolicyRetentionPolicy(
            maxRetentionDays=args.max_retention_days,
            onSourceDiskDelete=on_source_disk_delete))

  snapshot_schedule = None
  if any((hourly_cycle, daily_cycle, weekly_cycle)):
    snapshot_schedule = messages.ResourcePolicySnapshotSchedulePolicySchedule(
        hourlySchedule=hourly_cycle,
        dailySchedule=daily_cycle,
        weeklySchedule=weekly_cycle)

  if any((snapshot_schedule, snapshot_properties, retention_policy)):
    snapshot_policy = messages.ResourcePolicySnapshotSchedulePolicy(
        schedule=snapshot_schedule,
        snapshotProperties=snapshot_properties,
        retentionPolicy=retention_policy)
  else:
    snapshot_policy = None

  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=description,
      snapshotSchedulePolicy=snapshot_policy)


def MakeInstanceSchedulePolicy(policy_ref, args, messages):
  """Builds a ResourcePolicy message holding an instance schedule policy.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object with instanceSchedulePolicy populated.
  """
  start_schedule = (
      messages.ResourcePolicyInstanceSchedulePolicySchedule(
          schedule=args.vm_start_schedule)
      if args.vm_start_schedule else None)
  stop_schedule = (
      messages.ResourcePolicyInstanceSchedulePolicySchedule(
          schedule=args.vm_stop_schedule)
      if args.vm_stop_schedule else None)

  schedule_policy = messages.ResourcePolicyInstanceSchedulePolicy(
      timeZone=args.timezone,
      vmStartSchedule=start_schedule,
      vmStopSchedule=stop_schedule)

  # The optional time bounds are set only when a date was given.
  optional_bounds = (
      ('startTime', args.initiation_date),
      ('expirationTime', args.end_date),
  )
  for field_name, bound in optional_bounds:
    if bound:
      setattr(schedule_policy, field_name, times.FormatDateTime(bound))

  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      instanceSchedulePolicy=schedule_policy)


def MakeInstanceSchedulePolicyForUpdate(policy_ref, args, messages):
  """Builds the instance schedule ResourcePolicy message used for an update.

  The resulting message carries the policy name but no region.

  Args:
    policy_ref: resource reference of the policy; its Name() is copied into
      the message.
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object with instanceSchedulePolicy populated.
  """
  start_schedule = (
      messages.ResourcePolicyInstanceSchedulePolicySchedule(
          schedule=args.vm_start_schedule)
      if args.vm_start_schedule else None)
  stop_schedule = (
      messages.ResourcePolicyInstanceSchedulePolicySchedule(
          schedule=args.vm_stop_schedule)
      if args.vm_stop_schedule else None)

  schedule_policy = messages.ResourcePolicyInstanceSchedulePolicy(
      timeZone=args.timezone,
      vmStartSchedule=start_schedule,
      vmStopSchedule=stop_schedule,
  )

  # The optional time bounds are set only when a date was given.
  optional_bounds = (
      ('startTime', args.initiation_date),
      ('expirationTime', args.end_date),
  )
  for field_name, bound in optional_bounds:
    if bound:
      setattr(schedule_policy, field_name, times.FormatDateTime(bound))

  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      instanceSchedulePolicy=schedule_policy,
  )


def MakeGroupPlacementPolicy(policy_ref, args, messages, track):
  """Builds a ResourcePolicy message holding a group placement policy.

  One variant of the placement policy is produced, chosen in this order:
  scope (ALPHA only), tpu_topology (ALPHA only), max_distance (ALPHA or BETA),
  otherwise the availability-domain-count variant.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace.
    messages: message classes.
    track: release track the command is running in.

  Returns:
    A messages.ResourcePolicy object with groupPlacementPolicy populated.
  """
  availability_domain_count = (
      args.availability_domain_count
      if args.IsSpecified('availability_domain_count') else None)

  collocation = None
  if args.IsSpecified('collocation'):
    collocation_mapper = flags.GetCollocationFlagMapper(messages, track)
    collocation = collocation_mapper.GetEnumForChoice(args.collocation)

  gpu_topology = None
  if args.IsSpecified('gpu_topology'):
    gpu_topology = args.gpu_topology

  is_alpha = track == base.ReleaseTrack.ALPHA
  # Fields shared by every variant; each branch below adds its own extras.
  policy_fields = {'vmCount': args.vm_count, 'collocation': collocation}
  # The track is tested before IsSpecified so that track-specific flags are
  # only queried on the tracks named in each condition.
  if is_alpha and args.IsSpecified('scope'):
    scope_mapper = flags.GetAvailabilityDomainScopeFlagMapper(messages)
    policy_fields['availabilityDomainCount'] = availability_domain_count
    policy_fields['scope'] = scope_mapper.GetEnumForChoice(args.scope)
  elif is_alpha and args.IsSpecified('tpu_topology'):
    policy_fields['tpuTopology'] = args.tpu_topology
  elif (track in (base.ReleaseTrack.ALPHA, base.ReleaseTrack.BETA) and
        args.IsSpecified('max_distance')):
    policy_fields['maxDistance'] = args.max_distance
  else:
    policy_fields['availabilityDomainCount'] = availability_domain_count

  placement_policy = messages.ResourcePolicyGroupPlacementPolicy(
      **policy_fields)
  if gpu_topology:
    placement_policy.gpuTopology = gpu_topology

  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      groupPlacementPolicy=placement_policy)


def MakeWorkloadPolicy(policy_ref, args, messages):
  """Builds a ResourcePolicy message holding a workload policy.

  Args:
    policy_ref: resource reference of the policy; its Name() and region are
      copied into the message.
    args: parsed command-line namespace; type, max_topology_distance and
      accelerator_topology are each applied only when set.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object with workloadPolicy populated.
  """
  workload_cls = messages.ResourcePolicyWorkloadPolicy
  policy = workload_cls()

  if args.type:
    policy.type = workload_cls.TypeValueValuesEnum(args.type)
  if args.max_topology_distance:
    policy.maxTopologyDistance = (
        workload_cls.MaxTopologyDistanceValueValuesEnum(
            args.max_topology_distance))
  if args.accelerator_topology:
    policy.acceleratorTopology = args.accelerator_topology

  return messages.ResourcePolicy(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      workloadPolicy=policy,
  )


def MakeDiskConsistencyGroupPolicy(policy_ref, args, messages):
  """Builds a ResourcePolicy message for a Disk Consistency Group policy.

  The consistency group policy message itself is created without any fields.

  Args:
    policy_ref: resource reference of the Disk Consistency Group policy.
    args: Namespace, argparse.Namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicy object for Disk Consistency Group Resource Policy.
  """
  empty_group_policy = messages.ResourcePolicyDiskConsistencyGroupPolicy()
  policy_fields = dict(
      name=policy_ref.Name(),
      description=args.description,
      region=policy_ref.region,
      diskConsistencyGroupPolicy=empty_group_policy)
  return messages.ResourcePolicy(**policy_fields)


def _ParseCycleFrequencyArgs(args,
                             messages,
                             supports_hourly=False,
                             supports_weekly=False):
  """Parses the cycle frequency flags into cycle messages.

  Args:
    args: parsed command-line namespace.
    messages: message classes.
    supports_hourly: whether args.hourly_cycle is consulted.
    supports_weekly: whether args.weekly_cycle and args.weekly_cycle_from_file
      are consulted.

  Returns:
    An (hourly_cycle, daily_cycle, weekly_cycle) tuple; each element is None
    when the corresponding cycle was not requested or is not supported.

  Raises:
    exceptions.InvalidArgumentException: if weekly_cycle_from_file is specified
      but empty.
  """
  _ValidateCycleFrequencyArgs(args)

  daily = None
  if args.daily_cycle:
    daily = messages.ResourcePolicyDailyCycle(
        daysInCycle=1, startTime=_FormatStartTime(args.start_time))

  weekly = None
  if supports_weekly:
    if args.weekly_cycle:
      requested_weekday = times.Weekday.Get(args.weekly_cycle.upper())
      api_day, api_start_time = _ParseWeeklyDayAndTime(
          args.start_time, requested_weekday)
      day_of_week_cls = messages.ResourcePolicyWeeklyCycleDayOfWeek
      single_day = day_of_week_cls(
          day=day_of_week_cls.DayValueValuesEnum(api_day),
          startTime=api_start_time)
      weekly = messages.ResourcePolicyWeeklyCycle(dayOfWeeks=[single_day])
    # A schedule loaded from file replaces one built from weekly_cycle.
    if args.IsSpecified('weekly_cycle_from_file'):
      if not args.weekly_cycle_from_file:
        raise exceptions.InvalidArgumentException(
            args.GetFlag('weekly_cycle_from_file'), 'File cannot be empty.')
      weekly = _ParseWeeklyCycleFromFile(args, messages)

  hourly = None
  if supports_hourly and args.hourly_cycle:
    hourly = messages.ResourcePolicyHourlyCycle(
        hoursInCycle=args.hourly_cycle,
        startTime=_FormatStartTime(args.start_time))

  return hourly, daily, weekly


def _ValidateCycleFrequencyArgs(args):
  """Rejects a daily_cycle flag that was specified with a false value.

  Args:
    args: parsed command-line namespace.

  Raises:
    exceptions.InvalidArgumentException: if daily_cycle is specified but falsy.
  """
  if not args.IsSpecified('daily_cycle'):
    return
  if args.daily_cycle:
    return
  raise exceptions.InvalidArgumentException(
      args.GetFlag('daily_cycle'), 'cannot request a non-daily cycle.')


def _ParseWeeklyCycleFromFile(args, messages):
  """Builds a WeeklyCycle message from args.weekly_cycle_from_file.

  The flag's contents are loaded with yaml.load and iterated; every entry must
  provide both `day` and `startTime`.

  Args:
    args: parsed command-line namespace.
    messages: message classes.

  Returns:
    A messages.ResourcePolicyWeeklyCycle with one dayOfWeeks item per entry.

  Raises:
    exceptions.InvalidArgumentException: if an entry lacks `day` or
      `startTime`, or if times.Weekday.Get rejects the entry's `day`.
  """
  entries = yaml.load(args.weekly_cycle_from_file)
  day_of_week_cls = messages.ResourcePolicyWeeklyCycleDayOfWeek
  to_day_enum = day_of_week_cls.DayValueValuesEnum
  schedule_days = []
  for entry in entries:
    if not ('day' in entry and 'startTime' in entry):
      raise exceptions.InvalidArgumentException(
          args.GetFlag('weekly_cycle_from_file'),
          'Each JSON/YAML object in the list must have the following keys: '
          '[day, startTime].')
    day_name = entry['day'].upper()
    try:
      weekday = times.Weekday.Get(day_name)
    except KeyError:
      raise exceptions.InvalidArgumentException(
          args.GetFlag('weekly_cycle_from_file'),
          'Invalid value for `day`: [{}].'.format(day_name))
    parsed_start = arg_parsers.Datetime.ParseUtcTime(entry['startTime'])
    api_day, api_start_time = _ParseWeeklyDayAndTime(parsed_start, weekday)
    schedule_days.append(
        day_of_week_cls(day=to_day_enum(api_day), startTime=api_start_time))
  return messages.ResourcePolicyWeeklyCycle(dayOfWeeks=schedule_days)


def ParseResourcePolicy(resources, name, project=None, region=None):
  """Parses name as a compute.resourcePolicies resource.

  Args:
    resources: object whose Parse method performs the parsing.
    name: name of the resource policy to parse.
    project: value passed as the 'project' parameter.
    region: value passed as the 'region' parameter.

  Returns:
    Whatever resources.Parse returns for the policy.
  """
  parse_params = {'project': project, 'region': region}
  return resources.Parse(
      name, parse_params, collection='compute.resourcePolicies')


def ParseResourcePolicyWithZone(resources, name, project, zone):
  """Parses a resource policy using the region derived from zone.

  Args:
    resources: object whose Parse method performs the parsing.
    name: name of the resource policy to parse.
    project: value passed as the 'project' parameter.
    zone: zone name; converted with utils.ZoneNameToRegionName.

  Returns:
    The result of ParseResourcePolicy for the derived region.
  """
  return ParseResourcePolicy(
      resources, name, project, utils.ZoneNameToRegionName(zone))


def ParseResourcePolicyWithScope(resources, name, project, location, scope):
  """Parses a resource policy whose location is given as a zone or a region.

  Args:
    resources: object whose Parse method performs the parsing.
    name: name of the resource policy to parse.
    project: value passed as the 'project' parameter.
    location: a zone name when scope is ZONE, a region name when scope is
      REGION.
    scope: compute_scopes.ScopeEnum value describing what location is.

  Returns:
    The result of ParseResourcePolicy for the resolved region.

  Raises:
    ValueError: if scope is neither ScopeEnum.ZONE nor ScopeEnum.REGION.
  """
  if scope == compute_scopes.ScopeEnum.ZONE:
    region = utils.ZoneNameToRegionName(location)
  elif scope == compute_scopes.ScopeEnum.REGION:
    region = location
  else:
    # Without this branch `region` would be unbound here and the return below
    # would fail with an opaque UnboundLocalError.
    raise ValueError(
        'Unsupported scope [{}] for resource policy [{}]: expected ZONE or '
        'REGION.'.format(scope, name))
  return ParseResourcePolicy(resources, name, project, region)