HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/vmware/clusters/util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for VMware Engine private-clouds clusters commands.

Provides helpers for parsing the autoscaling settings and node type configs and
for combining settings from many sources together.
"""

from __future__ import absolute_import
from __future__ import annotations
from __future__ import division
from __future__ import unicode_literals

import collections
import dataclasses
from typing import Any, Dict, List, Union

from googlecloudsdk.core import exceptions


@dataclasses.dataclass(frozen=True)
class ScalingThresholds:
  """Scaling thresholds for a single condition. Uses None for empty values.

  Attributes:
    scale_in: The threshold for scaling in.
    scale_out: The threshold for scaling out.
  """

  scale_in: int
  scale_out: int


def _MergeFields(left, right):
  """Merges two fields, favoring right one.

  Args:
    left: First field.
    right: Second field.

  Returns:
    Merged field.
  """
  return right if right is not None else left


def _MergeScalingThresholds(
    left: ScalingThresholds | None, right: ScalingThresholds | None
) -> ScalingThresholds | None:
  """Merges two ScalingThresholds objects, favoring right one.

  Args:
    left: First ScalingThresholds object.
    right: Second ScalingThresholds object.

  Returns:
    Merged ScalingThresholds - It contains the updated scale_in and scale_out
    values, favoring the right one.
    None - It indicates removal of threshold from autoscaling policy, favoring
    right one. Therefore, if right is None, return None.

  """
  if left is None:
    return right
  if right is None:
    return None

  return ScalingThresholds(
      scale_in=_MergeFields(left.scale_in, right.scale_in),
      scale_out=_MergeFields(left.scale_out, right.scale_out),
  )


@dataclasses.dataclass(frozen=True)
class AutoscalingPolicy:
  """Represents the autoscaling policy for a single node type.

  Uses None for empty settings.

  Attributes:
    node_type_id: The node type id.
    scale_out_size: The size of a single scale out operation.
    min_node_count: The minimum number of nodes of this type in the cluster.
    max_node_count: The maximum number of nodes of this type in the cluster.
    cpu_thresholds: The CPU thresholds.
    granted_memory_thresholds: The granted memory thresholds.
    consumed_memory_thresholds: The consumed memory thresholds.
    storage_thresholds: The storage thresholds.
  """

  node_type_id: str
  scale_out_size: int
  min_node_count: int
  max_node_count: int
  cpu_thresholds: ScalingThresholds
  granted_memory_thresholds: ScalingThresholds
  consumed_memory_thresholds: ScalingThresholds
  storage_thresholds: ScalingThresholds


def _MergeAutoscalingPolicies(
    left: AutoscalingPolicy,
    right: AutoscalingPolicy,
) -> AutoscalingPolicy:
  """Merges two AutoscalingPolicy objects, favoring right one.

  Args:
    left: First AutoscalingPolicy object.
    right: Second AutoscalingPolicy object.

  Returns:
    Merged AutoscalingPolicy.
  """
  if left is None:
    return right
  if right is None:
    return left

  return AutoscalingPolicy(
      node_type_id=_MergeFields(left.node_type_id, right.node_type_id),
      scale_out_size=_MergeFields(left.scale_out_size, right.scale_out_size),
      min_node_count=_MergeFields(left.min_node_count, right.min_node_count),
      max_node_count=_MergeFields(left.max_node_count, right.max_node_count),
      cpu_thresholds=_MergeScalingThresholds(
          left.cpu_thresholds, right.cpu_thresholds
      ),
      granted_memory_thresholds=_MergeScalingThresholds(
          left.granted_memory_thresholds, right.granted_memory_thresholds
      ),
      consumed_memory_thresholds=_MergeScalingThresholds(
          left.consumed_memory_thresholds, right.consumed_memory_thresholds
      ),
      storage_thresholds=_MergeScalingThresholds(
          left.storage_thresholds, right.storage_thresholds
      ),
  )


@dataclasses.dataclass(frozen=True)
class AutoscalingSettings:
  """Represents the autoscaling settings for a private-cloud cluster.

  Uses None for empty settings.

  Attributes:
    min_cluster_node_count: The minimum number of nodes in the cluster.
    max_cluster_node_count: The maximum number of nodes in the cluster.
    cool_down_period: The cool down period for autoscaling.
    autoscaling_policies: The autoscaling policies for each node type.
  """

  min_cluster_node_count: int
  max_cluster_node_count: int
  cool_down_period: str
  autoscaling_policies: Dict[str, AutoscalingPolicy]


def MergeAutoscalingSettings(
    left: AutoscalingSettings, right: AutoscalingSettings
) -> AutoscalingSettings:
  """Merges two AutoscalingSettings objects, favoring right one.

  Args:
    left: First AutoscalingSettings object.
    right: Second AutoscalingSettings object.

  Returns:
    Merged AutoscalingSettings.
  """
  if left is None:
    return right
  if right is None:
    return left

  policies = {}
  for policy_name, policy in left.autoscaling_policies.items():
    if policy_name in right.autoscaling_policies:
      policies[policy_name] = _MergeAutoscalingPolicies(
          policy, right.autoscaling_policies[policy_name]
      )
    else:
      policies[policy_name] = policy

  for policy_name, policy in right.autoscaling_policies.items():
    if policy_name not in left.autoscaling_policies:
      policies[policy_name] = policy

  return AutoscalingSettings(
      min_cluster_node_count=_MergeFields(
          left.min_cluster_node_count, right.min_cluster_node_count
      ),
      max_cluster_node_count=_MergeFields(
          left.max_cluster_node_count, right.max_cluster_node_count
      ),
      cool_down_period=_MergeFields(
          left.cool_down_period, right.cool_down_period
      ),
      autoscaling_policies=policies,
  )


class InvalidNodeConfigsProvidedError(exceptions.Error):

  def __init__(self, details):
    super(InvalidNodeConfigsProvidedError, self).__init__(
        f'INVALID_ARGUMENT: {details}'
    )


class InvalidAutoscalingSettingsProvidedError(exceptions.Error):

  def __init__(self, details):
    super(InvalidAutoscalingSettingsProvidedError, self).__init__(
        f'INVALID_ARGUMENT: {details}'
    )


NodeTypeConfig = collections.namedtuple(
    typename='NodeTypeConfig',
    field_names=['type', 'count', 'custom_core_count'],
)


def FindDuplicatedTypes(types):
  type_counts = collections.Counter(types)
  return [node_type for node_type, count in type_counts.items() if count > 1]


def ParseNodesConfigsParameters(nodes_configs):
  requested_node_types = [config['type'] for config in nodes_configs]

  duplicated_types = FindDuplicatedTypes(requested_node_types)
  if duplicated_types:
    raise InvalidNodeConfigsProvidedError(
        'types: {} provided more than once.'.format(duplicated_types)
    )

  return [
      NodeTypeConfig(
          config['type'], config['count'], config.get('custom-core-count', 0)
      )
      for config in nodes_configs
  ]


def ParseAutoscalingSettingsFromInlinedFormat(
    min_cluster_node_count: int,
    max_cluster_node_count: int,
    cool_down_period: str,
    autoscaling_policies: List[Dict[str, Union[str, int]]],
) -> AutoscalingSettings:
  """Parses inlined autoscaling settings (passed as CLI arguments).

  The resulting object can later be passed to
  googlecloudsdk.api_lib.vmware.util.ConstructAutoscalingSettingsMessage.

  Args:
    min_cluster_node_count: autoscaling-min-cluster-node-count CLI argument.
    max_cluster_node_count: autoscaling-max-cluster-node-count CLI argument.
    cool_down_period: autoscaling-cool-down-period CLI argument.
    autoscaling_policies: list of update-autoscaling-policy CLI arguments.

  Returns:
    Equivalent AutoscalingSettings instance.
  """
  parsed_settings = AutoscalingSettings(
      min_cluster_node_count=min_cluster_node_count,
      max_cluster_node_count=max_cluster_node_count,
      cool_down_period=cool_down_period,
      autoscaling_policies={},
  )

  for policy in autoscaling_policies:
    parsed_policy = AutoscalingPolicy(
        node_type_id=policy.get('node-type-id'),
        scale_out_size=policy.get('scale-out-size'),
        min_node_count=policy.get('min-node-count'),
        max_node_count=policy.get('max-node-count'),
        cpu_thresholds=_AutoscalingThresholdsFromPolicy(
            policy, 'cpu-thresholds'
        ),
        granted_memory_thresholds=_AutoscalingThresholdsFromPolicy(
            policy, 'granted-memory-thresholds'
        ),
        consumed_memory_thresholds=_AutoscalingThresholdsFromPolicy(
            policy, 'consumed-memory-thresholds'
        ),
        storage_thresholds=_AutoscalingThresholdsFromPolicy(
            policy, 'storage-thresholds'
        ),
    )

    parsed_settings.autoscaling_policies[policy['name']] = parsed_policy

  return parsed_settings


def _AutoscalingThresholdsFromPolicy(
    policy: Dict[str, Union[str, int]], threshold: str
) -> ScalingThresholds:
  scale_in = policy.get(f'{threshold}-scale-in')
  scale_out = policy.get(f'{threshold}-scale-out')
  if scale_in is None and scale_out is None:
    return None
  return ScalingThresholds(scale_in=scale_in, scale_out=scale_out)


def _ValidateIfOnlySupportedKeysArePassed(
    keys: List[str], supported_keys: List[str]
):
  for key in keys:
    if key not in supported_keys:
      raise InvalidAutoscalingSettingsProvidedError(
          'unsupported key: {key}, supported keys are: {supported_keys}'.format(
              key=key, supported_keys=supported_keys
          )
      )


def ParseAutoscalingSettingsFromFileFormat(
    cluster: Dict[str, Any]
) -> AutoscalingSettings:
  """Parses the autoscaling settings from the format returned by  the describe command.

  The resulting object can later be passed to
  googlecloudsdk.api_lib.vmware.util.ConstructAutoscalingSettingsMessage.

  Args:
    cluster: dictionary with the settings. Parsed from a file provided by user.

  Returns:
    Equivalent AutoscalingSettings instance.

  Raises:
    InvalidAutoscalingSettingsProvidedError: if the file format was wrong.
  """

  def _ParseThresholds(thresholds_dict):
    if thresholds_dict is None:
      return None

    _ValidateIfOnlySupportedKeysArePassed(
        thresholds_dict.keys(), ['scaleIn', 'scaleOut']
    )

    return ScalingThresholds(
        scale_in=thresholds_dict.get('scaleIn'),
        scale_out=thresholds_dict.get('scaleOut'),
    )

  _ValidateIfOnlySupportedKeysArePassed(cluster.keys(), ['autoscalingSettings'])
  if 'autoscalingSettings' not in cluster:
    raise InvalidAutoscalingSettingsProvidedError(
        'autoscalingSettings not provided in the file'
    )
  autoscaling_settings = cluster['autoscalingSettings']

  _ValidateIfOnlySupportedKeysArePassed(
      autoscaling_settings.keys(),
      [
          'minClusterNodeCount',
          'maxClusterNodeCount',
          'coolDownPeriod',
          'autoscalingPolicies',
      ],
  )
  parsed_settings = AutoscalingSettings(
      min_cluster_node_count=autoscaling_settings.get('minClusterNodeCount'),
      max_cluster_node_count=autoscaling_settings.get('maxClusterNodeCount'),
      cool_down_period=autoscaling_settings.get('coolDownPeriod'),
      autoscaling_policies={},
  )

  if 'autoscalingPolicies' not in autoscaling_settings:
    return parsed_settings

  for policy_name, policy_settings in autoscaling_settings[
      'autoscalingPolicies'
  ].items():
    _ValidateIfOnlySupportedKeysArePassed(
        policy_settings.keys(),
        [
            'nodeTypeId',
            'scaleOutSize',
            'minNodeCount',
            'maxNodeCount',
            'cpuThresholds',
            'grantedMemoryThresholds',
            'consumedMemoryThresholds',
            'storageThresholds',
        ],
    )
    parsed_policy = AutoscalingPolicy(
        node_type_id=policy_settings.get('nodeTypeId'),
        scale_out_size=policy_settings.get('scaleOutSize'),
        min_node_count=policy_settings.get('minNodeCount'),
        max_node_count=policy_settings.get('maxNodeCount'),
        cpu_thresholds=_ParseThresholds(policy_settings.get('cpuThresholds')),
        granted_memory_thresholds=_ParseThresholds(
            policy_settings.get('grantedMemoryThresholds')
        ),
        consumed_memory_thresholds=_ParseThresholds(
            policy_settings.get('consumedMemoryThresholds')
        ),
        storage_thresholds=_ParseThresholds(
            policy_settings.get('storageThresholds')
        ),
    )
    parsed_settings.autoscaling_policies[policy_name] = parsed_policy

  return parsed_settings


def ParseAutoscalingSettingsFromApiFormat(
    cluster_message,
) -> AutoscalingSettings:
  """Parses the autoscaling settings from the format returned by the describe command.

  The resulting object can later be passed to
  googlecloudsdk.api_lib.vmware.util.ConstructAutoscalingSettingsMessage.

  Args:
    cluster_message: cluster object with the autoscaling settings.

  Returns:
    Equivalent AutoscalingSettings istance.
  """
  if cluster_message.autoscalingSettings is None:
    return None

  autoscaling_settings = cluster_message.autoscalingSettings

  parsed_settings = AutoscalingSettings(
      min_cluster_node_count=autoscaling_settings.minClusterNodeCount,
      max_cluster_node_count=autoscaling_settings.maxClusterNodeCount,
      cool_down_period=autoscaling_settings.coolDownPeriod,
      autoscaling_policies={},
  )

  for item in autoscaling_settings.autoscalingPolicies.additionalProperties:
    policy_name, policy_settings = item.key, item.value

    def _ParseThresholds(thresholds):
      if thresholds is None:
        return None
      return ScalingThresholds(
          scale_in=thresholds.scaleIn,
          scale_out=thresholds.scaleOut,
      )

    parsed_policy = AutoscalingPolicy(
        node_type_id=policy_settings.nodeTypeId,
        scale_out_size=policy_settings.scaleOutSize,
        min_node_count=policy_settings.minNodeCount,
        max_node_count=policy_settings.maxNodeCount,
        cpu_thresholds=_ParseThresholds(policy_settings.cpuThresholds),
        granted_memory_thresholds=_ParseThresholds(
            policy_settings.grantedMemoryThresholds
        ),
        consumed_memory_thresholds=_ParseThresholds(
            policy_settings.consumedMemoryThresholds
        ),
        storage_thresholds=_ParseThresholds(policy_settings.storageThresholds),
    )
    parsed_settings.autoscaling_policies[policy_name] = parsed_policy

  return parsed_settings