HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/gkeonprem/vmware_node_pools.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for node pool resources in Anthos clusters on VMware."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from typing import Generator

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.container.gkeonprem import client
from googlecloudsdk.api_lib.container.gkeonprem import update_mask
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.command_lib.container.vmware import flags
from googlecloudsdk.generated_clients.apis.gkeonprem.v1 import gkeonprem_v1_messages as messages


class NodePoolsClient(client.ClientBase):
  """Client for node pools in Anthos clusters on VMware API."""

  def __init__(self, **kwargs):
    super(NodePoolsClient, self).__init__(**kwargs)
    self._service = (
        self._client.projects_locations_vmwareClusters_vmwareNodePools
    )

  def List(
      self, args: parser_extensions.Namespace
  ) -> Generator[messages.VmwareNodePool, None, None]:
    """Lists Node Pools in the Anthos clusters on VMware API."""
    list_req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsListRequest(
        parent=self._user_cluster_name(args)
    )
    return list_pager.YieldFromList(
        self._service,
        list_req,
        field='vmwareNodePools',
        batch_size=flags.Get(args, 'page_size'),
        limit=flags.Get(args, 'limit'),
        batch_size_attribute='pageSize',
    )

  def Delete(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Deletes a gkeonprem node pool API resource."""
    kwargs = {
        'allowMissing': flags.Get(args, 'allow_missing'),
        'etag': flags.Get(args, 'etag'),
        'name': self._node_pool_name(args),
        'validateOnly': flags.Get(args, 'validate_only'),
        'ignoreErrors': flags.Get(args, 'ignore_errors'),
    }
    req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsDeleteRequest(
        **kwargs
    )
    return self._service.Delete(req)

  def Create(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Creates a gkeonprem node pool API resource."""
    node_pool_ref = self._node_pool_ref(args)
    kwargs = {
        'parent': node_pool_ref.Parent().RelativeName(),
        'validateOnly': flags.Get(args, 'validate_only'),
        'vmwareNodePool': self._vmware_node_pool(args),
        'vmwareNodePoolId': self._node_pool_id(args),
    }
    req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsCreateRequest(
        **kwargs
    )
    return self._service.Create(req)

  def Update(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Updates a gkeonprem node pool API resource."""
    kwargs = {
        'allowMissing': flags.Get(args, 'allow_missing'),
        'name': self._node_pool_name(args),
        'updateMask': update_mask.get_update_mask(
            args, update_mask.VMWARE_NODE_POOL_ARGS_TO_UPDATE_MASKS
        ),
        'validateOnly': flags.Get(args, 'validate_only'),
        'vmwareNodePool': self._vmware_node_pool(args),
    }
    req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsPatchRequest(
        **kwargs
    )
    return self._service.Patch(req)

  def Enroll(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Enrolls an Anthos on VMware node pool API resource."""
    enroll_vmware_node_pool_request = messages.EnrollVmwareNodePoolRequest(
        vmwareNodePoolId=self._node_pool_id(args),
    )
    req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsEnrollRequest(
        enrollVmwareNodePoolRequest=enroll_vmware_node_pool_request,
        parent=self._node_pool_parent(args),
    )
    return self._service.Enroll(req)

  def Unenroll(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Unenrolls an Anthos on VMware node pool API resource."""
    kwargs = {
        'allowMissing': flags.Get(args, 'allow_missing'),
        'etag': flags.Get(args, 'etag'),
        'name': self._node_pool_name(args),
        'validateOnly': flags.Get(args, 'validate_only'),
    }
    req = messages.GkeonpremProjectsLocationsVmwareClustersVmwareNodePoolsUnenrollRequest(
        **kwargs
    )
    return self._service.Unenroll(req)

  def _vmware_node_pool(
      self, args: parser_extensions.Namespace
  ) -> messages.VmwareNodePool:
    """Constructs proto message VmwareNodePool."""
    kwargs = {
        'name': self._node_pool_name(args),
        'displayName': flags.Get(args, 'display_name'),
        'annotations': self._annotations(args),
        'config': self._vmware_node_config(args),
        'onPremVersion': flags.Get(args, 'version'),
        'nodePoolAutoscaling': self._vmware_node_pool_autoscaling_config(args),
    }
    return messages.VmwareNodePool(**kwargs)

  def _enable_load_balancer(self, args: parser_extensions.Namespace):
    if flags.Get(args, 'enable_load_balancer'):
      return True
    if flags.Get(args, 'disable_load_balancer'):
      return False
    return None

  def _node_taints(self, args: parser_extensions.Namespace):
    taint_messages = []
    node_taints = flags.Get(args, 'node_taints', {})
    for node_taint in node_taints.items():
      taint_object = self._parse_node_taint(node_taint)
      taint_messages.append(messages.NodeTaint(**taint_object))
    return taint_messages

  def _labels_value(
      self, args: parser_extensions.Namespace
  ) -> messages.VmwareNodeConfig.LabelsValue:
    """Constructs proto message LabelsValue."""
    node_labels = flags.Get(args, 'node_labels', {})
    additional_property_messages = []
    if not node_labels:
      return None

    for key, value in node_labels.items():
      additional_property_messages.append(
          messages.VmwareNodeConfig.LabelsValue.AdditionalProperty(
              key=key, value=value
          )
      )

    labels_value_message = messages.VmwareNodeConfig.LabelsValue(
        additionalProperties=additional_property_messages
    )

    return labels_value_message

  def _vmware_node_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareNodeConfig."""
    kwargs = {
        'cpus': flags.Get(args, 'cpus'),
        'memoryMb': flags.Get(args, 'memory'),
        'replicas': flags.Get(args, 'replicas'),
        'imageType': flags.Get(args, 'image_type'),
        'image': flags.Get(args, 'image'),
        'bootDiskSizeGb': flags.Get(args, 'boot_disk_size'),
        'taints': self._node_taints(args),
        'labels': self._labels_value(args),
        'vsphereConfig': self._vsphere_config(args),
        'enableLoadBalancer': self._enable_load_balancer(args),
    }
    if flags.IsSet(kwargs):
      return messages.VmwareNodeConfig(**kwargs)
    return None

  def _vsphere_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareVsphereConfig."""
    if 'vsphere_config' not in args.GetSpecifiedArgsDict():
      return None

    kwargs = {
        'datastore': args.vsphere_config.get('datastore', None),
        'storagePolicyName': args.vsphere_config.get(
            'storage-policy-name', None
        ),
    }
    if flags.IsSet(kwargs):
      return messages.VmwareVsphereConfig(**kwargs)
    return None

  def _vmware_node_pool_autoscaling_config(
      self, args: parser_extensions.Namespace
  ):
    """Constructs proto message VmwareNodePoolAutoscalingConfig."""
    kwargs = {
        'minReplicas': flags.Get(args, 'min_replicas'),
        'maxReplicas': flags.Get(args, 'max_replicas'),
    }
    if any(kwargs.values()):
      return messages.VmwareNodePoolAutoscalingConfig(**kwargs)
    return None

  def _annotations(self, args: parser_extensions.Namespace):
    """Constructs proto message AnnotationsValue."""
    annotations = flags.Get(args, 'annotations', {})
    additional_property_messages = []
    if not annotations:
      return None

    for key, value in annotations.items():
      additional_property_messages.append(
          messages.VmwareNodePool.AnnotationsValue.AdditionalProperty(
              key=key, value=value
          )
      )

    annotation_value_message = messages.VmwareNodePool.AnnotationsValue(
        additionalProperties=additional_property_messages
    )
    return annotation_value_message