HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/vmware/util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud vmware API utilities."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import datetime

from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.core import resources


_DEFAULT_API_VERSION = 'v1'


class VmwareClientBase(object):
  """Base class for vwmare API client wrappers."""

  def __init__(self, api_version=_DEFAULT_API_VERSION):
    self._client = apis.GetClientInstance('vmwareengine', api_version)
    self._messages = apis.GetMessagesModule('vmwareengine', api_version)
    self.service = None
    self.operations_service = self.client.projects_locations_operations

  @property
  def client(self):
    return self._client

  @property
  def messages(self):
    return self._messages

  def GetOperationRef(self, operation):
    """Converts an Operation to a Resource that can be used with `waiter.WaitFor`.
    """
    return resources.REGISTRY.ParseRelativeName(
        operation.name, collection='vmwareengine.projects.locations.operations')

  def WaitForOperation(self,
                       operation_ref,
                       message,
                       has_result=True,
                       max_wait=datetime.timedelta(seconds=3600)):
    """Waits for an operation to complete.

    Polls the IDS Operation service until the operation completes, fails, or
    max_wait_seconds elapses.

    Args:
      operation_ref: a Resource created by GetOperationRef describing the
        operation.
      message: the message to display to the user while they wait.
      has_result: if True, the function will return the target of the operation
        when it completes. If False, nothing will be returned (useful for Delete
        operations)
      max_wait: The time to wait for the operation to succeed before returning.

    Returns:
      if has_result = True, an Endpoint entity.
      Otherwise, None.
    """
    if has_result:
      poller = waiter.CloudOperationPoller(self.service,
                                           self.operations_service)
    else:
      poller = waiter.CloudOperationPollerNoResources(self.operations_service)

    return waiter.WaitFor(
        poller, operation_ref, message, max_wait_ms=max_wait.seconds * 1000)

  def GetResponse(self, operation):
    poller = waiter.CloudOperationPoller(self.service, self.operations_service)
    return poller.GetResult(operation)


def GetResourceId(resource_name):
  return resource_name.split('/')[-1]


def ConstructNodeParameterConfigMessage(map_class, config_class, nodes_configs):
  """Constructs a node configs API message.

  Args:
    map_class: The map message class.
    config_class: The config (map-entry) message class.
    nodes_configs: The list of nodes configurations.

  Returns:
    The constructed message.
  """
  properties = []
  for nodes_config in nodes_configs:
    if nodes_config.count == 0:
      continue

    node_type_config = config_class(nodeCount=nodes_config.count)
    if nodes_config.custom_core_count > 0:
      node_type_config.customCoreCount = nodes_config.custom_core_count

    prop = map_class.AdditionalProperty(
        key=nodes_config.type, value=node_type_config
    )
    properties.append(prop)
  return map_class(additionalProperties=properties)


def ConstructAutoscalingSettingsMessage(
    settings_message_class,
    policy_message_class,
    thresholds_message_class,
    autoscaling_settings,
):
  """Constructs autoscaling settings API message.

  Args:
    settings_message_class: Top-level autoscaling settings message class.
    policy_message_class: Autoscaling policy message class.
    thresholds_message_class: Autoscaling policy thresholds message class.
    autoscaling_settings: Desired autoscaling settings.

  Returns:
    The constructed message.
  """
  if not autoscaling_settings:
    return None

  settings_message = settings_message_class()
  settings_message.minClusterNodeCount = (
      autoscaling_settings.min_cluster_node_count
  )
  settings_message.maxClusterNodeCount = (
      autoscaling_settings.max_cluster_node_count
  )
  settings_message.coolDownPeriod = autoscaling_settings.cool_down_period

  policy_messages = {}
  for name, policy in autoscaling_settings.autoscaling_policies.items():
    policy_message = policy_message_class()
    policy_message.nodeTypeId = policy.node_type_id
    policy_message.scaleOutSize = policy.scale_out_size
    policy_message.minNodeCount = policy.min_node_count
    policy_message.maxNodeCount = policy.max_node_count

    policy_message.cpuThresholds = _ConstructThresholdsMessage(
        policy.cpu_thresholds, thresholds_message_class
    )
    policy_message.grantedMemoryThresholds = _ConstructThresholdsMessage(
        policy.granted_memory_thresholds, thresholds_message_class
    )
    policy_message.consumedMemoryThresholds = _ConstructThresholdsMessage(
        policy.consumed_memory_thresholds, thresholds_message_class
    )
    policy_message.storageThresholds = _ConstructThresholdsMessage(
        policy.storage_thresholds, thresholds_message_class
    )

    policy_messages[name] = policy_message

  settings_message.autoscalingPolicies = settings_message_class.AutoscalingPoliciesValue(
      additionalProperties=[
          settings_message_class.AutoscalingPoliciesValue.AdditionalProperty(
              key=name, value=policy_message
          ) for name, policy_message in policy_messages.items()
      ]
  )
  return settings_message


def _ConstructThresholdsMessage(thresholds, thresholds_message_class):
  thresholds_message = thresholds_message_class()
  if thresholds is None:
    return None
  thresholds_message.scaleIn = thresholds.scale_in
  thresholds_message.scaleOut = thresholds.scale_out
  return thresholds_message