HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/container/gkeonprem/vmware_clusters.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for gkeonprem API clients for VMware cluster resources."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from typing import Generator, Optional

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.container.gkeonprem import client
from googlecloudsdk.api_lib.container.gkeonprem import update_mask
from googlecloudsdk.api_lib.container.vmware import version_util
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.command_lib.container.vmware import flags
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import properties
from googlecloudsdk.generated_clients.apis.gkeonprem.v1 import gkeonprem_v1_messages as messages


class ClustersClient(client.ClientBase):
  """Client for clusters in GKE On-Prem VMware API."""

  def __init__(self, **kwargs):
    """Initializes the base client and binds the VMware clusters service.

    Args:
      **kwargs: Keyword arguments forwarded unchanged to the base client
        constructor.
    """
    super().__init__(**kwargs)
    # Every API call made by this class goes through this service handle.
    vmware_clusters_service = self._client.projects_locations_vmwareClusters
    self._service = vmware_clusters_service

  def List(
      self, args: parser_extensions.Namespace
  ) -> Generator[messages.VmwareCluster, None, None]:
    """Lists Clusters in the GKE On-Prem VMware API.

    Before listing, a QueryVersionConfig request is sent as a P4SA workaround;
    its response is discarded and unauthorized/forbidden errors raised by it
    are ignored. If no location was given on the command line and the
    container_vmware/location property is unset, args.location is overwritten
    with '-' so that clusters of all locations in the project are listed.

    Args:
      args: The parsed command-line arguments. May be mutated as described
        above.

    Returns:
      The result of list_pager.YieldFromList over the 'vmwareClusters' field.
    """
    # Workaround for P4SA: query the version config first and drop the result.
    project = args.project or properties.VALUES.core.project.Get()
    # The workaround call is pinned to `us-west1`, because it cannot handle
    # `--location=-`.
    workaround_parent = 'projects/{project}/locations/{location}'.format(
        project=project, location='us-west1'
    )
    workaround_request = messages.GkeonpremProjectsLocationsVmwareClustersQueryVersionConfigRequest(
        parent=workaround_parent,
    )
    try:
      self._service.QueryVersionConfig(workaround_request)
    except (
        apitools_exceptions.HttpUnauthorizedError,
        apitools_exceptions.HttpForbiddenError,
    ):
      # Deliberately swallowed: these errors may occur during a support case
      # when a Googler inspects a customer's project via Google Admin.
      pass

    # Fall back to the all-locations wildcard when neither the flag nor the
    # container_vmware/location property provides a location.
    location_flag_given = 'location' in args.GetSpecifiedArgsDict()
    if (
        not location_flag_given
        and not properties.VALUES.container_vmware.location.Get()
    ):
      args.location = '-'

    request = messages.GkeonpremProjectsLocationsVmwareClustersListRequest(
        parent=self._location_name(args)
    )
    page_size = flags.Get(args, 'page_size')
    result_limit = flags.Get(args, 'limit')
    return list_pager.YieldFromList(
        self._service,
        request,
        field='vmwareClusters',
        batch_size=page_size,
        limit=result_limit,
        batch_size_attribute='pageSize',
    )

  def Enroll(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Enrolls a VMware cluster to Anthos.

    Args:
      args: The parsed command-line arguments; supplies the admin cluster
        membership, the cluster ID, and the local_name / validate_only flags.

    Returns:
      The response of the Enroll API call.
    """
    enroll_body = messages.EnrollVmwareClusterRequest(
        adminClusterMembership=self._admin_cluster_membership_name(args),
        vmwareClusterId=self._user_cluster_id(args),
        localName=flags.Get(args, 'local_name'),
        validateOnly=flags.Get(args, 'validate_only'),
    )
    request = messages.GkeonpremProjectsLocationsVmwareClustersEnrollRequest(
        parent=self._user_cluster_parent(args),
        enrollVmwareClusterRequest=enroll_body,
    )
    return self._service.Enroll(request)

  def Unenroll(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Unenrolls an Anthos cluster on VMware.

    Args:
      args: The parsed command-line arguments; supplies the cluster name and
        the force / allow_missing / validate_only flags.

    Returns:
      The response of the Unenroll API call.
    """
    request = messages.GkeonpremProjectsLocationsVmwareClustersUnenrollRequest(
        name=self._user_cluster_name(args),
        force=flags.Get(args, 'force'),
        allowMissing=flags.Get(args, 'allow_missing'),
        validateOnly=flags.Get(args, 'validate_only'),
    )
    return self._service.Unenroll(request)

  def Delete(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Deletes an Anthos cluster on VMware.

    Args:
      args: The parsed command-line arguments; supplies the cluster name and
        the allow_missing / validate_only / force / ignore_errors flags.

    Returns:
      The response of the Delete API call.
    """
    request = messages.GkeonpremProjectsLocationsVmwareClustersDeleteRequest(
        name=self._user_cluster_name(args),
        allowMissing=flags.Get(args, 'allow_missing'),
        validateOnly=flags.Get(args, 'validate_only'),
        force=flags.Get(args, 'force'),
        ignoreErrors=flags.Get(args, 'ignore_errors'),
    )
    return self._service.Delete(request)

  def Create(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Creates an Anthos cluster on VMware.

    Args:
      args: The parsed command-line arguments from which the parent, the
        cluster ID and the VmwareCluster message are derived.

    Returns:
      The response of the Create API call.
    """
    request = messages.GkeonpremProjectsLocationsVmwareClustersCreateRequest(
        parent=self._user_cluster_parent(args),
        validateOnly=flags.Get(args, 'validate_only'),
        vmwareCluster=self._vmware_cluster(args),
        vmwareClusterId=self._user_cluster_id(args),
    )
    return self._service.Create(request)

  def CreateFromImport(
      self,
      args: parser_extensions.Namespace,
      vmware_cluster,
      vmware_cluster_ref,
  ) -> messages.Operation:
    """Creates an Anthos cluster on VMware from a pre-built cluster message.

    Args:
      args: The parsed command-line arguments; only validate_only is read.
      vmware_cluster: Value sent as the request's vmwareCluster field.
      vmware_cluster_ref: Resource reference whose parent relative name and
        name are used as the request's parent and vmwareClusterId.

    Returns:
      The response of the Create API call.
    """
    parent_name = vmware_cluster_ref.Parent().RelativeName()
    validate_only = flags.Get(args, 'validate_only')
    request = messages.GkeonpremProjectsLocationsVmwareClustersCreateRequest(
        parent=parent_name,
        validateOnly=validate_only,
        vmwareCluster=vmware_cluster,
        vmwareClusterId=vmware_cluster_ref.Name(),
    )
    return self._service.Create(request)

  def Update(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Updates an Anthos cluster on VMware through a Patch request.

    The update mask is computed from the parsed arguments using
    update_mask.VMWARE_CLUSTER_ARGS_TO_UPDATE_MASKS.

    Args:
      args: The parsed command-line arguments.

    Returns:
      The response of the Patch API call.
    """
    mask = update_mask.get_update_mask(
        args, update_mask.VMWARE_CLUSTER_ARGS_TO_UPDATE_MASKS
    )
    request = messages.GkeonpremProjectsLocationsVmwareClustersPatchRequest(
        name=self._user_cluster_name(args),
        allowMissing=flags.Get(args, 'allow_missing'),
        updateMask=mask,
        validateOnly=flags.Get(args, 'validate_only'),
        vmwareCluster=self._vmware_cluster(args),
    )
    return self._service.Patch(request)

  def UpdateFromFile(
      self, args: parser_extensions.Namespace, vmware_cluster
  ) -> messages.Operation:
    """Patches a VMware cluster using a caller-supplied cluster message.

    The update mask is a fixed, explicit list of the supported top-level
    mutable fields. etag is left out of that list because it is derived.

    Args:
      args: The parsed command-line arguments; supplies the cluster name and
        the allow_missing / validate_only flags.
      vmware_cluster: Value sent as the request's vmwareCluster field.

    Returns:
      The response of the Patch API call.
    """
    mutable_fields = (
        'description',
        'on_prem_version',
        'annotations',
        'control_plane_node',
        'anti_affinity_groups',
        'storage',
        'network_config',
        'load_balancer',
        'dataplane_v2',
        'auto_repair_config',
        'authorization',
    )
    request = messages.GkeonpremProjectsLocationsVmwareClustersPatchRequest(
        name=self._user_cluster_name(args),
        allowMissing=flags.Get(args, 'allow_missing'),
        updateMask=','.join(mutable_fields),
        validateOnly=flags.Get(args, 'validate_only'),
        vmwareCluster=vmware_cluster,
    )
    return self._service.Patch(request)

  def QueryVersionConfig(
      self, args: parser_extensions.Namespace
  ) -> messages.QueryVmwareVersionConfigResponse:
    """Sends a QueryVersionConfig request for the location in args.

    Args:
      args: The parsed command-line arguments; supplies the admin cluster
        membership, the user cluster name and the location.

    Returns:
      The response of the QueryVersionConfig API call.
    """
    membership_name = self._admin_cluster_membership_name(args)
    cluster_name = self._user_cluster_name(args)
    location_name = self._location_ref(args).RelativeName()

    request = messages.GkeonpremProjectsLocationsVmwareClustersQueryVersionConfigRequest(
        createConfig_adminClusterMembership=membership_name,
        upgradeConfig_clusterName=cluster_name,
        parent=location_name,
    )
    return self._service.QueryVersionConfig(request)

  def _vmware_cluster(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareCluster.

    Args:
      args: The parsed command-line arguments.

    Returns:
      A VmwareCluster message, or None when every candidate field is falsy.
    """
    # Insertion order is kept on purpose: the helpers run in this order.
    fields = {
        'name': self._user_cluster_name(args),
        'adminClusterMembership': self._admin_cluster_membership_name(args),
        'description': flags.Get(args, 'description'),
        'onPremVersion': flags.Get(args, 'version'),
        'annotations': self._annotations(args),
        'controlPlaneNode': self._vmware_control_plane_node_config(args),
        'antiAffinityGroups': self._vmware_aag_config(args),
        'storage': self._vmware_storage_config(args),
        'networkConfig': self._vmware_network_config(args),
        'loadBalancer': self._vmware_load_balancer_config(args),
        'vcenter': self._vmware_vcenter_config(args),
        'dataplaneV2': self._vmware_dataplane_v2_config(args),
        'vmTrackingEnabled': self._vm_tracking_enabled(args),
        'autoRepairConfig': self._vmware_auto_repair_config(args),
        'authorization': self._authorization(args),
        'enableControlPlaneV2': self._enable_control_plane_v2(args),
        'upgradePolicy': self._upgrade_policy(args),
    }
    if not any(fields.values()):
      return None
    return messages.VmwareCluster(**fields)

  def _vmware_vcenter_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareVCenterConfig.

    Args:
      args: The parsed command-line arguments; the vcenter_* flags are read.

    Returns:
      A VmwareVCenterConfig message, or None when flags.IsSet reports that
      nothing is set.
    """
    fields = {
        'caCertData': flags.Get(args, 'vcenter_ca_cert_data'),
        'cluster': flags.Get(args, 'vcenter_cluster'),
        'datacenter': flags.Get(args, 'vcenter_datacenter'),
        'datastore': flags.Get(args, 'vcenter_datastore'),
        'folder': flags.Get(args, 'vcenter_folder'),
        'resourcePool': flags.Get(args, 'vcenter_resource_pool'),
        'storagePolicyName': flags.Get(args, 'vcenter_storage_policy_name'),
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareVCenterConfig(**fields)

  def _upgrade_policy(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareClusterUpgradePolicy.

    Args:
      args: The parsed command-line arguments.

    Returns:
      A VmwareClusterUpgradePolicy message when --upgrade-policy was
      specified, otherwise None.
    """
    if '--upgrade-policy' in args.GetSpecifiedArgNames():
      policy = args.upgrade_policy
      control_plane_only = policy.get('control-plane-only', None)
      return messages.VmwareClusterUpgradePolicy(
          controlPlaneOnly=control_plane_only,
      )
    return None

  def _enable_control_plane_v2(self, args: parser_extensions.Namespace):
    """Resolves the enableControlPlaneV2 field value.

    An explicitly specified enable flag yields True and an explicitly
    specified disable flag yields False. Otherwise, for the create command
    with a version for which the 1.15.0-gke.0 feature is available, the value
    defaults to True.

    Args:
      args: The parsed command-line arguments.

    Returns:
      True, False, or None when no value should be set.
    """
    specified = args.GetSpecifiedArgsDict()
    if 'enable_control_plane_v2' in specified:
      return True
    if 'disable_control_plane_v2' in specified:
      return False

    # Minimum version from which control plane V2 is enabled by default.
    default_enable_control_plane_v2 = '1.15.0-gke.0'
    is_create = args.command_path[-1] == 'create'
    if is_create and version_util.Version(args.version).feature_available(
        default_enable_control_plane_v2
    ):
      return True
    return None

  def _vm_tracking_enabled(self, args: parser_extensions.Namespace):
    """Returns True when enable_vm_tracking is truthy, otherwise None."""
    return True if flags.Get(args, 'enable_vm_tracking') else None

  def auto_repair_enabled(self, args: parser_extensions.Namespace):
    """Resolves the auto repair setting from the enable/disable flags.

    Args:
      args: The parsed command-line arguments.

    Returns:
      True if enable_auto_repair is truthy, False if disable_auto_repair is
      truthy, otherwise None. The enable flag is checked first.
    """
    if flags.Get(args, 'enable_auto_repair'):
      return True
    return False if flags.Get(args, 'disable_auto_repair') else None

  def _vmware_auto_repair_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareAutoRepairConfig.

    Returns:
      A VmwareAutoRepairConfig message, or None when flags.IsSet reports that
      the enabled value is not set.
    """
    fields = {'enabled': self.auto_repair_enabled(args)}
    if not flags.IsSet(fields):
      return None
    return messages.VmwareAutoRepairConfig(**fields)

  def _cluster_users(self, args: parser_extensions.Namespace):
    """Constructs repeated proto message ClusterUser.

    Args:
      args: The parsed command-line arguments.

    Returns:
      One ClusterUser per entry of the admin_users flag when it is non-empty.
      Otherwise None for the update command; for other commands a
      single-element list holding the core/account property value, or None
      when that property is unset.
    """
    explicit_admins = flags.Get(args, 'admin_users')
    if explicit_admins:
      return [messages.ClusterUser(username=user) for user in explicit_admins]

    # On update, no client-side default is applied.
    if args.command_path[-1] == 'update':
      return None

    # On create, default the admin user to the current gcloud account.
    current_account = properties.VALUES.core.account.Get()
    if not current_account:
      return None
    return [messages.ClusterUser(username=current_account)]

  def _authorization(self, args: parser_extensions.Namespace):
    """Constructs proto message Authorization.

    Returns:
      An Authorization message, or None when flags.IsSet reports that no
      admin users are set.
    """
    fields = {'adminUsers': self._cluster_users(args)}
    if not flags.IsSet(fields):
      return None
    return messages.Authorization(**fields)

  def _dataplane_v2_enabled(self, args: parser_extensions.Namespace):
    """Constructs proto field dataplane_v2_enabled.

    Returns:
      True if enable_dataplane_v2 is truthy, False if disable_dataplane_v2 is
      truthy, otherwise None. The enable flag is checked first.
    """
    if flags.Get(args, 'enable_dataplane_v2'):
      return True
    return False if flags.Get(args, 'disable_dataplane_v2') else None

  def _advanced_networking(self, args: parser_extensions.Namespace):
    """Constructs proto field advanced_networking.

    Returns:
      True if enable_advanced_networking is truthy, False if
      disable_advanced_networking is truthy, otherwise None. The enable flag
      is checked first.
    """
    if flags.Get(args, 'enable_advanced_networking'):
      return True
    return False if flags.Get(args, 'disable_advanced_networking') else None

  def _vmware_dataplane_v2_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareDataplaneV2Config.

    Returns:
      A VmwareDataplaneV2Config message, or None when flags.IsSet reports
      that neither field is set.
    """
    # NOTE(review): only the enable_* flags are read here. The sibling
    # helpers _dataplane_v2_enabled / _advanced_networking, which also look
    # at the disable_* flags, are not used - confirm this is intentional.
    dataplane_v2_enabled = flags.Get(args, 'enable_dataplane_v2')
    advanced_networking = flags.Get(args, 'enable_advanced_networking')
    fields = {
        'dataplaneV2Enabled': dataplane_v2_enabled,
        'advancedNetworking': advanced_networking,
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareDataplaneV2Config(**fields)

  def _vsphere_csi_disabled(self, args: parser_extensions.Namespace):
    """Constructs proto field vsphere_csi_disabled.

    Returns:
      True if disable_vsphere_csi is truthy, False if enable_vsphere_csi is
      truthy, otherwise None. The disable flag is checked first.
    """
    if flags.Get(args, 'disable_vsphere_csi'):
      return True
    return False if flags.Get(args, 'enable_vsphere_csi') else None

  def _vmware_storage_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareStorageConfig.

    Returns:
      A VmwareStorageConfig message, or None when flags.IsSet reports that
      the vSphere CSI value is not set.
    """
    fields = {'vsphereCsiDisabled': self._vsphere_csi_disabled(args)}
    if not flags.IsSet(fields):
      return None
    return messages.VmwareStorageConfig(**fields)

  def _aag_config_disabled(self, args: parser_extensions.Namespace):
    """Constructs proto field aag_config_disabled.

    Returns:
      True if disable_aag_config is truthy, False if enable_aag_config is
      truthy, otherwise None. The disable flag is checked first.
    """
    if flags.Get(args, 'disable_aag_config'):
      return True
    return False if flags.Get(args, 'enable_aag_config') else None

  def _vmware_aag_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareAAGConfig.

    Returns:
      A VmwareAAGConfig message, or None when flags.IsSet reports that the
      AAG value is not set.
    """
    fields = {'aagConfigDisabled': self._aag_config_disabled(args)}
    if not flags.IsSet(fields):
      return None
    return messages.VmwareAAGConfig(**fields)

  def _auto_resize_enabled(self, args: parser_extensions.Namespace):
    """Constructs proto field auto_resize_config.enabled.

    Returns:
      True if enable_auto_resize is truthy, False if disable_auto_resize is
      truthy, otherwise None. The enable flag is checked first.
    """
    if flags.Get(args, 'enable_auto_resize'):
      return True
    return False if flags.Get(args, 'disable_auto_resize') else None

  def _vmware_auto_resize_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareAutoResizeConfig.

    Returns:
      A VmwareAutoResizeConfig message, or None when flags.IsSet reports that
      the enabled value is not set.
    """
    fields = {'enabled': self._auto_resize_enabled(args)}
    if not flags.IsSet(fields):
      return None
    return messages.VmwareAutoResizeConfig(**fields)

  def _vmware_control_plane_node_config(
      self, args: parser_extensions.Namespace
  ):
    """Constructs proto message VmwareControlPlaneNodeConfig.

    Args:
      args: The parsed command-line arguments; the cpus, memory and replicas
        flags are read alongside the nested auto-resize and vSphere configs.

    Returns:
      A VmwareControlPlaneNodeConfig message, or None when flags.IsSet
      reports that nothing is set.
    """
    fields = {
        'autoResizeConfig': self._vmware_auto_resize_config(args),
        'cpus': flags.Get(args, 'cpus'),
        'memory': flags.Get(args, 'memory'),
        'replicas': flags.Get(args, 'replicas'),
        'vsphereConfig': self._vmware_control_plane_vsphere_config(args),
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareControlPlaneNodeConfig(**fields)

  def _vmware_control_plane_vsphere_config(
      self, args: parser_extensions.Namespace
  ) -> Optional[messages.VmwareControlPlaneVsphereConfig]:
    """Constructs proto message VmwareControlPlaneVsphereConfig.

    Args:
      args: The parsed command-line arguments.

    Returns:
      A VmwareControlPlaneVsphereConfig message when
      control_plane_vsphere_config was specified, otherwise None.
    """
    if 'control_plane_vsphere_config' in args.GetSpecifiedArgsDict():
      vsphere_config = args.control_plane_vsphere_config
      return messages.VmwareControlPlaneVsphereConfig(
          datastore=vsphere_config.get('datastore', None),
          storagePolicyName=vsphere_config.get('storage-policy-name', None),
      )
    return None

  def _create_annotations(self, args: parser_extensions.Namespace):
    """Constructs proto message AnnotationsValue for create command.

    Returns:
      The message built from the annotations flag, or None when that flag is
      empty or unset.
    """
    return self._dict_to_annotations_message(flags.Get(args, 'annotations'))

  def _update_annotations(self, args: parser_extensions.Namespace):
    """Constructs proto message AnnotationsValue for update command.

    At most one annotation flag is acted on, checked in this order:
    add_annotations, clear_annotations, remove_annotations, set_annotations.

    The cluster is described only when the existing annotations are actually
    needed (add / remove). Clearing or setting annotations does not depend on
    the current server state, so no Describe request is issued for them.

    Args:
      args: The parsed command-line arguments.

    Returns:
      The AnnotationsValue message to send, or None when no annotation flag
      was specified or the resulting annotation mapping is empty.
    """
    specified_args = args.GetSpecifiedArgsDict()

    if 'add_annotations' in specified_args:
      merged_annotations = self._fetch_current_annotations(args)
      # Added keys override existing ones with the same name.
      merged_annotations.update(args.add_annotations)
      return self._dict_to_annotations_message(merged_annotations)

    if 'clear_annotations' in specified_args:
      # An empty (but present) message clears every annotation.
      return messages.VmwareCluster.AnnotationsValue()

    if 'remove_annotations' in specified_args:
      current_annotations = self._fetch_current_annotations(args)
      remaining_annotations = {
          key: value
          for key, value in current_annotations.items()
          if key not in args.remove_annotations
      }
      return self._dict_to_annotations_message(remaining_annotations)

    if 'set_annotations' in specified_args:
      return self._dict_to_annotations_message(args.set_annotations)

    return None

  def _fetch_current_annotations(self, args: parser_extensions.Namespace):
    """Describes the cluster and returns its annotations as a plain dict."""
    cluster_ref = args.CONCEPTS.cluster.Parse()
    cluster_response = self.Describe(cluster_ref)
    if not cluster_response.annotations:
      return {}
    return {
        annotation.key: annotation.value
        for annotation in cluster_response.annotations.additionalProperties
    }

  def _dict_to_annotations_message(self, annotations):
    """Converts a key/value mapping into an AnnotationsValue message.

    Args:
      annotations: Mapping of annotation keys to values; may be empty or None.

    Returns:
      An AnnotationsValue message with one AdditionalProperty per item, or
      None when annotations is falsy.
    """
    if not annotations:
      return None

    annotations_value = messages.VmwareCluster.AnnotationsValue
    properties_list = [
        annotations_value.AdditionalProperty(key=name, value=content)
        for name, content in annotations.items()
    ]
    return annotations_value(additionalProperties=properties_list)

  def _annotations(self, args: parser_extensions.Namespace):
    """Constructs proto message AnnotationsValue.

    Dispatches on the last element of args.command_path: 'create' and
    'update' each have a dedicated builder; any other command yields None.
    """
    builders = {
        'create': self._create_annotations,
        'update': self._update_annotations,
    }
    builder = builders.get(args.command_path[-1])
    if builder is None:
      return None
    return builder(args)

  def _vmware_host_ip(self, host_ip) -> messages.VmwareHostIp:
    """Constructs proto message VmwareHostIp.

    Args:
      host_ip: Mapping expected to carry non-empty 'hostname' and 'ip' items.

    Returns:
      The VmwareHostIp message.

    Raises:
      InvalidConfigFile: If 'hostname' or 'ip' is missing or empty; hostname
        is checked first.
    """
    host_name = host_ip.get('hostname', None)
    if not host_name:
      raise InvalidConfigFile(
          'Missing field [hostname] in Static IP configuration file.'
      )

    address = host_ip.get('ip', None)
    if not address:
      raise InvalidConfigFile(
          'Missing field [ip] in Static IP configuration file.'
      )

    return messages.VmwareHostIp(hostname=host_name, ip=address)

  def _vmware_ip_block(self, ip_block):
    """Constructs proto message VmwareIpBlock.

    Args:
      ip_block: Mapping expected to carry non-empty 'gateway', 'netmask' and
        'ips' items.

    Returns:
      The VmwareIpBlock message, or None when flags.IsSet reports that
      nothing is set.

    Raises:
      InvalidConfigFile: If 'gateway', 'netmask' or 'ips' is missing or empty
        (checked in that order), or if a host entry is invalid.
    """
    gateway_value = ip_block.get('gateway', None)
    if not gateway_value:
      raise InvalidConfigFile(
          'Missing field [gateway] in Static IP configuration file.'
      )

    netmask_value = ip_block.get('netmask', None)
    if not netmask_value:
      raise InvalidConfigFile(
          'Missing field [netmask] in Static IP configuration file.'
      )

    host_entries = ip_block.get('ips', [])
    if not host_entries:
      raise InvalidConfigFile(
          'Missing field [ips] in Static IP configuration file.'
      )

    fields = {
        'gateway': gateway_value,
        'netmask': netmask_value,
        'ips': [self._vmware_host_ip(entry) for entry in host_entries],
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareIpBlock(**fields)

  def _vmware_static_ip_config_from_file(
      self, args: parser_extensions.Namespace
  ):
    """Constructs VmwareStaticIpConfig from static_ip_config_from_file.

    Args:
      args: The parsed command-line arguments; static_ip_config_from_file is
        read as a mapping.

    Returns:
      The VmwareStaticIpConfig message, or None when flags.IsSet reports that
      nothing is set.

    Raises:
      InvalidConfigFile: If 'staticIPConfig' or its 'ipBlocks' is missing or
        empty, or if an IP block is invalid.
    """
    config_section = args.static_ip_config_from_file.get(
        'staticIPConfig', None
    )
    if not config_section:
      raise InvalidConfigFile(
          'Missing field [staticIPConfig] in Static IP configuration file.'
      )

    raw_blocks = config_section.get('ipBlocks', [])
    if not raw_blocks:
      raise InvalidConfigFile(
          'Missing field [ipBlocks] in Static IP configuration file.'
      )

    fields = {
        'ipBlocks': [self._vmware_ip_block(block) for block in raw_blocks],
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareStaticIpConfig(**fields)

  def _vmware_static_ip_config_ip_blocks(
      self, args: parser_extensions.Namespace
  ):
    """Constructs VmwareStaticIpConfig from static_ip_config_ip_blocks.

    Each block must provide 'gateway', 'netmask' and 'ips'; every entry of
    'ips' is indexed as (ip, hostname).

    Args:
      args: The parsed command-line arguments.

    Returns:
      The VmwareStaticIpConfig message with one VmwareIpBlock per block.
    """

    def _to_ip_block(block):
      hosts = [
          messages.VmwareHostIp(ip=entry[0], hostname=entry[1])
          for entry in block['ips']
      ]
      return messages.VmwareIpBlock(
          gateway=block['gateway'],
          netmask=block['netmask'],
          ips=hosts,
      )

    static_ip_config = messages.VmwareStaticIpConfig()
    for block in args.static_ip_config_ip_blocks:
      static_ip_config.ipBlocks.append(_to_ip_block(block))
    return static_ip_config

  def _vmware_static_ip_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareStaticIpConfig.

    The file-based flag takes precedence over the inline IP-blocks flag.

    Returns:
      The VmwareStaticIpConfig message, or None when neither flag was
      specified.
    """
    specified = args.GetSpecifiedArgsDict()
    if 'static_ip_config_from_file' in specified:
      return self._vmware_static_ip_config_from_file(args)
    if 'static_ip_config_ip_blocks' in specified:
      return self._vmware_static_ip_config_ip_blocks(args)
    return None

  def _vmware_dhcp_ip_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareDhcpIpConfig.

    Returns:
      A VmwareDhcpIpConfig message, or None when flags.IsSet reports that
      enable_dhcp is not set.
    """
    fields = {'enabled': flags.Get(args, 'enable_dhcp')}
    if not flags.IsSet(fields):
      return None
    return messages.VmwareDhcpIpConfig(**fields)

  def _vmware_host_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareHostConfig.

    Each of the three server/domain flags falls back to an empty list.

    Returns:
      A VmwareHostConfig message, or None when flags.IsSet reports that
      nothing is set.
    """
    dns_servers = flags.Get(args, 'dns_servers', [])
    ntp_servers = flags.Get(args, 'ntp_servers', [])
    search_domains = flags.Get(args, 'dns_search_domains', [])
    fields = {
        'dnsServers': dns_servers,
        'ntpServers': ntp_servers,
        'dnsSearchDomains': search_domains,
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareHostConfig(**fields)

  def _control_plane_ip_block(self, args: parser_extensions.Namespace):
    """Constructs proto message ControlPlaneIpBlock.

    Every entry of the block's 'ips' item is indexed as (ip, hostname).

    Returns:
      A VmwareIpBlock message when control_plane_ip_block was specified,
      otherwise None.
    """
    if 'control_plane_ip_block' in args.GetSpecifiedArgsDict():
      block = args.control_plane_ip_block
      gateway_value = block.get('gateway', None)
      netmask_value = block.get('netmask', None)
      hosts = [
          messages.VmwareHostIp(ip=entry[0], hostname=entry[1])
          for entry in block.get('ips', [])
      ]
      return messages.VmwareIpBlock(
          gateway=gateway_value, netmask=netmask_value, ips=hosts
      )
    return None

  def _vmware_control_plane_v2_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareControlPlaneV2Config.

    Returns:
      A VmwareControlPlaneV2Config message, or None when the control plane IP
      block is falsy.
    """
    fields = {'controlPlaneIpBlock': self._control_plane_ip_block(args)}
    if not any(fields.values()):
      return None
    return messages.VmwareControlPlaneV2Config(**fields)

  def _vmware_network_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareNetworkConfig.

    Args:
      args: The parsed command-line arguments; the CIDR block flags fall back
        to an empty list.

    Returns:
      A VmwareNetworkConfig message, or None when every field is falsy.
    """
    service_cidrs = flags.Get(args, 'service_address_cidr_blocks', [])
    pod_cidrs = flags.Get(args, 'pod_address_cidr_blocks', [])
    fields = {
        'serviceAddressCidrBlocks': service_cidrs,
        'podAddressCidrBlocks': pod_cidrs,
        'staticIpConfig': self._vmware_static_ip_config(args),
        'dhcpIpConfig': self._vmware_dhcp_ip_config(args),
        'hostConfig': self._vmware_host_config(args),
        'controlPlaneV2Config': self._vmware_control_plane_v2_config(args),
    }
    if not any(fields.values()):
      return None
    return messages.VmwareNetworkConfig(**fields)

  def _vmware_load_balancer_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareLoadBalancerConfig.

    Returns:
      A VmwareLoadBalancerConfig message combining the F5, MetalLB, manual LB
      and VIP sub-configs, or None when all of them are falsy.
    """
    fields = {
        'f5Config': self._vmware_f5_big_ip_config(args),
        'metalLbConfig': self._vmware_metal_lb_config(args),
        'manualLbConfig': self._vmware_manual_lb_config(args),
        'vipConfig': self._vmware_vip_config(args),
    }
    if not any(fields.values()):
      return None
    return messages.VmwareLoadBalancerConfig(**fields)

  def _vmware_vip_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareVipConfig.

    Returns:
      A VmwareVipConfig message, or None when both the control plane VIP and
      the ingress VIP are falsy.
    """
    control_plane_vip = flags.Get(args, 'control_plane_vip')
    ingress_vip = flags.Get(args, 'ingress_vip')
    if not (control_plane_vip or ingress_vip):
      return None
    return messages.VmwareVipConfig(
        controlPlaneVip=control_plane_vip, ingressVip=ingress_vip
    )

  def _vmware_f5_big_ip_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareF5BigIpConfig.

    Returns:
      A VmwareF5BigIpConfig message, or None when the address, partition and
      SNAT pool flags are all falsy.
    """
    fields = {
        'address': flags.Get(args, 'f5_config_address'),
        'partition': flags.Get(args, 'f5_config_partition'),
        'snatPool': flags.Get(args, 'f5_config_snat_pool'),
    }
    if not any(fields.values()):
      return None
    return messages.VmwareF5BigIpConfig(**fields)

  def _vmware_metal_lb_config_from_file(
      self, args: parser_extensions.Namespace
  ) -> messages.VmwareMetalLbConfig:
    """Constructs VmwareMetalLbConfig from metal_lb_config_from_file.

    Args:
      args: The parsed command-line arguments; metal_lb_config_from_file is
        read as a mapping.

    Returns:
      The VmwareMetalLbConfig message.

    Raises:
      InvalidConfigFile: If 'metalLBConfig' or its 'addressPools' is missing
        or empty, or if an address pool is invalid.
    """
    config_section = args.metal_lb_config_from_file.get('metalLBConfig', None)
    if not config_section:
      raise InvalidConfigFile(
          'Missing field [metalLBConfig] in Metal LB configuration file.'
      )

    raw_pools = config_section.get('addressPools', [])
    if not raw_pools:
      raise InvalidConfigFile(
          'Missing field [addressPools] in Metal LB configuration file.'
      )

    return messages.VmwareMetalLbConfig(
        addressPools=self._address_pools(raw_pools)
    )

  def _vmware_metal_lb_config_from_flag(
      self, args: parser_extensions.Namespace
  ) -> messages.VmwareMetalLbConfig:
    """Constructs VmwareMetalLbConfig from metal_lb_config_address_pools.

    Args:
      args: The parsed command-line arguments; every pool is read as a
        mapping with the dash-separated keys used on the command line.

    Returns:
      The VmwareMetalLbConfig message with one VmwareAddressPool per pool.
    """

    def _to_address_pool(pool_spec):
      return messages.VmwareAddressPool(
          addresses=pool_spec.get('addresses', []),
          avoidBuggyIps=pool_spec.get('avoid-buggy-ips', None),
          manualAssign=pool_spec.get('manual-assign', None),
          pool=pool_spec.get('pool', None),
      )

    metal_lb_config = messages.VmwareMetalLbConfig()
    for pool_spec in args.metal_lb_config_address_pools:
      metal_lb_config.addressPools.append(_to_address_pool(pool_spec))
    return metal_lb_config

  def _vmware_metal_lb_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareMetalLbConfig.

    The file-based flag takes precedence over the inline address-pools flag.

    Returns:
      The VmwareMetalLbConfig message, or None when neither flag was
      specified.
    """
    if 'metal_lb_config_from_file' in args.GetSpecifiedArgsDict():
      return self._vmware_metal_lb_config_from_file(args)
    if 'metal_lb_config_address_pools' in args.GetSpecifiedArgsDict():
      return self._vmware_metal_lb_config_from_flag(args)
    return None

  def _vmware_manual_lb_config(self, args: parser_extensions.Namespace):
    """Constructs proto message VmwareManualLbConfig.

    Returns:
      A VmwareManualLbConfig message built from the four node-port flags, or
      None when flags.IsSet reports that nothing is set.
    """
    konnectivity_port = flags.Get(args, 'konnectivity_server_node_port')
    fields = {
        'controlPlaneNodePort': flags.Get(args, 'control_plane_node_port'),
        'ingressHttpNodePort': flags.Get(args, 'ingress_http_node_port'),
        'ingressHttpsNodePort': flags.Get(args, 'ingress_https_node_port'),
        'konnectivityServerNodePort': konnectivity_port,
    }
    if not flags.IsSet(fields):
      return None
    return messages.VmwareManualLbConfig(**fields)

  def _address_pools(self, address_pools):
    """Constructs proto message field address_pools.

    Args:
      address_pools: Iterable of address pool mappings.

    Returns:
      A list with one VmwareAddressPool message per input pool, in order.
    """
    pool_messages = []
    for pool_spec in address_pools:
      pool_messages.append(self._vmware_address_pool(pool_spec))
    return pool_messages

  def _vmware_address_pool(self, address_pool) -> messages.VmwareAddressPool:
    """Constructs proto message VmwareAddressPool.

    Args:
      address_pool: Mapping with required non-empty 'addresses' and 'pool'
        items and optional 'avoidBuggyIPs' / 'manualAssign' items.

    Returns:
      The VmwareAddressPool message.

    Raises:
      InvalidConfigFile: If 'addresses' or 'pool' is missing or empty;
        addresses is checked first.
    """
    pool_addresses = address_pool.get('addresses', [])
    if not pool_addresses:
      raise InvalidConfigFile(
          'Missing field [addresses] in Metal LB configuration file.'
      )

    pool_name = address_pool.get('pool', None)
    if not pool_name:
      raise InvalidConfigFile(
          'Missing field [pool] in Metal LB configuration file.'
      )

    return messages.VmwareAddressPool(
        addresses=pool_addresses,
        avoidBuggyIps=address_pool.get('avoidBuggyIPs', None),
        manualAssign=address_pool.get('manualAssign', None),
        pool=pool_name,
    )


class InvalidConfigFile(exceptions.Error):
  """Raised when a configuration file is missing a required field.

  Used for the Static IP and Metal LB configuration files read by
  ClustersClient.
  """