HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/compute/disks/convert.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for converting a disk to a different type."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
import textwrap

from googlecloudsdk.api_lib.compute import base_classes
from googlecloudsdk.api_lib.compute import disks_util
from googlecloudsdk.api_lib.compute import kms_utils
from googlecloudsdk.api_lib.compute import name_generator
from googlecloudsdk.api_lib.compute.operations import poller
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.compute import completers
from googlecloudsdk.command_lib.compute import flags
from googlecloudsdk.command_lib.compute.disks import flags as disks_flags
from googlecloudsdk.command_lib.kms import resource_args as kms_resource_args
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.console import progress_tracker


CONTINUE_WITH_CONVERT_PROMPT = (
    'This command will permanently convert disk {0} to disk type: {1}. Please'
    ' detach the disk from all instances before continuing. Data written to the'
    ' original disk during conversion will not appear on the converted disk.'
    ' Please see'
    ' https://cloud.google.com/compute/docs/disks/automatically-convert-disks'
    ' for more details.'
)


class _ConvertState(enum.Enum):
  SNAPSHOT_CREATED = 1
  DISK_RESTORED = 2
  ORIGINAL_DISK_DELETED = 3
  ORIGINAL_DISK_RECREATED = 4
  RESTORED_DISK_DELETED = 5
  SNAPSHOT_DELETED = 6


@base.DefaultUniverseOnly
@base.ReleaseTracks(base.ReleaseTrack.ALPHA,
                    base.ReleaseTrack.BETA,
                    base.ReleaseTrack.GA)
class Convert(base.RestoreCommand):
  """Convert a Compute Engine Persistent Disk volume to a Hyperdisk volume."""

  _DISK_ARG = disks_flags.MakeDiskArg(plural=False)

  @staticmethod
  def Args(parser):
    Convert._DISK_ARG.AddArgument(parser)
    parser.add_argument(
        '--target-disk-type',
        completer=completers.DiskTypesCompleter,
        required=True,
        help="""Specifies the type of Hyperdisk to convert to, for example,
        to convert a Hyperdisk Balanced volume, specify `hyperdisk-balanced`. To get a
        list of available disk types, run `gcloud compute disk-types list`.
        """,
    )
    kms_resource_args.AddKmsKeyResourceArg(
        parser, 'disk', region_fallthrough=True
    )

  def Run(self, args):
    self.holder = base_classes.ComputeApiHolder(self.ReleaseTrack())
    self.client = self.holder.client.apitools_client
    self.messages = self.holder.client.messages
    self.state = None
    self.created_resources = {}
    self.user_messages = ''

    self.disk_ref = self._DISK_ARG.ResolveAsResource(
        args,
        self.holder.resources,
        scope_lister=flags.GetDefaultScopeLister(self.holder.client),
    )

    if self.disk_ref.Collection() == 'compute.regionDisks':
      raise exceptions.InvalidArgumentException(
          '--region',
          'Regional disks are not supported for this command.'
      )

    if args.target_disk_type == 'hyperdisk-ml':
      raise exceptions.InvalidArgumentException(
          '--target-disk-type',
          'Hyperdisk ML is not supported for this command.',
      )
    self.target_disk_type = args.target_disk_type

    # make sure disk is not attached to any instances
    disk_info = disks_util.GetDiskInfo(
        self.disk_ref, self.client, self.messages)
    original_disk = disk_info.GetDiskResource()
    if original_disk.users:
      raise exceptions.ToolException(
          'Disk is attached to instances. Please detach the disk before'
          ' converting.'
      )
    console_io.PromptContinue(
        message=textwrap.dedent(
            CONTINUE_WITH_CONVERT_PROMPT.format(
                self.disk_ref.Name(), self.target_disk_type
            )
        ),
        cancel_on_no=True,
    )

    try:
      with self._CreateProgressTracker(self.disk_ref.Name()):
        result = self._ConvertDisk(
            self.disk_ref, self.target_disk_type, original_disk.sizeGb,
            disk_encryption_key=kms_utils.MaybeGetKmsKey(
                args, self.messages, None
            ),
        )
    except Exception as e:
      raise e
    finally:
      self._CleanUp()
      if self.user_messages:
        if self.state in [_ConvertState.ORIGINAL_DISK_RECREATED,
                          _ConvertState.RESTORED_DISK_DELETED,
                          _ConvertState.SNAPSHOT_DELETED]:
          log.warning(self.user_messages)
        else:
          log.error(self.user_messages)

    return result

  def _ConvertDisk(
      self, disk_ref, target_disk_type, size_gb, disk_encryption_key=None
  ):
    # create a snapshot of the disk
    self.snapshot_name = self._GenerateName(disk_ref)
    result = self._InsertSnapshot(disk_ref, self.snapshot_name)
    snapshot_ref = self.holder.resources.Parse(
        self.snapshot_name,
        params={'project': disk_ref.project},
        collection='compute.snapshots',
    )
    self._UpdateState(_ConvertState.SNAPSHOT_CREATED, snapshot_ref)
    # create a new disk from the snapshot with target disk type
    self.restored_disk_name = self._GenerateName(disk_ref)
    restored_disk_ref = self.holder.resources.Parse(
        self.restored_disk_name,
        params={'project': disk_ref.project, 'zone': disk_ref.zone},
        collection='compute.disks',
    )
    result = (
        self._RestoreDiskFromSnapshot(
            restored_disk_ref,
            snapshot_ref,
            target_disk_type,
            size_gb,
            disk_encryption_key=disk_encryption_key,
        )
        or result
    )
    self._UpdateState(_ConvertState.DISK_RESTORED, restored_disk_ref)
    # delete the original disk
    result = self._DeleteDisk(disk_ref) or result
    self._UpdateState(_ConvertState.ORIGINAL_DISK_DELETED)

    # recreate the original disk with the new disk as source
    result = (
        self._CloneDisk(
            disk_ref.Name(),
            restored_disk_ref,
            disk_encryption_key=disk_encryption_key,
        )
        or result
    )
    self._UpdateState(_ConvertState.ORIGINAL_DISK_RECREATED)

    # delete the restored disk because the original disk is recreated
    result = self._DeleteDisk(restored_disk_ref) or result
    self._UpdateState(_ConvertState.RESTORED_DISK_DELETED)

    # delete the snapshot because the disk is recreated
    result = self._DeleteSnapshot(snapshot_ref) or result
    self._UpdateState(_ConvertState.SNAPSHOT_DELETED)

    return result

  def _InsertSnapshot(self, disk_ref, snapshot_name):
    request = self.messages.ComputeSnapshotsInsertRequest(
        project=disk_ref.project,
        snapshot=self.messages.Snapshot(
            name=snapshot_name,
            sourceDisk=disk_ref.SelfLink(),
            snapshotType=self.messages.Snapshot.SnapshotTypeValueValuesEnum.STANDARD,
        ),
    )
    operation = self._MakeRequest(self.client.snapshots, 'Insert', request)
    operation_ref = self.holder.resources.Parse(
        operation.selfLink,
        collection='compute.globalOperations',
    )
    return waiter.WaitFor(
        poller.Poller(self.client.snapshots),
        operation_ref,
        custom_tracker=self._CreateNoOpProgressTracker(),
        max_wait_ms=None,
    )

  def _DeleteSnapshot(self, snapshot_ref):
    request = self.messages.ComputeSnapshotsDeleteRequest(
        snapshot=snapshot_ref.Name(),
        project=snapshot_ref.project,
    )
    operation = self._MakeRequest(self.client.snapshots, 'Delete', request)
    operation_ref = self.holder.resources.Parse(
        operation.selfLink,
        collection='compute.globalOperations',
    )
    return waiter.WaitFor(
        poller.DeletePoller(self.client.snapshots),
        operation_ref,
        custom_tracker=self._CreateNoOpProgressTracker(),
        max_wait_ms=None,
    )

  def _RestoreDiskFromSnapshot(
      self,
      restored_disk_ref,
      snapshot_ref,
      disk_type,
      size_gb,
      disk_encryption_key=None,
  ):
    kwargs = {}
    if disk_encryption_key:
      kwargs['diskEncryptionKey'] = disk_encryption_key
    disk = self.messages.Disk(
        name=restored_disk_ref.Name(),
        type=disks_util.GetDiskTypeUri(
            disk_type, restored_disk_ref, self.holder
        ),
        sizeGb=size_gb,
        sourceSnapshot=snapshot_ref.SelfLink(),
        **kwargs,
    )

    request = self.messages.ComputeDisksInsertRequest(
        disk=disk,
        project=restored_disk_ref.project,
        zone=restored_disk_ref.zone,
    )
    operation = self._MakeRequest(self.client.disks, 'Insert', request)
    operation_ref = self.holder.resources.Parse(
        operation.selfLink,
        collection='compute.zoneOperations',
    )
    return waiter.WaitFor(
        poller.Poller(self.client.disks),
        operation_ref,
        custom_tracker=self._CreateNoOpProgressTracker(),
        max_wait_ms=None,
    )

  def _GenerateName(self, resource_ref):
    return f'{name_generator.GenerateRandomName()}-{resource_ref.Name()}'[:64]

  def _DeleteDisk(self, disk_ref):
    request = self.messages.ComputeDisksDeleteRequest(
        disk=disk_ref.Name(),
        project=disk_ref.project,
        zone=disk_ref.zone,
    )
    operation = self._MakeRequest(self.client.disks, 'Delete', request)
    operation_ref = self.holder.resources.Parse(
        operation.selfLink,
        collection='compute.zoneOperations',
    )
    return waiter.WaitFor(
        poller.DeletePoller(self.client.disks),
        operation_ref,
        custom_tracker=self._CreateNoOpProgressTracker(),
        max_wait_ms=None,
    )

  def _CloneDisk(
      self, original_disk_name, restored_disk_ref, disk_encryption_key=None
  ):
    kwargs = {}
    if disk_encryption_key:
      kwargs['diskEncryptionKey'] = disk_encryption_key
    disk = self.messages.Disk(
        name=original_disk_name,
        sourceDisk=restored_disk_ref.SelfLink(),
        **kwargs,
    )
    request = self.messages.ComputeDisksInsertRequest(
        disk=disk,
        project=restored_disk_ref.project,
        zone=restored_disk_ref.zone,
    )
    operation = self._MakeRequest(self.client.disks, 'Insert', request)
    operation_ref = self.holder.resources.Parse(
        operation.selfLink,
        collection='compute.zoneOperations',
    )
    operation_poller = poller.Poller(self.client.disks)
    return waiter.WaitFor(
        operation_poller,
        operation_ref,
        custom_tracker=self._CreateNoOpProgressTracker(),
        max_wait_ms=None,
    )

  def _MakeRequest(self, resource_client, method, request):
    errors_to_collect = []
    responses = self.holder.client.AsyncRequests(
        [(resource_client, method, request)], errors_to_collect
    )
    if errors_to_collect:
      raise core_exceptions.MultiError(errors_to_collect)
    if not responses:
      raise core_exceptions.InternalError('No response received')
    return responses[0]

  def _UpdateState(self, state, created_resource=None):
    self.state = state
    if created_resource:
      self.created_resources[state] = created_resource

  def _CleanUp(self):
    if not self.state:
      self.user_messages = (
          'Creating snapshot failed.' + self._BuildCleanupSnapshotMessage()
      )
      return
    if self.state == _ConvertState.SNAPSHOT_CREATED:
      # restore disk failed
      self.user_messages = (
          f'Creating disk from snapshot {self.snapshot_name} failed. '
          + self._BuildCleanupSnapshotMessage()
      )
      self._DeleteSnapshot(
          self.created_resources[_ConvertState.SNAPSHOT_CREATED]
      )
    elif self.state == _ConvertState.DISK_RESTORED:
      # delete original disk request failed
      self.user_messages = (
          f'Deleting original disk {self.disk_ref.Name()} failed. '
          + self._BuildCleanupDiskMessage()
          + self._BuildCleanupSnapshotMessage()
      )
      self._DeleteDisk(self.created_resources[_ConvertState.DISK_RESTORED])
      self._DeleteSnapshot(
          self.created_resources[_ConvertState.SNAPSHOT_CREATED]
      )
    elif self.state == _ConvertState.ORIGINAL_DISK_DELETED:
      # recreate original disk failed
      self.user_messages = (
          f'Recreating original disk {self.disk_ref.Name()} failed. Please run'
          ' `gcloud compute disks create'
          f' {self.disk_ref.Name()} --zone={self.disk_ref.zone} --type={self.target_disk_type} --source-disk={self.restored_disk_name}`'
          ' to recreate the original disk. Please run `gcloud compute'
          f' snapshots delete {self.snapshot_name}` to delete the temporary'
          ' snapshot. Please run `gcloud compute disks delete'
          f' {self.restored_disk_name} --zone={self.disk_ref.zone}` to delete'
          ' the temporary disk.'
      )
    elif self.state == _ConvertState.ORIGINAL_DISK_RECREATED:
      # delete restored disk failed
      self.user_messages = (
          'Conversion completed successfully, Deleting temporary disk'
          f' {self.restored_disk_name} failed.'
          + self._BuildCleanupDiskMessage()
          + self._BuildCleanupSnapshotMessage()
      )
    elif self.state == _ConvertState.RESTORED_DISK_DELETED:
      self.user_messages = (
          'Conversion completed successfully. Deleting temporary snapshot'
          f' {self.snapshot_name} failed.'
          + self._BuildCleanupSnapshotMessage()
      )

  def _CreateProgressTracker(self, disk_name):
    return progress_tracker.ProgressTracker(
        message=f'Converting disk {disk_name}...',
        aborted_message='Conversion aborted.',
    )

  def _CreateNoOpProgressTracker(self):
    return progress_tracker.NoOpProgressTracker(
        interruptable=True, aborted_message=''
    )

  def _BuildCleanupSnapshotMessage(self):
    return (
        f' Please run `gcloud compute snapshots delete {self.snapshot_name}` to'
        ' delete the temporary snapshot if it still exists.'
    )

  def _BuildCleanupDiskMessage(self):
    return (
        ' Please run `gcloud compute disks delete'
        f' {self.restored_disk_name} --zone={self.disk_ref.zone}` to delete the'
        ' temporary disk if it still exists.'
    )


Convert.detailed_help = {
    'DESCRIPTION': """\
 Convert Compute Engine Persistent Disk volumes to Hyperdisk volumes.

 *{command}* converts a Compute Engine Persistent Disk volume to a Hyperdisk volume. For a comprehensive guide, refer to: https://cloud.google.com/sdk/gcloud/reference/compute/disks/convert.
    """,
    'EXAMPLES': """\
The following command converts a Persistent Disk volume to a Hyperdisk Balanced volume:

        $ {command} my-disk-1 --zone=ZONE --target-disk-type=hyperdisk-balanced
        """,
}