HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/container/gkeonprem/vmware_admin_clusters.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for gkeonprem API clients for VMware admin cluster resources."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from typing import Generator

from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import list_pager
from googlecloudsdk.api_lib.container.gkeonprem import client
from googlecloudsdk.api_lib.container.gkeonprem import update_mask
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.command_lib.container.vmware import flags
from googlecloudsdk.core import properties
from googlecloudsdk.generated_clients.apis.gkeonprem.v1 import gkeonprem_v1_messages as messages


class AdminClustersClient(client.ClientBase):
  """Client for clusters in gkeonprem vmware API."""

  def __init__(self, **kwargs):
    super(AdminClustersClient, self).__init__(**kwargs)
    self._service = self._client.projects_locations_vmwareAdminClusters

  def Enroll(
      self, args, membership=None, vmware_admin_cluster_id=None, parent=None
  ) -> messages.Operation:
    """Enrolls an admin cluster to Anthos on VMware."""
    kwargs = {
        'membership': (
            membership
            if membership
            else self._admin_cluster_membership_name(args)
        ),
        'vmwareAdminClusterId': (
            vmware_admin_cluster_id
            if vmware_admin_cluster_id
            else self._admin_cluster_id(args)
        ),
    }
    req = messages.GkeonpremProjectsLocationsVmwareAdminClustersEnrollRequest(
        parent=parent if parent else self._admin_cluster_parent(args),
        enrollVmwareAdminClusterRequest=messages.EnrollVmwareAdminClusterRequest(
            **kwargs
        ),
    )
    return self._service.Enroll(req)

  def Unenroll(self, args: parser_extensions.Namespace) -> messages.Operation:
    """Unenrolls an Anthos on VMware admin cluster."""
    kwargs = {
        'name': self._admin_cluster_name(args),
        'validateOnly': self.GetFlag(args, 'validate_only'),
        'allowMissing': self.GetFlag(args, 'allow_missing'),
        'ignoreErrors': self.GetFlag(args, 'ignore_errors'),
    }
    req = messages.GkeonpremProjectsLocationsVmwareAdminClustersUnenrollRequest(
        **kwargs
    )
    return self._service.Unenroll(req)

  def List(
      self, args: parser_extensions.Namespace
  ) -> Generator[messages.VmwareAdminCluster, None, None]:
    """Lists Admin Clusters in the GKE On-Prem VMware API."""
    # Workaround for P4SA: Call query version config first, ignore the result.
    # Context: b/296435390#comment2
    project = (
        args.project if args.project else properties.VALUES.core.project.Get()
    )
    # Hard code location to `us-west1`, because it cannot handle `--location=-`.
    parent = 'projects/{project}/locations/{location}'.format(
        project=project, location='us-west1'
    )
    dummy_request = messages.GkeonpremProjectsLocationsVmwareClustersQueryVersionConfigRequest(
        parent=parent,
    )
    try:
      _ = self._client.projects_locations_vmwareClusters.QueryVersionConfig(
          dummy_request
      )
    except (
        apitools_exceptions.HttpUnauthorizedError,
        apitools_exceptions.HttpForbiddenError,
    ):
      # Ignore unauthorized or forbidden errors. This may happen during the
      # support case when googler inspect customer's project via Google Admin.
      pass

    if (
        'location' not in args.GetSpecifiedArgsDict()
        and not properties.VALUES.container_vmware.location.Get()
    ):
      args.location = '-'

    list_req = (
        messages.GkeonpremProjectsLocationsVmwareAdminClustersListRequest(
            parent=self._location_name(args)
        )
    )

    return list_pager.YieldFromList(
        self._service,
        list_req,
        field='vmwareAdminClusters',
        batch_size=flags.Get(args, 'page_size'),
        limit=flags.Get(args, 'limit'),
        batch_size_attribute='pageSize',
    )

  def Update(
      self,
      args: parser_extensions.Namespace,
      cluster_ref=None,
      is_user_cluster_upgrade=False,
  ) -> messages.Operation:
    """Updates an admin cluster to Anthos on VMware."""

    mask = update_mask.get_update_mask(
        args, update_mask.VMWARE_ADMIN_CLUSTER_ARGS_TO_UPDATE_MASKS
    )
    if is_user_cluster_upgrade:
      mask = ''

    kwargs = {
        'name': (
            cluster_ref.RelativeName()
            if cluster_ref
            else self._admin_cluster_name(args)
        ),
        'updateMask': mask,
        'vmwareAdminCluster': self._vmware_admin_cluster(
            args, is_user_cluster_upgrade
        ),
    }
    req = messages.GkeonpremProjectsLocationsVmwareAdminClustersPatchRequest(
        **kwargs
    )
    return self._service.Patch(req)

  def _vmware_admin_cluster(
      self, args: parser_extensions.Namespace, is_user_cluster_upgrade=False
  ):
    """Constructs proto message VmwareAdminCluster."""
    kwargs = {}
    if is_user_cluster_upgrade:
      kwargs['platformConfig'] = self._platform_config(args, True)
    else:
      kwargs['platformConfig'] = self._platform_config(args, False)
      kwargs['onPremVersion'] = flags.Get(args, 'version')

    if any(kwargs.values()):
      return messages.VmwareAdminCluster(**kwargs)
    return None

  def _platform_config(
      self, args: parser_extensions.Namespace, is_user_cluster_upgrade=False
  ):
    """Constructs proto message field platform_config."""
    required_platform_version = flags.Get(args, 'required_platform_version')
    if required_platform_version is None and is_user_cluster_upgrade:
      required_platform_version = flags.Get(args, 'version')

    kwargs = {
        'requiredPlatformVersion': required_platform_version,
    }
    if any(kwargs.values()):
      return messages.VmwarePlatformConfig(**kwargs)
    return None