HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/compute/instance_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2014 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convenience functions for dealing with instances and instance templates."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import re

from googlecloudsdk.api_lib.compute import constants
from googlecloudsdk.api_lib.compute import containers_utils
from googlecloudsdk.api_lib.compute import csek_utils
from googlecloudsdk.api_lib.compute import metadata_utils
from googlecloudsdk.api_lib.compute import utils
from googlecloudsdk.api_lib.compute import zone_utils
from googlecloudsdk.calliope import exceptions as calliope_exceptions
from googlecloudsdk.command_lib.compute import flags as compute_flags
from googlecloudsdk.command_lib.compute import resource_manager_tags_utils
from googlecloudsdk.command_lib.compute import scope as compute_scopes
from googlecloudsdk.command_lib.compute.instances import flags
from googlecloudsdk.command_lib.compute.sole_tenancy import util as sole_tenancy_util
from googlecloudsdk.core import log
from googlecloudsdk.core import resources as cloud_resources
from googlecloudsdk.core.util import times
import six

# Matches a full service-account e-mail address (e.g. user@domain.com);
# used to validate the --service-account flag value.
EMAIL_REGEX = re.compile(r'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)')

# Warning emitted when a disk mounted via --container-mount-disk has no
# explicit device-name and therefore defaults to the disk name.
_DEFAULT_DEVICE_NAME_CONTAINER_WARNING = (
    'Default device-name for disk name [{0}] will be [{0}] because it is being '
    'mounted to a container with [`--container-mount-disk`]')


def GetCpuRamVmFamilyFromCustomName(name):
  """Extracts VM family, CPU count and memory from a custom machine type name.

  Args:
    name: the custom machine type name for the 'instance create' call

  Returns:
    A three-tuple (custom_family, custom_cpu, custom_memory_mib) of strings
    parsed out of the name, or (None, None, None) when the name does not
    look like a custom machine type.
  """
  match = re.search('([a-zA-Z0-9]+)-custom-([0-9]+)-([0-9]+)', name)
  if not match:
    return None, None, None
  return match.group(1), match.group(2), match.group(3)


def GetNameForCustom(custom_cpu, custom_memory_mib, ext=False, vm_type=False):
  """Builds a custom machine type name from the desired CPU and memory specs.

  Args:
    custom_cpu: the number of cpu desired for the custom machine type
    custom_memory_mib: the amount of ram desired in MiB for the custom machine
      type instance
    ext: extended custom machine type should be used if true
    vm_type: VM instance generation

  Returns:
    The custom machine type name for the 'instance create' call
  """
  # Assemble "<family>-custom-<cpu>-<mem>[-ext]", dropping the family prefix
  # when no vm_type was given.
  parts = []
  if vm_type:
    parts.append('{0}'.format(vm_type))
  parts.append('custom')
  parts.append('{0}'.format(custom_cpu))
  parts.append('{0}'.format(custom_memory_mib))
  if ext:
    parts.append('ext')
  return '-'.join(parts)


def InterpretMachineType(machine_type,
                         custom_cpu,
                         custom_memory,
                         ext=True,
                         vm_type=False,
                         confidential_vm_type=None):
  """Interprets the machine type for the instance.

  Args:
    machine_type: name of existing machine type, eg. n1-standard
    custom_cpu: number of CPU cores for custom machine type,
    custom_memory: amount of RAM memory in bytes for custom machine type,
    ext: extended custom machine type should be used if true,
    vm_type:  VM instance generation
    confidential_vm_type: If not None, use default machine type based on
        confidential-VM encryption type.

  Returns:
    A string representing the URL naming a machine-type.

  Raises:
    calliope_exceptions.RequiredArgumentException when only one of the two
      custom machine type flags are used.
    calliope_exceptions.InvalidArgumentException when both the machine type and
      custom machine type flags are used to generate a new instance.
  """
  # Base machine type: explicit flag wins, then the confidential-VM default,
  # then the global default.
  if machine_type:
    machine_type_name = machine_type
  elif confidential_vm_type is not None:
    machine_type_name = constants.DEFAULT_MACHINE_TYPE_FOR_CONFIDENTIAL_VMS[
        confidential_vm_type
    ]
  else:
    machine_type_name = constants.DEFAULT_MACHINE_TYPE

  # No custom-machine-type flags at all: keep the base machine type.
  if not (custom_cpu or custom_memory or ext):
    return machine_type_name

  # Validate the custom machine type flag combination.
  if not custom_cpu:
    raise calliope_exceptions.RequiredArgumentException(
        '--custom-cpu', 'Both [--custom-cpu] and [--custom-memory] must be '
        'set to create a custom machine type instance.')
  if not custom_memory:
    raise calliope_exceptions.RequiredArgumentException(
        '--custom-memory', 'Both [--custom-cpu] and [--custom-memory] must '
        'be set to create a custom machine type instance.')
  if machine_type:
    raise calliope_exceptions.InvalidArgumentException(
        '--machine-type', 'Cannot set both [--machine-type] and '
        '[--custom-cpu]/[--custom-memory] for the same instance.')

  # Custom machine type replaces the base name; memory is converted B -> MiB.
  return GetNameForCustom(custom_cpu, custom_memory // (2**20), ext, vm_type)


def CheckCustomCpuRamRatio(compute_client, project, zone, machine_type_name):
  """Checks that the CPU and memory ratio is a supported custom instance type.

  The check is delegated to the API: a machine-types Get request either
  succeeds or collects errors describing the violated bounds.

  Args:
    compute_client: GCE API client,
    project: a project,
    zone: the zone of the instance(s) being created,
    machine_type_name: The machine type of the instance being created.

  Returns:
    Nothing. Function acts as a bound checker, and will raise an exception from
      within the function if needed.

  Raises:
    utils.RaiseToolException if a custom machine type ratio is out of bounds.
  """
  messages = compute_client.messages
  compute = compute_client.apitools_client
  if 'custom' not in machine_type_name:
    return

  get_request = messages.ComputeMachineTypesGetRequest(
      machineType=machine_type_name, project=project, zone=zone)
  errors = []

  # Issue a 'machine-types describe' request; the response body is not
  # needed, only whether the request produced errors.
  list(
      compute_client.MakeRequests(
          requests=[(compute.machineTypes, 'Get', get_request)],
          errors_to_collect=errors))

  if errors:
    utils.RaiseToolException(
        errors, error_message='Could not fetch machine type:')


def CreateServiceAccountMessages(messages, scopes, service_account):
  """Returns a list of ServiceAccount messages corresponding to scopes.

  Args:
    messages: the API messages module.
    scopes: list of scope names/aliases, or None for the default scope set.
    service_account: service account e-mail, or None for 'default'.

  Raises:
    calliope_exceptions.InvalidArgumentException: on the removed
      ACCOUNT=SCOPE flag format, a malformed scope, or a malformed
      service-account e-mail.
  """
  if scopes is None:
    scopes = constants.DEFAULT_SCOPES
  if service_account is None:
    service_account = 'default'

  accounts_to_scopes = collections.defaultdict(list)
  for scope in scopes:
    num_parts = len(scope.split('='))
    if num_parts == 2:
      # TODO(b/33688878) Remove exception for this deprecated format
      raise calliope_exceptions.InvalidArgumentException(
          '--scopes',
          'Flag format --scopes [ACCOUNT=]SCOPE,[[ACCOUNT=]SCOPE, ...] is '
          'removed. Use --scopes [SCOPE,...] --service-account ACCOUNT '
          'instead.')
    if num_parts != 1:
      raise calliope_exceptions.InvalidArgumentException(
          '--scopes',
          '[{0}] is an illegal value for [--scopes]. Values must be of the '
          'form [SCOPE].'.format(scope))

    # Validated per-iteration (as before) so that a bad scope format is
    # reported ahead of a bad service-account e-mail.
    if service_account != 'default' and not EMAIL_REGEX.match(service_account):
      raise calliope_exceptions.InvalidArgumentException(
          '--service-account',
          'Invalid format: expected default or user@domain.com, received ' +
          service_account)

    # Expands scope aliases such as "compute-rw" into full URIs.
    accounts_to_scopes[service_account].extend(
        constants.SCOPES.get(scope, [scope]))

  if not scopes and service_account != 'default':
    return [messages.ServiceAccount(email=service_account, scopes=[])]
  return [
      messages.ServiceAccount(email=email, scopes=sorted(scope_list))
      for email, scope_list in sorted(accounts_to_scopes.items())
  ]


def CreateOnHostMaintenanceMessage(messages, maintenance_policy):
  """Create on-host-maintenance message for VM."""
  # No policy requested -> no message at all.
  if not maintenance_policy:
    return None
  return messages.Scheduling.OnHostMaintenanceValueValuesEnum(
      maintenance_policy)


def CreateSchedulingMessage(
    messages,
    maintenance_policy,
    preemptible,
    restart_on_failure,
    node_affinities=None,
    min_node_cpu=None,
    location_hint=None,
    maintenance_freeze_duration=None,
    maintenance_interval=None,
    provisioning_model=None,
    instance_termination_action=None,
    host_error_timeout_seconds=None,
    max_run_duration=None,
    termination_time=None,
    local_ssd_recovery_timeout=None,
    availability_domain=None,
    graceful_shutdown=None,
    discard_local_ssds_at_termination_timestamp=None,
    skip_guest_os_shutdown=None,
    preemption_notice_duration=None,
):
  """Create scheduling message for VM.

  Builds a messages.Scheduling populated only from the arguments the caller
  actually supplied; optional arguments left as None (or falsy, for the
  flag-like values) are omitted from the message entirely.

  Args:
    messages: the API messages module of the compute client.
    maintenance_policy: on-host-maintenance enum name, or None.
    preemptible: bool, create a preemptible VM.
    restart_on_failure: automatic-restart value for non-preemptible VMs.
    node_affinities: sole-tenancy node affinity messages, or None.
    min_node_cpu: minimum node CPUs; coerced to int before being set.
    location_hint: opaque location hint string.
    maintenance_freeze_duration: freeze duration in seconds (stored as hours).
    maintenance_interval: MaintenanceInterval enum name.
    provisioning_model: ProvisioningModel enum name (e.g. 'SPOT').
    instance_termination_action: InstanceTerminationAction enum name.
    host_error_timeout_seconds: host error timeout, in seconds.
    max_run_duration: maximum run duration, in seconds.
    termination_time: value formatted via times.FormatDateTime.
    local_ssd_recovery_timeout: local SSD recovery timeout, in seconds.
    availability_domain: availability domain value.
    graceful_shutdown: dict with optional 'enabled' and 'maxDuration' keys.
    discard_local_ssds_at_termination_timestamp: bool for the on-stop action.
    skip_guest_os_shutdown: bool, skip the guest OS shutdown.
    preemption_notice_duration: preemption notice duration, in seconds.

  Returns:
    A messages.Scheduling message.
  """
  # Note: We always specify automaticRestart=False for preemptible VMs. This
  # makes sense, since no-restart-on-failure is defined as "store-true", and
  # thus can't be given an explicit value. Hence it either has its default
  # value (in which case we override it for convenience's sake to the only
  # setting that makes sense for preemptible VMs), or the user actually
  # specified no-restart-on-failure, the only usable setting.
  on_host_maintenance = CreateOnHostMaintenanceMessage(messages,
                                                       maintenance_policy)
  # SPOT provisioning implies preemptible, so both paths force preemptible=True.
  if preemptible or provisioning_model == 'SPOT':
    scheduling = messages.Scheduling(
        automaticRestart=False,
        onHostMaintenance=on_host_maintenance,
        preemptible=True)
  else:
    scheduling = messages.Scheduling(
        automaticRestart=restart_on_failure,
        onHostMaintenance=on_host_maintenance)

  if provisioning_model:
    scheduling.provisioningModel = (
        messages.Scheduling.ProvisioningModelValueValuesEnum(provisioning_model)
    )

  if instance_termination_action:
    scheduling.instanceTerminationAction = (
        messages.Scheduling.InstanceTerminationActionValueValuesEnum(
            instance_termination_action
        )
    )

  if max_run_duration is not None:
    scheduling.maxRunDuration = messages.Duration(seconds=max_run_duration)

  if local_ssd_recovery_timeout is not None:
    scheduling.localSsdRecoveryTimeout = messages.Duration(
        seconds=local_ssd_recovery_timeout
    )

  # graceful_shutdown is a dict; each key is optional and set independently.
  if graceful_shutdown is not None:
    scheduling.gracefulShutdown = messages.SchedulingGracefulShutdown()
    if 'enabled' in graceful_shutdown:
      scheduling.gracefulShutdown.enabled = graceful_shutdown['enabled']
    if 'maxDuration' in graceful_shutdown:
      scheduling.gracefulShutdown.maxDuration = messages.Duration(
          seconds=graceful_shutdown['maxDuration']
      )

  if termination_time:
    scheduling.terminationTime = times.FormatDateTime(termination_time)

  if node_affinities:
    scheduling.nodeAffinities = node_affinities

  if min_node_cpu is not None:
    scheduling.minNodeCpus = int(min_node_cpu)

  if location_hint:
    scheduling.locationHint = location_hint

  # Flag value is in seconds; the API field is in hours.
  if maintenance_freeze_duration:
    scheduling.maintenanceFreezeDurationHours = (
        maintenance_freeze_duration // 3600)

  if maintenance_interval:
    scheduling.maintenanceInterval = (
        messages.Scheduling.MaintenanceIntervalValueValuesEnum(
            maintenance_interval))

  if host_error_timeout_seconds:
    scheduling.hostErrorTimeoutSeconds = host_error_timeout_seconds

  if availability_domain:
    scheduling.availabilityDomain = availability_domain

  if discard_local_ssds_at_termination_timestamp is not None:
    scheduling.onInstanceStopAction = messages.SchedulingOnInstanceStopAction(
        discardLocalSsd=discard_local_ssds_at_termination_timestamp
    )

  if skip_guest_os_shutdown is not None:
    scheduling.skipGuestOsShutdown = skip_guest_os_shutdown

  if preemption_notice_duration is not None:
    scheduling.preemptionNoticeDuration = messages.Duration(
        seconds=preemption_notice_duration
    )

  return scheduling


def CreateShieldedInstanceConfigMessage(messages, enable_secure_boot,
                                        enable_vtpm,
                                        enable_integrity_monitoring):
  """Create shieldedInstanceConfig message for VM."""
  return messages.ShieldedInstanceConfig(
      enableSecureBoot=enable_secure_boot,
      enableVtpm=enable_vtpm,
      enableIntegrityMonitoring=enable_integrity_monitoring)


def CreateShieldedInstanceIntegrityPolicyMessage(messages,
                                                 update_auto_learn_policy=True):
  """Creates shieldedInstanceIntegrityPolicy message for VM."""
  return messages.ShieldedInstanceIntegrityPolicy(
      updateAutoLearnPolicy=update_auto_learn_policy)


def ValidateSvsmConfig(svsm_args, support_snp_svsm):
  """Validates flags specifying SVSM config.

  Args:
    svsm_args: The flags specifying SVSM config.
    support_snp_svsm: Whether SVSM is supported.

  Returns:
    Nothing. Function acts as a validator, and will raise an exception from
      within the function if needed.

  Raises:
    calliope_exceptions.InvalidArgumentException when a supplied value is not
      one of the legal choices.
  """
  # Nothing to validate when no config was given or SVSM is unsupported.
  if not (svsm_args and support_snp_svsm):
    return

  tpm_choices = ['EPHEMERAL', 'NO_CC_TPM']
  snp_irq_choices = ['RESTRICTED', 'UNRESTRICTED']

  tpm = svsm_args.get('tpm', None)
  if tpm and tpm not in tpm_choices:
    raise calliope_exceptions.InvalidArgumentException(
        '--svsm-config',
        f'Unexpected confidential TPM type: [{tpm}]. Legal values are '
        f'[{", ".join(tpm_choices)}].',
    )
  snp_irq = svsm_args.get('snp-irq', None)
  if snp_irq and snp_irq not in snp_irq_choices:
    raise calliope_exceptions.InvalidArgumentException(
        '--svsm-config',
        f'Unexpected SEV SNP IRQ mode: [{snp_irq}]. Legal values are '
        f'[{", ".join(snp_irq_choices)}].',
    )


def CreateConfidentialParavisorConfigMessage(args, messages, support_snp_svsm):
  """Create confidentialParavisorConfig message for VM."""
  svsm_specified = (
      hasattr(args, 'svsm_config') and args.IsSpecified('svsm_config'))
  if not svsm_specified or not support_snp_svsm:
    return None

  ValidateSvsmConfig(args.svsm_config, support_snp_svsm)
  # Fall back to the UNSPECIFIED enum values when a key is absent (or the
  # whole dict is falsy).
  svsm = args.svsm_config or {}
  tpm = svsm.get('tpm', 'CONFIDENTIAL_TPM_TYPE_UNSPECIFIED')
  snp_irq = svsm.get('snp-irq', 'SEV_SNP_IRQ_MODE_UNSPECIFIED')
  return messages.ConfidentialParavisorConfig(
      confidentialTpmType=(
          messages.ConfidentialParavisorConfig
          .ConfidentialTpmTypeValueValuesEnum(tpm)),
      sevSnpIrqMode=(
          messages.ConfidentialParavisorConfig
          .SevSnpIrqModeValueValuesEnum(snp_irq)),
  )


def CreateConfidentialInstanceMessage(messages, args,
                                      support_confidential_compute_type,
                                      support_confidential_compute_type_tdx,
                                      support_snp_svsm):
  """Create confidentialInstanceConfig message for VM.

  Combines the --confidential-compute, --confidential-compute-type and
  --svsm-config flags into a single ConfidentialInstanceConfig, or returns
  None when none of them was specified.

  Raises:
    calliope_exceptions.InvalidArgumentException: if --svsm-config is given
      without --confidential-compute-type.
  """
  confidential_instance_config_msg = None
  enable_confidential_compute = None
  confidential_instance_type = None

  # hasattr guards are needed because not every release track registers
  # these flags.
  if (hasattr(args, 'confidential_compute') and
      args.IsSpecified('confidential_compute') and
      isinstance(args.confidential_compute, bool)):
    enable_confidential_compute = args.confidential_compute

  if (support_confidential_compute_type and
      hasattr(args, 'confidential_compute_type') and
      args.IsSpecified('confidential_compute_type') and
      isinstance(args.confidential_compute_type, six.string_types)):
    confidential_instance_type = (
        messages.ConfidentialInstanceConfig
        .ConfidentialInstanceTypeValueValuesEnum(
            args.confidential_compute_type))

    # NOTE(review): this condition tests whether the API *enum* contains a
    # TDX member, not whether the user requested TDX; when TDX exists in the
    # enum but support_confidential_compute_type_tdx is off, *any* selected
    # confidential type is discarded. Confirm the intent is not
    # `args.confidential_compute_type == 'TDX'`.
    if (not support_confidential_compute_type_tdx and
        'TDX' in (
            messages.ConfidentialInstanceConfig
            .ConfidentialInstanceTypeValueValuesEnum)):
      enable_confidential_compute = None
      confidential_instance_type = None

  confidential_paravisor_config_msg = CreateConfidentialParavisorConfigMessage(
      args, messages, support_snp_svsm
  )
  # Precedence: type (+paravisor) > type alone > paravisor alone (an error)
  # > plain enable flag.
  if (
      confidential_instance_type is not None
      and confidential_paravisor_config_msg is not None
  ):
    confidential_instance_config_msg = messages.ConfidentialInstanceConfig(
        confidentialInstanceType=confidential_instance_type,
        confidentialParavisorConfig=confidential_paravisor_config_msg,
    )
  elif confidential_instance_type is not None:
    confidential_instance_config_msg = messages.ConfidentialInstanceConfig(
        confidentialInstanceType=confidential_instance_type
    )
  elif confidential_paravisor_config_msg is not None:
    raise calliope_exceptions.InvalidArgumentException(
        '--confidential-compute-type',
        'Confidential compute type must be specified when using '
        '--svsm-config.',
    )
  elif enable_confidential_compute is not None:
    confidential_instance_config_msg = messages.ConfidentialInstanceConfig(
        enableConfidentialCompute=enable_confidential_compute
    )

  return confidential_instance_config_msg


def CreateAdvancedMachineFeaturesMessage(
    messages,
    enable_nested_virtualization=None,
    threads_per_core=None,
    numa_node_count=None,
    visible_core_count=None,
    enable_uefi_networking=None,
    performance_monitoring_unit=None,
    enable_watchdog_timer=None,
    turbo_mode=None,
):
  """Create AdvancedMachineFeatures message for an Instance."""
  # Start from an empty message and only set the fields the caller supplied,
  # so None also means "not supported in this version of the API (yet)".
  features = messages.AdvancedMachineFeatures()

  # Fields copied through verbatim when provided.
  pass_through = (
      ('enableNestedVirtualization', enable_nested_virtualization),
      ('threadsPerCore', threads_per_core),
      ('numaNodeCount', numa_node_count),
      ('visibleCoreCount', visible_core_count),
      ('enableUefiNetworking', enable_uefi_networking),
      ('enableWatchdogTimer', enable_watchdog_timer),
      ('turboMode', turbo_mode),
  )
  for field_name, value in pass_through:
    if value is not None:
      setattr(features, field_name, value)

  # PMU needs an enum conversion (API enum names are upper-case).
  if performance_monitoring_unit is not None:
    features.performanceMonitoringUnit = (
        messages.AdvancedMachineFeatures
        .PerformanceMonitoringUnitValueValuesEnum(
            performance_monitoring_unit.upper()))

  return features


def ParseDiskResource(resources, name, project, zone, type_):
  """Parses disk resources.

  Project and zone are ignored if a fully-qualified resource name is given, i.e.
    - https://compute.googleapis.com/compute/v1/projects/my-project
          /zones/us-central1-a/disks/disk-1
    - projects/my-project/zones/us-central1-a/disks/disk-1

  If project and zone cannot be parsed, we will use the given project and zone
  as fallbacks.

  Args:
    resources: resources.Registry, The resource registry
    name: str, name of the disk.
    project: str, project of the disk.
    zone: str, zone of the disk.
    type_: ScopeEnum, type of the disk.

  Returns:
    A disk resource.
  """
  if type_ == compute_scopes.ScopeEnum.REGION:
    collection = 'compute.regionDisks'
    # Regional disks are addressed by region; derive it from the zone.
    params = {'project': project, 'region': utils.ZoneNameToRegionName(zone)}
  else:
    collection = 'compute.disks'
    params = {'project': project, 'zone': zone}
  return resources.Parse(name, collection=collection, params=params)


def ParseDiskResourceFromAttachedDisk(resources, attached_disk):
  """Parses the source disk resource of an AttachedDisk.

  The source of an AttachedDisk is either a partial or fully specified URL
  referencing either a regional or zonal disk.

  Args:
    resources: resources.Registry, The resource registry
    attached_disk: AttachedDisk

  Returns:
    A disk resource.

  Raises:
    InvalidResourceException: If the attached disk source cannot be parsed as a
        regional or zonal disk.
  """
  recoverable_errors = (
      cloud_resources.WrongResourceCollectionException,
      cloud_resources.RequiredFieldOmittedException,
  )
  # Try the regional collection first, then the zonal one; the source URL
  # determines which (if either) parses successfully.
  for collection in ('compute.regionDisks', 'compute.disks'):
    try:
      disk_ref = resources.Parse(attached_disk.source, collection=collection)
      if disk_ref:
        return disk_ref
    except recoverable_errors:
      pass

  raise cloud_resources.InvalidResourceException(
      "Unable to parse disk's source: [{0}] of device name: [{1}], try using"
      ' `--device-name` instead.'.format(
          attached_disk.source,
          attached_disk.deviceName,
      )
  )


def GetDiskDeviceName(disk, name, container_mount_disk):
  """Helper method to get device-name for a disk message.

  Args:
    disk: dict, a disk spec (e.g. from --disk / --create-disk).
    name: str, the disk name.
    container_mount_disk: list of dicts from --container-mount-disk, or None.

  Returns:
    The device-name for the disk: the disk name itself when the disk is
    mounted to a container without an explicit device-name, otherwise the
    spec's 'device-name' value (possibly None).

  Raises:
    calliope_exceptions.InvalidArgumentException: if the disk is mounted to a
      container but has a device-name different from its name.
  """
  # BUG FIX: the original used `filter(bool, [...])`, which in Python 3
  # returns a lazy filter object that is *always* truthy, so the "is this
  # disk mounted to a container?" test degenerated to just
  # `container_mount_disk`. Use any() to actually check for a match.
  if container_mount_disk and any(
      d.get('name', name) == name for d in container_mount_disk
  ):
    # device-name must be the same as name if it is being mounted to a
    # container.
    if not disk.get('device-name'):
      log.warning(_DEFAULT_DEVICE_NAME_CONTAINER_WARNING.format(name))
      return name
    # This is defensive only; should be validated before this method is called.
    elif disk.get('device-name') != name:
      raise calliope_exceptions.InvalidArgumentException(
          '--container-mount-disk',
          'Attempting to mount disk named [{}] with device-name [{}]. If '
          'being mounted to container, disk name must match device-name.'
          .format(name, disk.get('device-name')),
      )
  return disk.get('device-name')


def ParseDiskType(
    resources, disk_type, project, location, scope, replica_zone_cnt=0
):
  """Parses disk type reference based on location scope."""
  if scope == compute_scopes.ScopeEnum.REGION:
    collection = 'compute.regionDiskTypes'
    params = {'project': project, 'region': location}
  elif scope == compute_scopes.ScopeEnum.ZONE and replica_zone_cnt == 2:
    # Two replica zones imply a regional disk even though a zone was given;
    # derive the region from the zone name.
    collection = 'compute.regionDiskTypes'
    params = {'project': project, 'region': GetRegionFromZone(location)}
  elif scope == compute_scopes.ScopeEnum.ZONE:
    collection = 'compute.diskTypes'
    params = {'project': project, 'zone': location}
  return resources.Parse(disk_type, collection=collection, params=params)


def GetRegionFromZone(zone):
  """Returns the GCP region that the input zone is in."""
  # A zone name is "<region>-<zone-suffix>"; strip the last dash component.
  region, _, _ = zone.lower().rpartition('-')
  return region


def ParseStoragePool(resources, storage_pool, project, location):
  """Parses a zonal storage pool reference."""
  return resources.Parse(
      storage_pool,
      collection='compute.storagePools',
      params={'project': project, 'zone': location},
  )


def UseExistingBootDisk(disks):
  """Returns True if the user has specified an existing boot disk."""
  for disk_spec in disks:
    if disk_spec.get('boot', False):
      return True
  return False


def IsAnySpecified(args, *dests):
  """Returns True if any of the given flag dests was specified on the CLI."""
  # Generator form instead of any([...]): avoids materializing a throwaway
  # list and lets any() short-circuit on the first specified flag.
  return any(args.IsSpecified(dest) for dest in dests)


def GetSourceInstanceTemplate(args, resources, source_instance_template_arg):
  """Returns the self-link of --source-instance-template, or None if unset."""
  if not args.IsSpecified('source_instance_template'):
    return None
  template_ref = source_instance_template_arg.ResolveAsResource(
      args, resources, default_scope=flags.compute_scope.ScopeEnum.GLOBAL)
  return template_ref.SelfLink()


def GetSkipDefaults(source_instance_template):
  """Returns True when gcloud should not fill in default field values.

  gcloud normally supplies defaults for some Instance fields that were not
  given on the command line. When --source-instance-template is used, those
  defaults come from the template and flags merely override them, so default
  initialization is skipped.
  """
  return source_instance_template is not None


def GetScheduling(
    args,
    client,
    skip_defaults,
    support_node_affinity=False,
    support_min_node_cpu=True,
    support_node_project=False,
    support_host_error_timeout_seconds=True,
    support_max_run_duration=False,
    support_local_ssd_recovery_timeout=False,
    support_graceful_shutdown=False,
    support_skip_guest_os_shutdown=False,
    support_preemption_notice_duration=False,
):
  """Generate a Scheduling Message or None based on specified args.

  Harvests scheduling-related flag values off `args` (each guarded by
  hasattr / IsSpecified, since not every release track registers every flag,
  and gated by the corresponding support_* switch), then either returns None
  — when skip_defaults is set and nothing scheduling-related was specified —
  or delegates to CreateSchedulingMessage.
  """
  node_affinities = None
  if support_node_affinity:
    node_affinities = sole_tenancy_util.GetSchedulingNodeAffinityListFromArgs(
        args, client.messages, support_node_project)
  min_node_cpu = None
  if support_min_node_cpu:
    min_node_cpu = args.min_node_cpu
  location_hint = None
  if hasattr(args, 'location_hint'):
    location_hint = args.location_hint
  freeze_duration = None
  if hasattr(args, 'maintenance_freeze_duration') and args.IsSpecified(
      'maintenance_freeze_duration'):
    freeze_duration = args.maintenance_freeze_duration
  maintenance_interval = None
  if hasattr(
      args,
      'maintenance_interval') and args.IsSpecified('maintenance_interval'):
    maintenance_interval = args.maintenance_interval
  provisioning_model = None
  if (hasattr(args, 'provisioning_model') and
      args.IsSpecified('provisioning_model')):
    provisioning_model = args.provisioning_model

  instance_termination_action = None
  if (hasattr(args, 'instance_termination_action') and
      args.IsSpecified('instance_termination_action')):
    instance_termination_action = args.instance_termination_action

  host_error_timeout_seconds = None
  if support_host_error_timeout_seconds and hasattr(
      args, 'host_error_timeout_seconds'):
    host_error_timeout_seconds = args.host_error_timeout_seconds

  max_run_duration = None
  if support_max_run_duration and hasattr(args, 'max_run_duration'):
    max_run_duration = args.max_run_duration

  local_ssd_recovery_timeout = None
  if support_local_ssd_recovery_timeout and hasattr(
      args, 'local_ssd_recovery_timeout') and args.IsSpecified(
          'local_ssd_recovery_timeout'):
    local_ssd_recovery_timeout = args.local_ssd_recovery_timeout

  graceful_shutdown = ExtractGracefulShutdownFromArgs(
      args, support_graceful_shutdown
  )

  termination_time = None
  # termination_time and the local-SSD discard option are gated on the same
  # support switch as max_run_duration.
  if support_max_run_duration and hasattr(args, 'termination_time'):
    termination_time = args.termination_time

  discard_local_ssds_at_termination_timestamp = None
  if support_max_run_duration and hasattr(
      args, 'discard_local_ssds_at_termination_timestamp'
  ):
    discard_local_ssds_at_termination_timestamp = (
        args.discard_local_ssds_at_termination_timestamp
    )

  # Make sure restart_on_failure always retain user-provided value,
  # but also ignore its default value when asked to skip defaults.
  restart_on_failure = None
  if not skip_defaults or args.IsKnownAndSpecified('restart_on_failure'):
    restart_on_failure = args.restart_on_failure

  availability_domain = None
  if args.IsKnownAndSpecified('availability_domain') and hasattr(
      args, 'availability_domain'
  ):
    availability_domain = args.availability_domain

  skip_guest_os_shutdown = None
  if support_skip_guest_os_shutdown and args.IsKnownAndSpecified(
      'skip_guest_os_shutdown'
  ):
    skip_guest_os_shutdown = args.skip_guest_os_shutdown

  preemption_notice_duration = None
  if support_preemption_notice_duration and hasattr(
      args, 'preemption_notice_duration'
  ):
    preemption_notice_duration = args.preemption_notice_duration

  # When skipping defaults, emit no Scheduling message at all unless some
  # scheduling-related flag was actually provided.
  if (
      skip_defaults
      and not IsAnySpecified(
          args,
          'instance_termination_action',
          'maintenance_policy',
          'preemptible',
          'provisioning_model',
      )
      and not restart_on_failure
      and not node_affinities
      and not max_run_duration
      and not termination_time
      and not freeze_duration
      and not host_error_timeout_seconds
      and not maintenance_interval
      and not local_ssd_recovery_timeout
      and not graceful_shutdown
      and not preemption_notice_duration
  ):
    return None

  return CreateSchedulingMessage(
      messages=client.messages,
      maintenance_policy=args.maintenance_policy,
      preemptible=args.preemptible,
      restart_on_failure=restart_on_failure,
      node_affinities=node_affinities,
      min_node_cpu=min_node_cpu,
      location_hint=location_hint,
      maintenance_freeze_duration=freeze_duration,
      maintenance_interval=maintenance_interval,
      provisioning_model=provisioning_model,
      instance_termination_action=instance_termination_action,
      host_error_timeout_seconds=host_error_timeout_seconds,
      max_run_duration=max_run_duration,
      termination_time=termination_time,
      local_ssd_recovery_timeout=local_ssd_recovery_timeout,
      availability_domain=availability_domain,
      graceful_shutdown=graceful_shutdown,
      discard_local_ssds_at_termination_timestamp=discard_local_ssds_at_termination_timestamp,
      skip_guest_os_shutdown=skip_guest_os_shutdown,
      preemption_notice_duration=preemption_notice_duration,
  )


def GetServiceAccounts(args, client, skip_defaults):
  """Builds ServiceAccount messages from flags, or [] when defaults skipped."""
  service_account = None if args.no_service_account else args.service_account
  if skip_defaults and not IsAnySpecified(
      args, 'scopes', 'no_scopes', 'service_account', 'no_service_account'):
    return []
  return CreateServiceAccountMessages(
      messages=client.messages,
      scopes=[] if args.no_scopes else args.scopes,
      service_account=service_account)


def GetValidatedMetadata(args, client):
  """Builds the metadata message and validates it for container use.

  Args:
    args: Parsed argument namespace with metadata and metadata_from_file.
    client: Compute client providing the generated proto messages.

  Returns:
    The validated metadata message.
  """
  metadata_message = metadata_utils.ConstructMetadataMessage(
      client.messages,
      metadata=args.metadata,
      metadata_from_file=args.metadata_from_file)
  # Reject metadata keys that would collide with container declarations.
  containers_utils.ValidateUserMetadata(metadata_message)
  return metadata_message


def GetMetadata(args, client, skip_defaults):
  """Returns the metadata message, or None when defaults are skipped.

  Args:
    args: Parsed argument namespace with metadata and metadata_from_file.
    client: Compute client providing the generated proto messages.
    skip_defaults: If True and neither metadata flag was specified,
      returns None so the field is omitted from the request.

  Returns:
    A metadata message, or None.
  """
  if skip_defaults and not IsAnySpecified(args, 'metadata',
                                          'metadata_from_file'):
    return None
  return metadata_utils.ConstructMetadataMessage(
      client.messages,
      metadata=args.metadata,
      metadata_from_file=args.metadata_from_file)


def GetBootDiskSizeGb(args):
  """Converts --boot-disk-size to GB, warning if it looks too small."""
  size_gb = utils.BytesToGb(args.boot_disk_size)
  # Emits a user-facing warning only; does not block creation.
  utils.WarnIfDiskSizeIsTooSmall(size_gb, args.boot_disk_type)
  return size_gb


def GetInstanceRefs(args, client, holder):
  """Resolves the instance references named on the command line.

  Also warns when a target zone is deprecated or has upcoming maintenance.
  """
  refs = flags.INSTANCES_ARG.ResolveAsResource(
      args,
      holder.resources,
      scope_lister=compute_flags.GetDefaultScopeLister(client))
  # Check if the zone is deprecated or has maintenance coming.
  zone_utils.ZoneResourceFetcher(client).WarnForZonalCreation(refs)
  return refs


def GetSourceMachineImageKey(args, source_image, compute_client, holder):
  """Looks up the CSEK key for the source machine image.

  Args:
    args: Parsed argument namespace with source_machine_image_csek_key_file.
    source_image: Resource argument for the source machine image.
    compute_client: Compute client wrapping the apitools client.
    holder: API holder providing the resource registry.

  Returns:
    The customer-supplied encryption key message, or None when no key
    matches the machine image.
  """
  image_ref = source_image.ResolveAsResource(args, holder.resources)
  # RSA-wrapped keys are not supported for machine images.
  key_store = csek_utils.CsekKeyStore.FromFile(
      args.source_machine_image_csek_key_file, allow_rsa_encrypted=False)
  return csek_utils.MaybeLookupKeyMessage(
      key_store, image_ref, compute_client.apitools_client)


def CheckSpecifiedMachineTypeArgs(args, skip_defaults):
  """Returns True when the machine type flags should be processed."""
  if not skip_defaults:
    return True
  return IsAnySpecified(args, 'machine_type', 'custom_cpu', 'custom_memory')


def CreateMachineTypeUri(args,
                         compute_client,
                         resource_parser,
                         project,
                         location,
                         scope,
                         confidential_vm_type=None):
  """Creates a machine type URI for given args and instance reference.

  Args:
    args: Parsed argument namespace carrying the machine type flags.
    compute_client: Compute client used to validate custom machine types.
    resource_parser: Resource parser used to build the final URI.
    project: Project the machine type belongs to.
    location: Zone or region name, depending on scope.
    scope: Location scope (zone or region).
    confidential_vm_type: Optional confidential VM type influencing the
      machine type name.

  Returns:
    The machine type self-link URI.
  """
  name = CreateMachineTypeName(args, confidential_vm_type)
  # Validate the custom CPU/RAM ratio before building the URI.
  CheckCustomCpuRamRatio(compute_client, project, location, name)
  return ParseMachineType(resource_parser, name, project, location, scope)


def CreateMachineTypeName(args, confidential_vm_type=None):
  """Creates a machine type name for given args and instance reference.

  Derives the name either from --machine-type directly or from the
  custom CPU/memory flags (plus optional extensions and VM type).
  """
  return InterpretMachineType(
      machine_type=args.machine_type,
      custom_cpu=args.custom_cpu,
      custom_memory=args.custom_memory,
      # These two flags are not present on every command surface.
      ext=getattr(args, 'custom_extensions', None),
      vm_type=getattr(args, 'custom_vm_type', None),
      confidential_vm_type=confidential_vm_type)


def ParseMachineType(resource_parser, machine_type_name, project, location,
                     scope):
  """Returns the location-specific machine type uri.

  Args:
    resource_parser: Resource parser used to build the self-link.
    machine_type_name: Machine type name, e.g. 'n1-standard-1'.
    project: Project the machine type belongs to.
    location: Zone or region name, depending on scope.
    scope: compute_scopes.ScopeEnum value (ZONE or REGION).

  Returns:
    The machine type self-link URI.

  Raises:
    ValueError: If scope is neither ZONE nor REGION.
  """
  if scope == compute_scopes.ScopeEnum.ZONE:
    collection = 'compute.machineTypes'
    params = {'project': project, 'zone': location}
  elif scope == compute_scopes.ScopeEnum.REGION:
    collection = 'compute.regionMachineTypes'
    params = {'project': project, 'region': location}
  else:
    # Previously an unsupported scope fell through and surfaced as an
    # opaque UnboundLocalError on the Parse call below; fail fast instead.
    raise ValueError('Unsupported scope for machine type: {}'.format(scope))
  machine_type_uri = resource_parser.Parse(
      machine_type_name, collection=collection, params=params).SelfLink()
  return machine_type_uri


def GetConfidentialVmType(args, support_confidential_compute_type):
  """Returns the CONFIDENTIAL_VM_TYPES of the machine.

  --confidential-compute implies SEV; an explicit
  --confidential-compute-type (when supported) overrides it.
  """
  vm_type = None

  if args.IsSpecified('confidential_compute') and args.confidential_compute:
    vm_type = constants.CONFIDENTIAL_VM_TYPES.SEV

  if (support_confidential_compute_type
      and args.IsSpecified('confidential_compute_type')
      and args.confidential_compute_type is not None):
    # Flag values are lower-case; enum members are upper-case.
    vm_type = getattr(constants.CONFIDENTIAL_VM_TYPES,
                      args.confidential_compute_type.upper())

  return vm_type


def GetCanIpForward(args, skip_defaults):
  """Returns --can-ip-forward, or None when defaults are skipped and unset."""
  use_default = skip_defaults and not args.IsSpecified('can_ip_forward')
  return None if use_default else args.can_ip_forward


def GetTags(args, client):
  """Returns a Tags message built from --tags, or None when unset."""
  tags = args.tags
  if not tags:
    return None
  return client.messages.Tags(items=tags)


def GetLabels(args, client, instance_properties=False):
  """Gets labels for the instance message.

  Args:
    args: Namespace with an optional `labels` dict of key/value pairs.
    client: Compute client providing the generated proto messages.
    instance_properties: If True, build InstanceProperties.LabelsValue
      (used for instance templates) instead of Instance.LabelsValue.

  Returns:
    A LabelsValue message with entries sorted by key, or None when no
    labels were supplied.
  """
  labels_value = client.messages.Instance.LabelsValue
  if instance_properties:
    labels_value = client.messages.InstanceProperties.LabelsValue
  if args.labels:
    return labels_value(additionalProperties=[
        labels_value.AdditionalProperty(key=key, value=value)
        # dict.items() replaces the Python 2 era six.iteritems shim.
        for key, value in sorted(args.labels.items())
    ])
  return None


def ParseAcceleratorType(accelerator_type_name, resource_parser, project,
                         location, scope):
  """Returns accelerator type ref based on location scope.

  Args:
    accelerator_type_name: Accelerator type name, e.g. 'nvidia-tesla-t4'.
    resource_parser: Resource parser used to build the self-link.
    project: Project containing the accelerator type.
    location: Zone or region name, depending on scope.
    scope: compute_scopes.ScopeEnum value (ZONE or REGION).

  Returns:
    The accelerator type self-link URI.

  Raises:
    ValueError: If scope is neither ZONE nor REGION.
  """
  if scope == compute_scopes.ScopeEnum.ZONE:
    collection = 'compute.acceleratorTypes'
    params = {'project': project, 'zone': location}
  elif scope == compute_scopes.ScopeEnum.REGION:
    collection = 'compute.regionAcceleratorTypes'
    params = {'project': project, 'region': location}
  else:
    # Previously an unsupported scope fell through and surfaced as an
    # opaque UnboundLocalError on the Parse call below; fail fast instead.
    raise ValueError(
        'Unsupported scope for accelerator type: {}'.format(scope))
  accelerator_type = resource_parser.Parse(
      accelerator_type_name, collection=collection, params=params).SelfLink()
  return accelerator_type


def ResolveSnapshotURI(user_project, snapshot, resource_parser, region=None):
  """Returns snapshot URI based on location scope.

  Resolves against the regional snapshot collection when a region is
  given, otherwise against the global one. Returns None if any of the
  required inputs is missing.
  """
  if not (user_project and snapshot and resource_parser):
    return None
  if region:
    collection = 'compute.regionSnapshots'
    params = {'project': user_project, 'region': region}
  else:
    collection = 'compute.snapshots'
    params = {'project': user_project}
  ref = resource_parser.Parse(snapshot, collection=collection, params=params)
  return ref.SelfLink()


def ResolveInstantSnapshotURI(user_project, instant_snapshot, resource_parser):
  """Returns the instant snapshot URI, or None when any input is missing."""
  if not (user_project and instant_snapshot and resource_parser):
    return None
  ref = resource_parser.Parse(
      instant_snapshot,
      collection='compute.instantSnapshots',
      params={'project': user_project},
  )
  return ref.SelfLink()


def GetReservationAffinity(args, client, support_specific_then_x_affinity):
  """Returns the message of reservation affinity for the instance."""
  if not args.IsSpecified('reservation_affinity'):
    return None

  type_enum = (
      client.messages.ReservationAffinity
      .ConsumeReservationTypeValueValuesEnum)

  affinity_key = None
  affinity_values = []

  # Map of flag values that target specific reservations.
  specific_types = {'specific': type_enum.SPECIFIC_RESERVATION}
  if support_specific_then_x_affinity:
    specific_types['specific-then-any-reservation'] = (
        type_enum.SPECIFIC_THEN_ANY_RESERVATION)
    specific_types['specific-then-no-reservation'] = (
        type_enum.SPECIFIC_THEN_NO_RESERVATION)

  if args.reservation_affinity == 'none':
    consume_type = type_enum.NO_RESERVATION
  elif args.reservation_affinity in specific_types:
    consume_type = specific_types[args.reservation_affinity]
    # Currently, the key is fixed and the value is the name of the
    # reservation. The value being a repeated field is reserved for future
    # use when user can specify more than one reservation names from which
    # the VM can take capacity from.
    affinity_key = _RESERVATION_AFFINITY_KEY
    affinity_values = [args.reservation]
  else:
    consume_type = type_enum.ANY_RESERVATION

  return client.messages.ReservationAffinity(
      consumeReservationType=consume_type,
      key=affinity_key or None,
      values=affinity_values)


def GetNetworkPerformanceConfig(args, client):
  """Get NetworkPerformanceConfig message for the instance."""
  configs = getattr(args, 'network_performance_configs', [])
  message = client.messages.NetworkPerformanceConfig()

  for config in configs:
    # Flag values are lower-case; the enum expects upper-case names.
    tier = config.get('total-egress-bandwidth-tier', '').upper()
    if tier:
      enum_cls = (client.messages.NetworkPerformanceConfig
                  .TotalEgressBandwidthTierValueValuesEnum)
      message.totalEgressBandwidthTier = enum_cls(tier)

  return message


def ExtractGracefulShutdownFromArgs(args, support_graceful_shutdown=False):
  """Extracts graceful shutdown from args.

  Returns a dict with optional 'enabled' and 'maxDuration' keys, or None
  when the feature is unsupported or neither flag was given.
  """
  if not support_graceful_shutdown:
    return None

  result = None
  if hasattr(args, 'graceful_shutdown') and args.IsSpecified(
      'graceful_shutdown'):
    result = {'enabled': args.graceful_shutdown}

  if hasattr(args, 'graceful_shutdown_max_duration') and args.IsSpecified(
      'graceful_shutdown_max_duration'):
    if result is None:
      result = {}
    result['maxDuration'] = args.graceful_shutdown_max_duration

  return result


def CreateParams(args, client):
  """Create a Params message for the instance.

  Args:
    args: Parsed argument namespace.
    client: Compute client providing the generated proto messages.

  Returns:
    An InstanceParams message carrying resource manager tags and/or the
    request-valid-for duration when those flags were specified.

  Raises:
    calliope_exceptions.InvalidArgumentException: If
      --request_valid_for_duration is used without the FLEX_START
      provisioning model.
  """
  params = client.messages.InstanceParams()

  if args.IsKnownAndSpecified('resource_manager_tags'):
    ret_resource_manager_tags = (
        resource_manager_tags_utils.GetResourceManagerTags(
            args.resource_manager_tags
        )
    )
    if ret_resource_manager_tags is not None:
      resource_manager_tags_value = (
          client.messages.InstanceParams.ResourceManagerTagsValue
      )
      params.resourceManagerTags = resource_manager_tags_value(
          additionalProperties=[
              resource_manager_tags_value.AdditionalProperty(
                  key=key, value=value
              )
              # dict.items() replaces the Python 2 era six.iteritems shim.
              for key, value in sorted(ret_resource_manager_tags.items())
          ]
      )

  if args.IsKnownAndSpecified('request_valid_for_duration'):
    # The duration flag is only meaningful for FLEX_START provisioning.
    if (
        not hasattr(args, 'provisioning_model')
        or not args.IsSpecified('provisioning_model')
        or args.provisioning_model != 'FLEX_START'
    ):
      raise calliope_exceptions.InvalidArgumentException(
          '--request_valid_for_duration',
          '[--request_valid_for_duration] is only supported for FLEX_START'
          ' provisioning model.',
      )

    params.requestValidForDuration = client.messages.Duration(
        seconds=args.request_valid_for_duration
    )

  return params


# Fixed affinity-label key used when a specific reservation is targeted
# (see GetReservationAffinity above).
_RESERVATION_AFFINITY_KEY = 'compute.googleapis.com/reservation-name'