HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/compute/instance_templates/mesh_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for mesh flag in the instance template commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import io
import json
import re

from googlecloudsdk.api_lib.container import util as c_util
from googlecloudsdk.command_lib.compute.instance_templates import service_proxy_aux_data
from googlecloudsdk.command_lib.container.fleet import api_util
from googlecloudsdk.command_lib.container.fleet import kube_util as hub_kube_util
from googlecloudsdk.command_lib.projects import util as project_util
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import execution_utils
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import files

_EXPANSION_GATEWAY_NAME = 'istio-eastwestgateway'
_SERVICE_PROXY_BUCKET_NAME = (
    'gs://gce-service-proxy/service-proxy-agent/releases/'
    'service-proxy-agent-asm-{}-stable.tgz')
_SERVICE_PROXY_INSTALLER_BUCKET_NAME = (
    'gs://gce-service-proxy/service-proxy-agent-installer/'
    'releases/installer.tgz')
_GCE_SERVICE_PROXY_ASM_VERSION_METADATA = 'gce-service-proxy-asm-version'
_GCE_SERVICE_PROXY_INSTALLER_BUCKET_METADATA = (
    'gce-service-proxy-installer-bucket')
_GCE_SERVICE_PROXY_AGENT_BUCKET_METADATA = 'gce-service-proxy-agent-bucket'

_ISTIO_CANONICAL_SERVICE_NAME_LABEL = 'service.istio.io/canonical-name'
_ISTIO_CANONICAL_SERVICE_REVISION_LABEL = 'service.istio.io/canonical-revision'
_KUBERNETES_APP_NAME_LABEL = 'app.kubernetes.io/name'
_KUBERNETES_APP_VERSION_LABEL = 'app.kubernetes.io/version'

_ISTIO_META_CLOUDRUN_ADDR_KEY = 'ISTIO_META_CLOUDRUN_ADDR'
_CLOUDRUN_ADDR_KEY = 'CLOUDRUN_ADDR'

_ISTIO_DISCOVERY_PORT = '15012'

_WORKLOAD_PATTERN = r'(.*)\/(.*)'
# Allow non-prod GKE Hub memberships to be specified.
_GKEHUB_PATTERN = r'\/\/gkehub(.*).googleapis.com\/(.*)'
_ASM_VERSION_PATTERN = r':(.*)-'

_INCLUSTER_WEBHOOK_PREFIX = 'istio-sidecar-injector'
_MCP_WEBHOOK_PREFIX = 'istiod'

_MCP_ADDRESS = 'meshconfig.googleapis.com:443'

ServiceProxyMetadataArgs = collections.namedtuple('ServiceProxyMetadataArgs', [
    'asm_version', 'project_id', 'expansionagateway_ip',
    'service_proxy_api_server', 'identity_provider', 'service_account',
    'asm_proxy_config', 'mcp_env_config', 'trust_domain', 'mesh_id', 'network',
    'asm_labels', 'workload_name', 'workload_namespace', 'canonical_service',
    'canonical_revision', 'asm_revision', 'root_cert'
])


def ParseWorkload(workload):
  """Parses the workload value to workload namespace and name.

  Args:
    workload: The workload value with namespace/name format.

  Returns:
    workload namespace and workload name.

  Raises:
    Error: if the workload value is invalid.
  """
  workload_match = re.search(_WORKLOAD_PATTERN, workload)
  if workload_match:
    return workload_match.group(1), workload_match.group(2)
  raise exceptions.Error(
      'value workload: {} is invalid. Workload value should have the format'
      'namespace/name.'.format(workload))


def _ParseMembershipName(owner_id):
  """Get membership name from an owner id value.

  Args:
    owner_id: The owner ID value of a membership. e.g.,
    //gkehub.googleapis.com/projects/123/locations/global/memberships/test.

  Returns:
    The full resource name of the membership, e.g.,
      projects/foo/locations/global/memberships/name.

  Raises:
    Error: if the membership name cannot be parsed.
  """
  membership_match = re.search(_GKEHUB_PATTERN, owner_id)
  if membership_match:
    return membership_match.group(2)
  raise exceptions.Error(
      'value owner_id: {} is invalid.'.format(owner_id))


def _GetVMIdentityProvider(membership_manifest, workload_namespace):
  """Get the identity provider for the VMs.

  Args:
    membership_manifest: The membership manifest from the cluster.
    workload_namespace: The namespace of the VM workload.

  Returns:
    The identity provider value to be used on the VM connected to the cluster.

  Raises:
    ClusterError: If the membership manifest cannot be read.
  """
  if not membership_manifest:
    raise ClusterError('Cannot verify an empty membership from the cluster')

  try:
    membership_data = yaml.load(membership_manifest)
  except yaml.Error as e:
    raise exceptions.Error(
        'Invalid membership from the cluster {}'.format(membership_manifest), e)

  owner_id = _GetNestedKeyFromManifest(
      membership_data, 'spec', 'owner', 'id')
  if not owner_id:
    raise ClusterError('Invalid membership does not have an owner id. Please '
                       'make sure your cluster is correctly registered and '
                       'retry.')

  membership_name = _ParseMembershipName(owner_id)
  membership = api_util.GetMembership(membership_name)
  if not membership.uniqueId:
    raise exceptions.Error('Invalid membership {} does not have a unique_Id '
                           'field. Please make sure your cluster is correctly '
                           'registered and retry.'.format(membership_name))

  return '{}@google@{}'.format(membership.uniqueId, workload_namespace)


def _RetrieveProxyConfig(is_mcp, mesh_config):
  """Retrieve proxy config from a mesh config.

  Args:
    is_mcp: Whether the control plane is managed or not.
    mesh_config: A mesh config from the cluster.

  Returns:
    proxy_config: The proxy config from the mesh config.
  """
  try:
    proxy_config = mesh_config['defaultConfig']
  except (KeyError, TypeError):
    if is_mcp:
      return {}
    raise exceptions.Error(
        'Proxy config cannot be found in the Anthos Service Mesh.')

  return proxy_config


def _RetrieveTrustDomain(is_mcp, mesh_config):
  """Retrieve trust domain from a mesh config.

  Args:
    is_mcp: Whether the control plane is managed or not.
    mesh_config: A mesh config from the cluster.

  Returns:
    trust_domain: The trust domain from the mesh config.
  """
  try:
    trust_domain = mesh_config['trustDomain']
  except (KeyError, TypeError):
    if is_mcp:
      return None
    raise exceptions.Error(
        'Trust Domain cannot be found in the Anthos Service Mesh.')

  return trust_domain


def _RetrieveMeshId(is_mcp, mesh_config):
  """Retrieve mesh id from a mesh config.

  Args:
    is_mcp: Whether the control plane is managed or not.
    mesh_config: A mesh config from the cluster.

  Returns:
    mesh_id: The mesh id from the mesh config.
  """
  proxy_config = _RetrieveProxyConfig(is_mcp, mesh_config)

  try:
    mesh_id = proxy_config['meshId']
  except (KeyError, TypeError):
    if is_mcp:
      return None
    raise exceptions.Error(
        'Mesh ID cannot be found in the Anthos Service Mesh.')

  return mesh_id


def _RetrieveDiscoveryAddress(mesh_config):
  """Get the discovery address used in the MCP installation.

  Args:
    mesh_config: A mesh config from the cluster.

  Returns:
    The discovery address.
  """
  proxy_config = _RetrieveProxyConfig(is_mcp=True, mesh_config=mesh_config)

  if proxy_config is None:
    proxy_config = {}

  # When user does not provide a discoveryAddress in the proxy config, the
  # default MCP discovery address is used here. We expect mesh agent to get
  # the correct discoveryAddress during bootstrap.
  return proxy_config.get('discoveryAddress', _MCP_ADDRESS)


def _IsMCP(kube_client, revision):
  """Check if ASM control plane is managed.

  Args:
    kube_client: A kubernetes client for the cluster.
    revision: The ASM revision to check for control plane type.

  Returns:
    True if the control plane is MCP, otherwise False.
  """
  # MCP uses the url field in the webhook ClientConfig and In-cluster CP uses
  # the service field instead.
  # TODO(b/195018417): Further validate the URL format when it is using
  # meshconfig endpoint.
  url = kube_client.RetrieveMutatingWebhookURL(revision)

  if url:
    return True
  return False


def _GetWorkloadLabels(workload_manifest):
  """Get the workload labels from a workload manifest.

  Args:
    workload_manifest: The manifest of the workload.

  Returns:
    The workload labels.

  Raises:
    WorkloadError: If the workload manifest cannot be read.
  """
  if not workload_manifest:
    raise WorkloadError('Cannot verify an empty workload from the cluster')

  try:
    workload_data = yaml.load(workload_manifest)
  except yaml.Error as e:
    raise exceptions.Error(
        'Invalid workload from the cluster {}'.format(workload_data), e)

  workload_labels = _GetNestedKeyFromManifest(workload_data, 'spec', 'metadata',
                                              'labels')

  return workload_labels


def _GetCanonicalServiceName(workload_name, workload_manifest):
  """Get the canonical service name of the workload.

  Args:
    workload_name: The name of the workload.
    workload_manifest: The manifest of the workload.

  Returns:
    The canonical service name of the workload.
  """
  workload_labels = _GetWorkloadLabels(workload_manifest)

  return _ExtractCanonicalServiceName(workload_labels, workload_name)


def _GetCanonicalServiceRevision(workload_manifest):
  """Get the canonical service revision of the workload.

  Args:
    workload_manifest: The manifest of the workload.

  Returns:
    The canonical service revision of the workload.
  """
  workload_labels = _GetWorkloadLabels(workload_manifest)

  return _ExtractCanonicalServiceRevision(workload_labels)


def _ExtractCanonicalServiceName(workload_labels, workload_name):
  """Get the canonical service name of the workload.

  Args:
    workload_labels: A map of workload labels.
    workload_name: The name of the workload.

  Returns:
    The canonical service name of the workload.
  """
  if not workload_labels:
    return workload_name

  svc = workload_labels.get(_ISTIO_CANONICAL_SERVICE_NAME_LABEL)
  if svc:
    return svc

  svc = workload_labels.get(_KUBERNETES_APP_NAME_LABEL)
  if svc:
    return svc

  svc = workload_labels.get('app')
  if svc:
    return svc

  return workload_name


def _ExtractCanonicalServiceRevision(workload_labels):
  """Get the canonical service revision of the workload.

  Args:
    workload_labels: A map of workload labels.

  Returns:
    The canonical service revision of the workload.
  """
  if not workload_labels:
    return 'latest'

  rev = workload_labels.get(_ISTIO_CANONICAL_SERVICE_REVISION_LABEL)
  if rev:
    return rev

  rev = workload_labels.get(_KUBERNETES_APP_VERSION_LABEL)
  if rev:
    return rev

  rev = workload_labels.get('version')
  if rev:
    return rev

  return 'latest'


def VerifyWorkloadSetup(workload_manifest):
  """Verify VM workload setup in the cluster."""
  if not workload_manifest:
    raise WorkloadError('Cannot verify an empty workload from the cluster')

  try:
    workload_data = yaml.load(workload_manifest)
  except yaml.Error as e:
    raise exceptions.Error(
        'Invalid workload from the cluster {}'.format(workload_manifest), e)

  identity_provider_value = _GetNestedKeyFromManifest(
      workload_data, 'spec', 'metadata', 'annotations',
      'security.cloud.google.com/IdentityProvider')
  if identity_provider_value != 'google':
    raise WorkloadError('Unable to find the GCE IdentityProvider in the '
                        'specified WorkloadGroup. Please make sure the '
                        'GCE IdentityProvider is specified in the '
                        'WorkloadGroup.')


def RetrieveWorkloadRevision(namespace_manifest):
  """Retrieve the Anthos Service Mesh revision for the workload."""
  if not namespace_manifest:
    raise WorkloadError('Cannot verify an empty namespace from the cluster')

  try:
    namespace_data = yaml.load(namespace_manifest)
  except yaml.Error as e:
    raise exceptions.Error(
        'Invalid namespace from the cluster {}'.format(namespace_manifest), e)

  workload_revision = _GetNestedKeyFromManifest(namespace_data, 'metadata',
                                                'labels', 'istio.io/rev')
  if not workload_revision:
    raise WorkloadError('Workload namespace does not have an Anthos Service '
                        'Mesh revision label. Please make sure the namespace '
                        'is labeled and try again.')

  return workload_revision


def _RetrieveWorkloadServiceAccount(workload_manifest):
  """Retrieve the service account used for the workload."""
  if not workload_manifest:
    raise WorkloadError('Cannot verify an empty workload from the cluster')

  try:
    workload_data = yaml.load(workload_manifest)
  except yaml.Error as e:
    raise exceptions.Error(
        'Invalid workload from the cluster {}'.format(workload_manifest), e)

  service_account = _GetNestedKeyFromManifest(workload_data, 'spec', 'template',
                                              'serviceAccount')
  return service_account


def _RetrieveServiceProxyMetadata(args, is_mcp, kube_client, project_id,
                                  network_resource, workload_namespace,
                                  workload_name, workload_manifest,
                                  membership_manifest, asm_revision,
                                  mesh_config):
  """Retrieve the necessary metadata to configure the service proxy."""

  if is_mcp:
    asm_version = None
    expansionagateway_ip = None
    root_cert = None
    service_proxy_api_server = _RetrieveDiscoveryAddress(mesh_config)
    env_config = kube_client.RetrieveEnvConfig(asm_revision)
  else:
    if _GCE_SERVICE_PROXY_ASM_VERSION_METADATA in args.metadata:
      asm_version = args.metadata[_GCE_SERVICE_PROXY_ASM_VERSION_METADATA]
    else:
      asm_version = kube_client.RetrieveASMVersion(asm_revision)
    expansionagateway_ip = kube_client.RetrieveExpansionGatewayIP()
    root_cert = kube_client.RetrieveKubernetesRootCert()
    service_proxy_api_server = '{}:{}'.format(expansionagateway_ip,
                                              _ISTIO_DISCOVERY_PORT)
    env_config = None

  identity_provider = _GetVMIdentityProvider(membership_manifest,
                                             workload_namespace)

  service_account = _RetrieveWorkloadServiceAccount(workload_manifest)

  asm_proxy_config = _RetrieveProxyConfig(is_mcp, mesh_config)

  trust_domain = _RetrieveTrustDomain(is_mcp, mesh_config)

  mesh_id = _RetrieveMeshId(is_mcp, mesh_config)

  network = network_resource.split('/')[-1]

  asm_labels = _GetWorkloadLabels(workload_manifest)

  canonical_service = _GetCanonicalServiceName(workload_name, workload_manifest)

  canonical_revision = _GetCanonicalServiceRevision(workload_manifest)

  return ServiceProxyMetadataArgs(
      asm_version, project_id, expansionagateway_ip, service_proxy_api_server,
      identity_provider, service_account, asm_proxy_config, env_config,
      trust_domain, mesh_id, network, asm_labels, workload_name,
      workload_namespace, canonical_service, canonical_revision, asm_revision,
      root_cert)


def _ModifyInstanceTemplate(args, is_mcp, metadata_args):
  """Modify the instance template to include the service proxy metadata."""

  if metadata_args.asm_labels:
    asm_labels = metadata_args.asm_labels
  else:
    asm_labels = collections.OrderedDict()

  asm_labels[
      _ISTIO_CANONICAL_SERVICE_NAME_LABEL] = metadata_args.canonical_service
  asm_labels[
      _ISTIO_CANONICAL_SERVICE_REVISION_LABEL] = metadata_args.canonical_revision

  asm_labels_string = json.dumps(asm_labels, sort_keys=True)

  service_proxy_config = collections.OrderedDict()
  service_proxy_config['mode'] = 'ON'

  service_proxy_config['proxy-spec'] = {
      'network': metadata_args.network,
      'api-server': metadata_args.service_proxy_api_server,
      'log-level': 'info',
  }

  service_proxy_config['service'] = {}

  proxy_config = metadata_args.asm_proxy_config
  if not proxy_config:
    proxy_config = collections.OrderedDict()
  if 'proxyMetadata' not in proxy_config:
    proxy_config['proxyMetadata'] = collections.OrderedDict()
  else:
    proxy_config['proxyMetadata'] = collections.OrderedDict(
        proxy_config['proxyMetadata'])

  proxy_metadata = proxy_config['proxyMetadata']
  proxy_metadata['ISTIO_META_WORKLOAD_NAME'] = metadata_args.workload_name
  proxy_metadata['POD_NAMESPACE'] = metadata_args.workload_namespace
  proxy_metadata['USE_TOKEN_FOR_CSR'] = 'true'
  proxy_metadata['ISTIO_META_DNS_CAPTURE'] = 'true'
  proxy_metadata['ISTIO_META_AUTO_REGISTER_GROUP'] = metadata_args.workload_name
  proxy_metadata['SERVICE_ACCOUNT'] = metadata_args.service_account
  proxy_metadata[
      'CREDENTIAL_IDENTITY_PROVIDER'] = metadata_args.identity_provider
  if metadata_args.trust_domain:
    proxy_metadata['TRUST_DOMAIN'] = metadata_args.trust_domain
  if metadata_args.mesh_id:
    proxy_metadata['ISTIO_META_MESH_ID'] = metadata_args.mesh_id
  proxy_metadata['ISTIO_META_NETWORK'] = '{}-{}'.format(
      metadata_args.project_id, metadata_args.network)
  proxy_metadata['CANONICAL_SERVICE'] = metadata_args.canonical_service
  proxy_metadata['CANONICAL_REVISION'] = metadata_args.canonical_revision
  proxy_metadata['ISTIO_METAJSON_LABELS'] = asm_labels_string

  if metadata_args.asm_revision == 'default':
    proxy_metadata['ASM_REVISION'] = ''
  else:
    proxy_metadata['ASM_REVISION'] = metadata_args.asm_revision

  gce_software_declaration = collections.OrderedDict()
  service_proxy_agent_recipe = collections.OrderedDict()

  service_proxy_agent_recipe['name'] = 'install-gce-service-proxy-agent'
  service_proxy_agent_recipe['desired_state'] = 'INSTALLED'

  if is_mcp:
    service_proxy_agent_recipe['installSteps'] = [{
        'scriptRun': {
            'script':
                service_proxy_aux_data
                .startup_script_for_asm_service_proxy_installer
        }
    }]
    proxy_metadata.update(metadata_args.mcp_env_config)
    # ISTIO_META_CLOUDRUN_ADDR must be set to generate node metadata on VM.
    if _CLOUDRUN_ADDR_KEY in proxy_metadata:
      proxy_metadata[_ISTIO_META_CLOUDRUN_ADDR_KEY] = proxy_metadata[
          _CLOUDRUN_ADDR_KEY]
    if 'gce-service-proxy-installer-bucket' not in args.metadata:
      args.metadata['gce-service-proxy-installer-bucket'] = (
          _SERVICE_PROXY_INSTALLER_BUCKET_NAME)
  else:
    service_proxy_agent_recipe['installSteps'] = [{
        'scriptRun': {
            'script':
                service_proxy_aux_data.startup_script_for_asm_service_proxy
                .format(
                    ingress_ip=metadata_args.expansionagateway_ip,
                    asm_revision=metadata_args.asm_revision)
        }
    }]
    proxy_metadata['ISTIO_META_ISTIO_VERSION'] = metadata_args.asm_version
    args.metadata['rootcert'] = metadata_args.root_cert
    if _GCE_SERVICE_PROXY_AGENT_BUCKET_METADATA not in args.metadata:
      args.metadata[
          _GCE_SERVICE_PROXY_AGENT_BUCKET_METADATA] = (
              _SERVICE_PROXY_BUCKET_NAME.format(metadata_args.asm_version))

  gce_software_declaration['softwareRecipes'] = [service_proxy_agent_recipe]

  service_proxy_config['asm-config'] = proxy_config

  args.metadata['enable-osconfig'] = 'true'
  args.metadata['enable-guest-attributes'] = 'true'
  args.metadata['osconfig-disabled-features'] = 'tasks'
  args.metadata['gce-software-declaration'] = json.dumps(
      gce_software_declaration)
  args.metadata['gce-service-proxy'] = json.dumps(
      service_proxy_config, sort_keys=True)

  if args.labels is None:
    args.labels = collections.OrderedDict()
  args.labels['asm_service_name'] = metadata_args.canonical_service
  args.labels['asm_service_namespace'] = metadata_args.workload_namespace
  if metadata_args.mesh_id:
    args.labels['mesh_id'] = metadata_args.mesh_id
  else:
    # This works for now as we only support adding VM to the Fleet project. But
    # it should be the Fleet project instead.
    project_number = project_util.GetProjectNumber(metadata_args.project_id)
    args.labels['mesh_id'] = 'proj-{}'.format(project_number)
  # For ASM VM usage tracking.
  args.labels['gce-service-proxy'] = 'asm-istiod'


def ConfigureInstanceTemplate(args, kube_client, project_id, network_resource,
                              workload_namespace, workload_name,
                              workload_manifest, membership_manifest,
                              asm_revision, mesh_config):
  """Configure the provided instance template args with ASM metadata."""
  is_mcp = _IsMCP(kube_client, asm_revision)

  service_proxy_metadata_args = _RetrieveServiceProxyMetadata(
      args, is_mcp, kube_client, project_id, network_resource,
      workload_namespace, workload_name, workload_manifest, membership_manifest,
      asm_revision, mesh_config)

  _ModifyInstanceTemplate(args, is_mcp, service_proxy_metadata_args)


class KubernetesClient(object):
  """Kubernetes client for access Kubernetes APIs."""

  def __init__(self, gke_cluster=None):
    """KubernetesClient constructor.

    Args:
      gke_cluster: the location/name of the GKE cluster.
    """
    self.kubectl_timeout = '20s'

    self.temp_kubeconfig_dir = files.TemporaryDirectory()
    self.processor = hub_kube_util.KubeconfigProcessor(
        api_adapter=None, gke_uri=None, gke_cluster=gke_cluster,
        kubeconfig=None, internal_ip=False, cross_connect_subnetwork=None,
        private_endpoint_fqdn=None, context=None)
    self.kubeconfig, self.context = self.processor.GetKubeconfigAndContext(
        self.temp_kubeconfig_dir)

  def __enter__(self):
    return self

  def __exit__(self, *_):
    # delete temp directory
    if self.temp_kubeconfig_dir is not None:
      self.temp_kubeconfig_dir.Close()

  def HasNamespaceReaderPermissions(self, *namespaces):
    """Check to see if the user has read permissions in the namespaces.

    Args:
      *namespaces: The namespaces to verify reader permissions.

    Returns:
      true, if reads can be performed on all of the specified namespaces.

    Raises:
      Error: if failing to get check for read permissions.
      Error: if read permissions are not found.
    """
    for ns in namespaces:
      out, err = self._RunKubectl(
          ['auth', 'can-i', 'get', '*', '-n', ns], None)
      if err:
        raise exceptions.Error(
            'Failed to check if the user can read resources in {} namespace: {}'
            .format(ns, err))
      if 'yes' not in out:
        raise exceptions.Error(
            'Missing permissions to read resources in {} namespace'.format(ns))
    return True

  def NamespacesExist(self, *namespaces):
    """Check to see if the namespaces exist in the cluster.

    Args:
      *namespaces: The namespaces to check.

    Returns:
      true, if namespaces exist.

    Raises:
      Error: if failing to verify the namespaces.
      Error: if at least one of the namespaces do not exist.
    """
    for ns in namespaces:
      _, err = self._RunKubectl(['get', 'namespace', ns], None)
      if err:
        if 'NotFound' in err:
          raise exceptions.Error('Namespace {} does not exist: {}'.format(
              ns, err))
        raise exceptions.Error(
            'Failed to check if namespace {} exists: {}'.format(ns, err))
    return True

  def GetNamespace(self, namespace):
    """Get the YAML output of the specified namespace."""
    out, err = self._RunKubectl([
        'get', 'namespace', namespace, '-o', 'yaml'], None)
    if err:
      raise exceptions.Error(
          'Error retrieving Namespace {}: {}'.format(namespace, err))
    return out

  def GetMembershipCR(self):
    """Get the YAML output of the Membership CR."""
    if not self._MembershipCRDExists():
      raise ClusterError(
          'Membership CRD is not found in the cluster. Please make sure your '
          'cluster is registered and retry.')

    out, err = self._RunKubectl(
        ['get',
         'memberships.hub.gke.io',
         'membership', '-o', 'yaml'], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'The specified cluster is not registered to a fleet. '
            'Please make sure your cluster is registered and retry.')
      raise exceptions.Error(
          'Error retrieving the Membership CR: {}'.format(err))
    return out

  def _MembershipCRDExists(self):
    """Verifies if GKE Hub membership CRD exists."""
    _, err = self._RunKubectl(
        ['get',
         'customresourcedefinitions.v1.apiextensions.k8s.io',
         'memberships.hub.gke.io'], None)
    if err:
      if 'NotFound' in err:
        return False
      raise exceptions.Error(
          'Error retrieving the Membership CRD: {}'.format(err))
    return True

  def GetIdentityProviderCR(self):
    """Get the YAML output of the IdentityProvider CR."""
    if not self._IdentityProviderCRDExists():
      raise ClusterError(
          'IdentityProvider CRD is not found in the cluster. Please install '
          'Anthos Service Mesh with VM support and retry.')

    out, err = self._RunKubectl(
        ['get',
         'identityproviders.security.cloud.google.com',
         'google', '-o', 'yaml'], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'GCE identity provider is not found in the cluster. '
            'Please install Anthos Service Mesh with VM support.')
      raise exceptions.Error(
          'Error retrieving IdentityProvider google in default namespace: {}'
          .format(err))
    return out

  def _IdentityProviderCRDExists(self):
    """Verifies if Identity Provider CRD exists."""
    _, err = self._RunKubectl(
        ['get',
         'customresourcedefinitions.v1.apiextensions.k8s.io',
         'identityproviders.security.cloud.google.com'], None)
    if err:
      if 'NotFound' in err:
        return False
      raise exceptions.Error(
          'Error retrieving the Identity Provider CRD: {}'.format(err))
    return True

  def GetWorkloadGroupCR(self, workload_namespace, workload_name):
    """Get the YAML output of the specified WorkloadGroup CR."""
    if not self._WorkloadGroupCRDExists():
      raise ClusterError(
          'WorkloadGroup CRD is not found in the cluster. Please install '
          'Anthos Service Mesh and retry.')

    out, err = self._RunKubectl([
        'get', 'workloadgroups.networking.istio.io', workload_name, '-n',
        workload_namespace, '-o', 'yaml'
    ], None)
    if err:
      if 'NotFound' in err:
        raise WorkloadError(
            'WorkloadGroup {} in namespace {} is not found in the '
            'cluster. Please create the WorkloadGroup and retry.'.format(
                workload_name, workload_namespace))
      raise exceptions.Error(
          'Error retrieving WorkloadGroup {} in namespace {}: {}'.format(
              workload_name, workload_namespace, err))
    return out

  def _WorkloadGroupCRDExists(self):
    """Verifies if WorkloadGroup CRD exists."""
    _, err = self._RunKubectl(
        ['get',
         'customresourcedefinitions.v1.apiextensions.k8s.io',
         'workloadgroups.networking.istio.io'], None)
    if err:
      if 'NotFound' in err:
        return False
      raise exceptions.Error(
          'Error retrieving the WorkloadGroup CRD: {}'.format(err))
    return True

  def ExpansionGatewayDeploymentExists(self):
    """Verifies if the ASM Expansion Gateway deployment exists."""
    _, err = self._RunKubectl(
        ['get', 'deploy', _EXPANSION_GATEWAY_NAME, '-n', 'istio-system'], None)
    if err:
      if 'NotFound' in err:
        return False
      raise exceptions.Error(
          'Error retrieving the expansion gateway deployment: {}'.format(err))
    return True

  def ExpansionGatewayServiceExists(self):
    """Verifies if the ASM Expansion Gateway service exists."""
    _, err = self._RunKubectl(
        ['get', 'service', _EXPANSION_GATEWAY_NAME, '-n', 'istio-system'], None)
    if err:
      if 'NotFound' in err:
        return False
      raise exceptions.Error(
          'Error retrieving the expansion gateway service: {}'.format(err))
    return True

  def RetrieveExpansionGatewayIP(self):
    """Retrieves the expansion gateway IP from the cluster."""
    if not self.ExpansionGatewayDeploymentExists():
      raise ClusterError(
          'The gateway {} deployment is not found in the cluster. Please '
          'install Anthos Service Mesh with VM support and retry.'.format(
              _EXPANSION_GATEWAY_NAME))

    if not self.ExpansionGatewayServiceExists():
      raise ClusterError(
          'The gateway {} service is not found in the cluster. Please '
          'install Anthos Service Mesh with VM support and retry.'.format(
              _EXPANSION_GATEWAY_NAME))

    out, err = self._RunKubectl([
        'get', 'svc', _EXPANSION_GATEWAY_NAME, '-n', 'istio-system', '-o',
        'jsonpath={.status.loadBalancer.ingress[0].ip}'
    ], None)
    if err:
      raise exceptions.Error(
          'Error retrieving expansion gateway IP: {}'.format(err))
    return out

  def RetrieveKubernetesRootCert(self):
    """Retrieves the root cert from the cluster."""
    out, err = self._RunKubectl(
        ['get', 'configmap', 'kube-root-ca.crt', '-o',
         r'jsonpath="{.data.ca\.crt}"'], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'Cluster root certificate is not found.')
      raise exceptions.Error(
          'Error retrieving Kubernetes root cert: {}'.format(err))
    return out.strip('\"')

  def RetrieveASMVersion(self, revision):
    """Retrieves the version of ASM."""
    image, err = self._RunKubectl([
        'get', 'deploy', '-l', 'istio.io/rev={},app=istiod'.format(revision),
        '-n', 'istio-system', '-o',
        'jsonpath="{.items[0].spec.template.spec.containers[0].image}"'
    ], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'Anthos Service Mesh revision {} is not found in '
            'the cluster. Please install Anthos Service Mesh and '
            'try again.'.format(revision))
      raise exceptions.Error(
          'Error retrieving the version of Anthos Service Mesh: {}'.format(err))

    if not image:
      raise ClusterError('Anthos Service Mesh revision {} does not have an '
                         'image property. Please re-install Anthos Service '
                         'Mesh.'.format(revision))

    version_match = re.search(_ASM_VERSION_PATTERN, image)
    if version_match:
      return version_match.group(1)
    raise exceptions.Error(
        'Value image: {} is invalid.'.format(image))

  def RetrieveEnvConfig(self, revision):
    """Retrieves the env-* config map for MCP."""
    if revision == 'default':
      env_config_name = 'env'
    else:
      env_config_name = 'env-{}'.format(revision)

    out, err = self._RunKubectl(
        ['get', 'configmap', env_config_name, '-n', 'istio-system',
         '-o', 'jsonpath={.data}'], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'Managed Control Plane revision {} is not found in '
            'the cluster. Please install Managed Control Plane and '
            'try again.'.format(revision))
      raise exceptions.Error(
          'Error retrieving the config map {} from the cluster: {}'.format(
              env_config_name, err))

    try:
      env_config = yaml.load(out)
    except yaml.Error:
      raise exceptions.Error(
          'Invalid config map from the cluster: {}'.format(out))

    return env_config

  def RetrieveMeshConfig(self, revision):
    """Retrieves the MeshConfig for the ASM revision."""
    if revision == 'default':
      mesh_config_name = 'istio'
    else:
      mesh_config_name = 'istio-{}'.format(revision)

    out, err = self._RunKubectl(
        ['get', 'configmap', mesh_config_name, '-n', 'istio-system',
         '-o', 'jsonpath={.data.mesh}'], None)
    if err:
      if 'NotFound' in err:
        raise ClusterError(
            'Anthos Service Mesh revision {} is not found in '
            'the cluster. Please install Anthos Service Mesh and '
            'try again.'.format(revision))
      raise exceptions.Error(
          'Error retrieving the mesh config from the cluster: {}'.format(
              err))

    try:
      mesh_config = yaml.load(out)
    except yaml.Error:
      raise exceptions.Error(
          'Invalid mesh config from the cluster: {}'.format(out))

    return mesh_config

  def RetrieveMutatingWebhookURL(self, revision):
    """Retrieves the Mutating Webhook URL used for a revision."""
    # There are two patterns of mutating webhook name in ASM. We check both in
    # case they are being used interchaeably.
    if revision == 'default':
      incluster_webhook = _INCLUSTER_WEBHOOK_PREFIX
    else:
      incluster_webhook = '{}-{}'.format(_INCLUSTER_WEBHOOK_PREFIX, revision)

    mcp_webhook = '{}-{}'.format(_MCP_WEBHOOK_PREFIX, revision)

    incluster_out, incluster_err = self._RunKubectl(
        ['get', 'mutatingwebhookconfiguration', incluster_webhook,
         '-o', 'jsonpath={.webhooks[0].clientConfig.url}'], None)
    mcp_out, mcp_err = self._RunKubectl(
        ['get', 'mutatingwebhookconfiguration', mcp_webhook,
         '-o', 'jsonpath={.webhooks[0].clientConfig.url}'], None)

    if incluster_err and mcp_err:
      if 'NotFound' in incluster_err and 'NotFound' in mcp_err:
        raise ClusterError(
            'Anthos Service Mesh revision {} is not found in '
            'the cluster. Please install Anthos Service Mesh and '
            'try again.'.format(revision))
      raise exceptions.Error(
          'Error retrieving the mutating webhook configuration from the cluster'
          ': {}'.format(incluster_err))

    if incluster_out:
      return incluster_out
    return mcp_out

  def _RunKubectl(self, args, stdin=None):
    """Runs a kubectl command with the cluster referenced by this client.

    Args:
      args: command line arguments to pass to kubectl
      stdin: text to be passed to kubectl via stdin

    Returns:
      The contents of stdout if the return code is 0, stderr (or a fabricated
      error if stderr is empty) otherwise
    """
    cmd = [c_util.CheckKubectlInstalled()]
    if self.context:
      cmd.extend(['--context', self.context])

    if self.kubeconfig:
      cmd.extend(['--kubeconfig', self.kubeconfig])

    cmd.extend(['--request-timeout', self.kubectl_timeout])
    cmd.extend(args)
    out = io.StringIO()
    err = io.StringIO()
    returncode = execution_utils.Exec(
        cmd, no_exit=True, out_func=out.write, err_func=err.write, in_str=stdin
    )

    if returncode != 0 and not err.getvalue():
      err.write('kubectl exited with return code {}'.format(returncode))

    return out.getvalue() if returncode == 0 else None, err.getvalue(
    ) if returncode != 0 else None


def _GetNestedKeyFromManifest(manifest, *keys):
  """Get the value of a key path from a dict.

  Args:
    manifest: the dict representation of a manifest
    *keys: an ordered list of items in the nested key

  Returns:
    The value of the nested key in the manifest. None, if the nested key does
    not exist.
  """
  for key in keys:
    if not isinstance(manifest, dict):
      return None
    try:
      manifest = manifest[key]
    except KeyError:
      return None
  return manifest


class PermissionsError(exceptions.Error):
  """Class for errors raised when verifying permissions."""


class ClusterError(exceptions.Error):
  """Class for errors raised when verifying cluster setup."""


class WorkloadError(exceptions.Error):
  """Class for errors raised when verifying workload setup."""