HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/bms/bms_client.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud Bare Metal Solution client."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import re

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import exceptions as apilib_exceptions
from googlecloudsdk.command_lib.bms import util
import six

_DEFAULT_API_VERSION = 'v2'
_V1_API_VERSION = 'v1'
_GLOBAL_REGION = 'global'
_REGIONAL_IAM_REGEX = re.compile(
    "PERMISSION_DENIED: Permission (.+) denied on 'projects/(.+?)/.*"
)

IpRangeReservation = collections.namedtuple(
    'IpRangeReservation', ['start_address', 'end_address', 'note']
)


def _ParseError(error):
  """Returns a best-effort error message created from an API client error."""
  if isinstance(error, apitools_exceptions.HttpError):
    parsed_error = apilib_exceptions.HttpException(
        error, error_format='{message}'
    )
    error_message = parsed_error.message
  else:
    error_message = six.text_type(error)
  return error_message


def _CollapseRegionalIAMErrors(errors):
  """If all errors are PERMISSION_DENIEDs, use a single global error instead."""
  # TODO(b/198857865): Remove this hack once the `global` region fix is in
  if errors:
    matches = [_REGIONAL_IAM_REGEX.match(e) for e in errors]
    if (
        all(match is not None for match in matches)
        and len(set(match.group(1) for match in matches)) == 1
    ):
      errors = [
          'PERMISSION_DENIED: Permission %s denied on projects/%s'
          % (matches[0].group(1), matches[0].group(2))
      ]
  return errors


class BmsClient(object):
  """Cloud Bare Metal Solution client."""

  def __init__(self, api_version=_DEFAULT_API_VERSION):
    self._client = apis.GetClientInstance('baremetalsolution', api_version)
    self._v1_client = apis.GetClientInstance(
        'baremetalsolution', _V1_API_VERSION
    )
    self._messages = apis.GetMessagesModule('baremetalsolution', api_version)
    self.instances_service = self._client.projects_locations_instances
    self.volumes_service = self._client.projects_locations_volumes
    self.snapshot_schedule_policies_service = (
        self._client.projects_locations_snapshotSchedulePolicies
    )
    self.snapshots_service = self._client.projects_locations_volumes_snapshots
    self.networks_service = self._client.projects_locations_networks
    self.locations_service = self._client.projects_locations
    self.luns_service = self._client.projects_locations_volumes_luns
    self.nfs_shares_service = self._client.projects_locations_nfsShares
    self.ssh_keys_service = self._client.projects_locations_sshKeys
    self.operation_service = self._client.projects_locations_operations
    self.os_images_service = self._client.projects_locations_osImages
    self.nfs_mount_permissions_str_to_message = {
        'READ_ONLY': (
            self.messages.AllowedClient.MountPermissionsValueValuesEnum.READ
        ),
        'READ_WRITE': (
            self.messages.AllowedClient.MountPermissionsValueValuesEnum.READ_WRITE
        ),
    }
    self.nfs_storage_type_str_to_message = {
        'SSD': self.messages.NfsShare.StorageTypeValueValuesEnum.SSD,
        'HDD': self.messages.NfsShare.StorageTypeValueValuesEnum.HDD,
    }

  @property
  def client(self):
    return self._client

  @property
  def messages(self):
    return self._messages

  def GetInstance(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsInstancesGetRequest(
            name=resource.RelativeName()
        )
    )
    return self.instances_service.Get(request)

  def ListInstances(self, parent, limit=None, page_size=None):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsInstancesListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.instances_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='instances',
    )

  def UpdateInstance(
      self,
      instance_resource,
      labels,
      os_image,
      enable_hyperthreading,
      ssh_keys,
      kms_key_version,
      clear_ssh_keys,
  ):
    """Update an existing instance resource."""
    updated_fields = []
    if labels is not None:
      updated_fields.append('labels')
    if os_image is not None:
      updated_fields.append('os_image')
    if enable_hyperthreading is not None:
      updated_fields.append('hyperthreading_enabled')
    if ssh_keys or clear_ssh_keys:
      updated_fields.append('ssh_keys')
    if kms_key_version is not None:
      updated_fields.append('kms_key_version')

    ssh_keys_relative = [ssh_key.RelativeName() for ssh_key in ssh_keys]

    instance_msg = self.messages.Instance(
        name=instance_resource.RelativeName(),
        labels=labels,
        osImage=os_image,
        hyperthreadingEnabled=enable_hyperthreading,
        sshKeys=ssh_keys_relative,
        kmsKeyVersion=kms_key_version,
    )

    request = (
        self.messages.BaremetalsolutionProjectsLocationsInstancesPatchRequest(
            name=instance_resource.RelativeName(),
            instance=instance_msg,
            updateMask=','.join(updated_fields),
        )
    )
    return self.instances_service.Patch(request)

  def ReimageInstance(
      self, instance_resource, os_image, ssh_keys, kms_crypto_key_version
  ):
    """Reimage an existing instance."""
    ssh_keys_relative = [ssh_key.RelativeName() for ssh_key in ssh_keys]
    request = (
        self.messages.BaremetalsolutionProjectsLocationsInstancesReimageRequest(
            name=instance_resource.RelativeName(),
            reimageInstanceRequest={
                'osImage': os_image,
                'sshKeys': ssh_keys_relative,
                'kmsKeyVersion': kms_crypto_key_version,
            },
        )
    )
    return self.instances_service.Reimage(request)

  def EnableHyperthreading(self, instance_resource):
    """Enable hyperthreading on an instance."""
    request = self.messages.BaremetalsolutionProjectsLocationsInstancesEnableHyperthreadingRequest(
        name=instance_resource.RelativeName()
    )
    return self.instances_service.EnableHyperthreading(request)

  def DisableHyperthreading(self, instance_resource):
    """Disable hyperthreading on an instance."""
    request = self.messages.BaremetalsolutionProjectsLocationsInstancesDisableHyperthreadingRequest(
        name=instance_resource.RelativeName()
    )
    return self.instances_service.DisableHyperthreading(request)

  def RenameInstance(self, instance_resource, new_name):
    """Rename an existing instance resource."""
    rename_instance_request = self.messages.RenameInstanceRequest(
        newInstanceId=new_name
    )
    request = (
        self.messages.BaremetalsolutionProjectsLocationsInstancesRenameRequest(
            name=instance_resource.RelativeName(),
            renameInstanceRequest=rename_instance_request,
        )
    )
    return self.instances_service.Rename(request)

  def LoadInstanceAuthInfo(self, instance_resource):
    """Load instance auth info."""
    request = self.messages.BaremetalsolutionProjectsLocationsInstancesLoadAuthInfoRequest(
        name=instance_resource.RelativeName()
    )
    return self.instances_service.LoadAuthInfo(request)

  def ListSnapshotSchedulePolicies(
      self, project_resource, limit=None, page_size=None
  ):
    parent = 'projects/%s/locations/global' % project_resource
    request = self.messages.BaremetalsolutionProjectsLocationsSnapshotSchedulePoliciesListRequest(
        parent=parent
    )
    return list_pager.YieldFromList(
        self.snapshot_schedule_policies_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='snapshotSchedulePolicies',
    )

  def GetSnapshotSchedulePolicy(self, resource):
    request = self.messages.BaremetalsolutionProjectsLocationsSnapshotSchedulePoliciesGetRequest(
        name=resource.RelativeName()
    )
    return self.snapshot_schedule_policies_service.Get(request)

  def CreateSnapshotSchedulePolicy(
      self, policy_resource, labels, description, schedules
  ):
    """Sends request to create a new Snapshot Schedule Policy."""
    policy_id = policy_resource.Name()
    parent = policy_resource.Parent().RelativeName()
    schedule_msgs = self._ParseSnapshotSchedules(schedules)
    policy_msg = self.messages.SnapshotSchedulePolicy(
        description=description, schedules=schedule_msgs, labels=labels
    )
    request = self.messages.BaremetalsolutionProjectsLocationsSnapshotSchedulePoliciesCreateRequest(
        parent=parent,
        snapshotSchedulePolicyId=policy_id,
        snapshotSchedulePolicy=policy_msg,
    )
    return self.snapshot_schedule_policies_service.Create(request)

  def UpdateSnapshotSchedulePolicy(
      self, policy_resource, description, labels, schedules
  ):
    """Sends request to update an existing SnapshotSchedulePolicy."""
    updated_fields = []
    if description:
      updated_fields.append('description')
    if labels is not None:
      updated_fields.append('labels')
    schedule_msgs = self._ParseSnapshotSchedules(schedules)
    if schedule_msgs:
      updated_fields.append('schedules')

    update_mask = ','.join(updated_fields)
    policy_msg = self.messages.SnapshotSchedulePolicy(
        description=description, schedules=schedule_msgs, labels=labels
    )
    request = self.messages.BaremetalsolutionProjectsLocationsSnapshotSchedulePoliciesPatchRequest(
        name=policy_resource.RelativeName(),
        snapshotSchedulePolicy=policy_msg,
        updateMask=update_mask,
    )
    return self.snapshot_schedule_policies_service.Patch(request)

  def _ParseSnapshotSchedules(self, schedules):
    """Parses schedule ArgDict dicts into a list of Schedule messages."""
    schedule_msgs = []
    if schedules:
      for schedule_arg in schedules:
        schedule_msgs.append(
            self.messages.Schedule(
                crontabSpec=schedule_arg['crontab_spec'],
                retentionCount=schedule_arg['retention_count'],
                prefix=schedule_arg['prefix'],
            )
        )
    return schedule_msgs

  def DeleteSnapshotSchedulePolicy(self, resource):
    request = self.messages.BaremetalsolutionProjectsLocationsSnapshotSchedulePoliciesDeleteRequest(
        name=resource.RelativeName()
    )
    return self.snapshot_schedule_policies_service.Delete(request)

  def ListVolumes(self, parent, limit=None, page_size=None):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsVolumesListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.volumes_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='volumes',
    )

  def UpdateVolume(
      self,
      volume_resource,
      labels,
      snapshot_schedule_policy_resource,
      remove_snapshot_schedule_policy,
      snapshot_auto_delete,
  ):
    """Update an existing volume resource."""
    updated_fields = []
    policy_name = None
    if snapshot_schedule_policy_resource:
      updated_fields.append('snapshotSchedulePolicy')
      policy_name = snapshot_schedule_policy_resource.RelativeName()
    elif remove_snapshot_schedule_policy:
      updated_fields.append('snapshotSchedulePolicy')
    if labels is not None:
      updated_fields.append('labels')
    if snapshot_auto_delete:
      updated_fields.append('snapshotAutoDeleteBehavior')

    volume_msg = self.messages.Volume(
        name=volume_resource.RelativeName(),
        snapshotAutoDeleteBehavior=snapshot_auto_delete,
        snapshotSchedulePolicy=policy_name,
        labels=labels,
    )

    request = (
        self.messages.BaremetalsolutionProjectsLocationsVolumesPatchRequest(
            name=volume_resource.RelativeName(),
            volume=volume_msg,
            updateMask=','.join(updated_fields),
        )
    )

    return self.volumes_service.Patch(request)

  def GetVolume(self, resource):
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesGetRequest(
        name=resource.RelativeName()
    )
    return self.volumes_service.Get(request)

  def RenameVolume(self, volume_resource, new_name):
    """Rename an existing volume resource."""
    rename_volume_request = self.messages.RenameVolumeRequest(
        newVolumeId=new_name
    )
    request = (
        self.messages.BaremetalsolutionProjectsLocationsVolumesRenameRequest(
            name=volume_resource.RelativeName(),
            renameVolumeRequest=rename_volume_request,
        )
    )
    return self.volumes_service.Rename(request)

  def ListNetworks(self, parent, limit=None, page_size=None):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNetworksListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.networks_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='networks',
    )

  def ListOSImages(self, project_resource, limit=None, page_size=None):
    parent = 'projects/%s/locations/global' % project_resource
    request = (
        self.messages.BaremetalsolutionProjectsLocationsOsImagesListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.os_images_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='osImages',
    )

  def GetOSImage(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsOsImagesGetRequest(
            name=resource.RelativeName()
        )
    )
    return self.os_images_service.Get(request)

  def GetNetwork(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNetworksGetRequest(
            name=resource.RelativeName()
        )
    )
    return self.networks_service.Get(request)

  def UpdateNetwork(self, network_resource, labels, ip_reservations):
    """Update an existing network resource."""
    updated_fields = []
    ip_reservations_messages = []
    if labels is not None:
      updated_fields.append('labels')
    if ip_reservations is not None:
      updated_fields.append('reservations')
      for ip_reservation in ip_reservations:
        ip_reservations_messages.append(
            self.messages.NetworkAddressReservation(
                startAddress=ip_reservation.start_address,
                endAddress=ip_reservation.end_address,
                note=ip_reservation.note,
            )
        )

    network_msg = self.messages.Network(
        name=network_resource.RelativeName(),
        labels=labels,
        reservations=ip_reservations_messages,
    )

    request = (
        self.messages.BaremetalsolutionProjectsLocationsNetworksPatchRequest(
            name=network_resource.RelativeName(),
            network=network_msg,
            updateMask=','.join(updated_fields),
        )
    )

    return self.networks_service.Patch(request)

  def RenameNetwork(self, network_resource, new_name):
    """Rename an existing network resource."""
    rename_network_request = self.messages.RenameNetworkRequest(
        newNetworkId=new_name
    )
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNetworksRenameRequest(
            name=network_resource.RelativeName(),
            renameNetworkRequest=rename_network_request,
        )
    )
    return self.networks_service.Rename(request)

  def IsClientNetwork(self, network):
    return network.type == self.messages.Network.TypeValueValuesEnum.CLIENT

  def IsPrivateNetwork(self, network):
    return network.type == self.messages.Network.TypeValueValuesEnum.PRIVATE

  def IsClientLogicalNetworkInterface(self, logical_network_interface):
    return (
        logical_network_interface.networkType
        == self.messages.LogicalNetworkInterface.NetworkTypeValueValuesEnum.CLIENT
    )

  def IsPrivateLogicalNetworkInterface(self, logical_network_interface):
    return (
        logical_network_interface.networkType
        == self.messages.LogicalNetworkInterface.NetworkTypeValueValuesEnum.PRIVATE
    )

  def ListLUNsForVolume(self, volume_resource, limit=None, page_size=None):
    parent = volume_resource.RelativeName()
    request = (
        self.messages.BaremetalsolutionProjectsLocationsVolumesLunsListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.luns_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='luns',
    )

  def GetLUN(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsVolumesLunsGetRequest(
            name=resource.RelativeName()
        )
    )
    return self.luns_service.Get(request)

  def ListSnapshotsForVolume(self, volume_resource, limit=None, page_size=None):
    parent = volume_resource.RelativeName()
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesSnapshotsListRequest(
        parent=parent
    )
    return list_pager.YieldFromList(
        self.snapshots_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='volumeSnapshots',
    )

  def GetVolumeSnapshot(self, resource):
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesSnapshotsGetRequest(
        name=resource.RelativeName()
    )
    return self.snapshots_service.Get(request)

  def CreateVolumeSnapshot(self, resource, name, description):
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesSnapshotsCreateRequest(
        parent=resource.RelativeName(),
        volumeSnapshot=self.messages.VolumeSnapshot(
            name=name, description=description
        ),
    )
    return self.snapshots_service.Create(request)

  def DeleteVolumeSnapshot(self, resource):
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesSnapshotsDeleteRequest(
        name=resource.RelativeName()
    )
    return self.snapshots_service.Delete(request)

  def RestoreVolumeSnapshot(self, snapshot_name):
    request = self.messages.BaremetalsolutionProjectsLocationsVolumesSnapshotsRestoreVolumeSnapshotRequest(  # pylint: disable=line-too-long
        volumeSnapshot=snapshot_name
    )
    return self.snapshots_service.RestoreVolumeSnapshot(request)

  def GetNfsShare(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesGetRequest(
            name=resource.RelativeName()
        )
    )
    return self.nfs_shares_service.Get(request)

  def ListNfsShares(self, parent, limit=None, page_size=None):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.nfs_shares_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='nfsShares',
    )

  def UpdateNfsShare(self, nfs_share_resource, labels, allowed_clients):
    """Update an existing nfs share resource."""
    updated_fields = []
    updated_allowed_clients = []
    if labels is not None:
      updated_fields.append('labels')
    if allowed_clients is not None:
      updated_fields.append('allowedClients')
      updated_allowed_clients = allowed_clients

    nfs_share_msg = self.messages.NfsShare(
        name=nfs_share_resource.RelativeName(),
        labels=labels,
        allowedClients=updated_allowed_clients,
    )

    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesPatchRequest(
            name=nfs_share_resource.RelativeName(),
            nfsShare=nfs_share_msg,
            updateMask=','.join(updated_fields),
        )
    )

    return self.nfs_shares_service.Patch(request)

  def DeleteNfsShare(self, nfs_share_resource):
    """Delete an existing nfs share resource."""
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesDeleteRequest(
            name=nfs_share_resource.RelativeName()
        )
    )
    return self.nfs_shares_service.Delete(request)

  def CreateNfsShare(
      self,
      nfs_share_resource,
      size_gib,
      storage_type,
      allowed_clients_dicts,
      labels,
  ):
    """Create an NFS share resource."""
    allowed_clients = self.ParseAllowedClientsDicts(
        nfs_share_resource=nfs_share_resource,
        allowed_clients_dicts=allowed_clients_dicts,
    )
    nfs_share_msg = self.messages.NfsShare(
        name=nfs_share_resource.RelativeName(),
        requestedSizeGib=size_gib,
        storageType=self.nfs_storage_type_str_to_message[storage_type],
        allowedClients=allowed_clients,
        labels=labels,
    )
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesCreateRequest(
            nfsShare=nfs_share_msg,
            parent=nfs_share_resource.Parent().RelativeName(),
        )
    )
    return self.nfs_shares_service.Create(request)

  def RenameNfsShare(self, nfs_share_resource, new_name):
    """Rename an existing nfs share resource."""
    rename_nfs_share_request = self.messages.RenameNfsShareRequest(
        newNfsshareId=new_name
    )
    request = (
        self.messages.BaremetalsolutionProjectsLocationsNfsSharesRenameRequest(
            name=nfs_share_resource.RelativeName(),
            renameNfsShareRequest=rename_nfs_share_request,
        )
    )
    return self.nfs_shares_service.Rename(request)

  def ListSshKeys(self, project_resource, limit=None, page_size=None):
    parent = 'projects/%s/locations/global' % project_resource
    request = (
        self.messages.BaremetalsolutionProjectsLocationsSshKeysListRequest(
            parent=parent
        )
    )
    return list_pager.YieldFromList(
        self.ssh_keys_service,
        request,
        limit=limit,
        batch_size_attribute='pageSize',
        batch_size=page_size,
        field='sshKeys',
    )

  def CreateSshKey(self, resource, public_key):
    """Sends request to create an SSH key."""
    request = (
        self.messages.BaremetalsolutionProjectsLocationsSshKeysCreateRequest(
            parent=resource.Parent().RelativeName(),
            sshKeyId=resource.Name(),
            sSHKey=self.messages.SSHKey(publicKey=public_key),
        )
    )
    return self.ssh_keys_service.Create(request)

  def DeleteSshKey(self, resource):
    request = (
        self.messages.BaremetalsolutionProjectsLocationsSshKeysDeleteRequest(
            name=resource.RelativeName()
        )
    )
    return self.ssh_keys_service.Delete(request)

  def ParseAllowedClientsDicts(self, nfs_share_resource, allowed_clients_dicts):
    """Parses NFS share allowed client list of dicts."""
    allowed_clients = []
    for allowed_client in allowed_clients_dicts:
      mount_permissions = self.nfs_mount_permissions_str_to_message[
          allowed_client['mount-permissions']
      ]
      network_full_name = util.NFSNetworkFullName(
          nfs_share_resource=nfs_share_resource,
          allowed_client_dict=allowed_client,
      )
      allowed_clients.append(
          self.messages.AllowedClient(
              network=network_full_name,
              allowedClientsCidr=allowed_client['cidr'],
              mountPermissions=mount_permissions,
              allowDev=allowed_client['allow-dev'],
              allowSuid=allowed_client['allow-suid'],
              noRootSquash=not allowed_client['enable-root-squash'],
          )
      )
    return allowed_clients