HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/container/fleet/gateway.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for interacting with the Connect Gateway API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from typing import List, Union

from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.api_lib.container import util as container_util
from googlecloudsdk.api_lib.container.fleet.connectgateway import client as gateway_client
from googlecloudsdk.api_lib.container.fleet.connectgateway import util as gateway_util
from googlecloudsdk.api_lib.services import enable_api
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.container.fleet import api_util as hubapi_util
from googlecloudsdk.command_lib.container.fleet import base as hub_base
from googlecloudsdk.command_lib.container.fleet import gwkubeconfig_util as kconfig
from googlecloudsdk.command_lib.container.fleet import overrides
from googlecloudsdk.command_lib.container.fleet.memberships import errors as memberships_errors
from googlecloudsdk.command_lib.container.fleet.memberships import util
from googlecloudsdk.command_lib.projects import util as project_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import platforms

# Template for the kubeconfig context/cluster/user entry name of a membership.
KUBECONTEXT_FORMAT = 'connectgateway_{project}_{location}_{membership}'

# Template for the Connect Gateway server URL stored in the kubeconfig cluster
# entry. `collection` is the resource type segment of the URL (for example
# `memberships` or `gkeMemberships`).
SERVER_FORMAT = (
    'https://{service_name}/{version}'
    '/projects/{project_number}'
    '/locations/{location}'
    '/{collection}/{membership}'
)

# Project-level IAM permissions verified before fetching a server-side
# generated kubeconfig.
REQUIRED_SERVER_PERMISSIONS = ['gkehub.gateway.get']

# Project-level IAM permissions verified before building a kubeconfig on the
# client.
REQUIRED_CLIENT_PERMISSIONS = [
    'gkehub.memberships.get',
    'gkehub.gateway.get',
    'serviceusage.services.get',
]


class GetCredentialsCommand(hub_base.HubCommand, base.Command):
  """Base class with utilities for Gateway credential generating commands.

  Offers two ways of producing a kubeconfig entry for a fleet membership:
  `RunGetCredentials` assembles the entry locally, while `RunServerSide`
  fetches a kubeconfig generated by the Connect Gateway API and merges it into
  the default kubeconfig.
  """

  # TODO(b/368039642): Remove once we're sure server-side generation is stable
  def RunGetCredentials(self, membership_id, arg_location, arg_namespace=None):
    """Builds a Gateway kubeconfig entry on the client and makes it current.

    Verifies kubectl is installed, checks the caller's IAM permissions, makes
    sure the Connect Gateway API is enabled, reads the membership, and then
    writes the kubeconfig entry.

    Args:
      membership_id: The short name of the membership to generate credentials
        for.
      arg_location: The location of the membership to generate credentials for.
      arg_namespace: The namespace to use in the kubeconfig context.
    """
    container_util.CheckKubectlInstalled()
    project_id = hub_base.HubCommand.Project()

    log.status.Print('Starting to build Gateway kubeconfig...')
    log.status.Print('Current project_id: ' + project_id)

    self.RunIamCheck(project_id, REQUIRED_CLIENT_PERMISSIONS)
    try:
      hub_endpoint_override = properties.VALUES.api_endpoint_overrides.Property(
          'gkehub'
      ).Get()
    except properties.NoSuchPropertyError:
      hub_endpoint_override = None
    # API enablement is only done once per environment, regardless of which
    # region is being accessed.
    CheckGatewayApiEnablement(
        project_id,
        util.GetConnectGatewayServiceName(hub_endpoint_override, None),
    )

    membership = self.ReadClusterMembership(
        project_id, arg_location, membership_id
    )

    # Registered GKE clusters use a different URL scheme, having
    # `gkeMemberships/` rather than the standard `memberships/` resource type.
    # Probers use GKE clusters to emulate attached clusters, and so must be
    # exempt from this.
    collection = 'memberships'
    if (
        project_id != 'gkeconnect-prober'
        and hasattr(membership, 'endpoint')
        and hasattr(membership.endpoint, 'gkeCluster')
        and membership.endpoint.gkeCluster
    ):
      collection = 'gkeMemberships'

    self.GenerateKubeconfig(
        util.GetConnectGatewayServiceName(hub_endpoint_override, arg_location),
        project_id,
        arg_location,
        collection,
        membership_id,
        arg_namespace,
    )
    context_name = self.KubeContext(
        project_id, arg_location, membership_id, arg_namespace
    )
    log.status.Print(
        f'A new kubeconfig entry "{context_name}" has been generated and set'
        ' as the current context.'
    )

  def RunServerSide(
      self,
      membership_id: str,
      arg_location: str,
      force_use_agent: bool = False,
      arg_namespace: Union[str, None] = None,
  ):
    """RunServerSide generates credentials using server-side kubeconfig generation.

    Args:
      membership_id: The short name of the membership to generate credentials
        for.
      arg_location: The location of the membership to generate credentials for.
      force_use_agent: Whether to force the use of Connect Agent in generated
        credentials.
      arg_namespace: The namespace to use in the kubeconfig context.
    """
    log.status.Print('Fetching Gateway kubeconfig...')
    container_util.CheckKubectlInstalled()
    project_id = hub_base.HubCommand.Project()
    project_number = hub_base.HubCommand.Project(number=True)

    # Ensure at the minimum that the user has gkehub.gateway.get. This is
    # because the user might have gkehub.gateway.generateCredentials but not
    # gkehub.gateway.get, which would lead to unclear errors when using kubectl.
    self.RunIamCheck(project_id, REQUIRED_SERVER_PERMISSIONS)

    # The operating system is only passed to the API when running on Windows.
    operating_system = None
    if platforms.OperatingSystem.IsWindows():
      operating_system = gateway_util.WindowsOperatingSystem(
          self.ReleaseTrack()
      )

    with overrides.RegionalGatewayEndpoint(arg_location):
      client = gateway_client.GatewayClient(self.ReleaseTrack())
      resp = client.GenerateCredentials(
          name=f'projects/{project_number}/locations/{arg_location}/memberships/{membership_id}',
          force_use_agent=force_use_agent,
          kubernetes_namespace=arg_namespace,
          operating_system=operating_system,
      )

    # Merge the returned kubeconfig into the default one, letting the new
    # entries win over existing entries.
    new = kconfig.Kubeconfig.LoadFromBytes(resp.kubeconfig)
    kubeconfig = kconfig.Kubeconfig.Default()
    kubeconfig.Merge(new, overwrite=True)
    # The returned kubeconfig should only have one context.
    kubeconfig.SetCurrentContext(list(new.contexts.keys())[0])
    kubeconfig.SaveToFile()

    msg = (
        f'A new kubeconfig entry "{kubeconfig.current_context}" has been'
        ' generated and set as the current context.'
    )
    log.status.Print(msg)

  def KubeContext(self, project_id, location, membership, namespace=None):
    """Returns the kubeconfig entry name for a membership.

    Args:
      project_id: The project ID that owns the membership.
      location: The location of the membership.
      membership: The short name of the membership.
      namespace: Optional namespace; when truthy, `_ns-<namespace>` is appended
        to the name.

    Returns:
      The name built from KUBECONTEXT_FORMAT, with the namespace suffix if a
      namespace was given.
    """
    kc = KUBECONTEXT_FORMAT.format(
        project=project_id, location=location, membership=membership
    )
    if namespace:
      kc += '_ns-' + namespace
    return kc

  def RunIamCheck(self, project_id: str, permissions: List[str]):
    """Checks that the caller holds every given permission on the project.

    Args:
      project_id: The ID of the project to test permissions against.
      permissions: The permissions the caller must have been granted.

    Raises:
      memberships_errors.InsufficientPermissionsError: if any of the requested
        permissions is missing from the granted permissions.
    """
    project_ref = project_util.ParseProject(project_id)
    result = projects_api.TestIamPermissions(project_ref, permissions)
    granted_permissions = result.permissions

    if not set(permissions).issubset(granted_permissions):
      raise memberships_errors.InsufficientPermissionsError()

  def ReadClusterMembership(self, project_id, location, membership):
    """Fetches the membership resource for the given project and location.

    Args:
      project_id: The project ID that owns the membership.
      location: The location of the membership.
      membership: The short name of the membership.

    Returns:
      The membership returned by hubapi_util.GetMembership.
    """
    resource_name = hubapi_util.MembershipRef(project_id, location, membership)
    # If membership doesn't exist, exception will be raised to caller.
    return hubapi_util.GetMembership(resource_name)

  def GenerateKubeconfig(
      self,
      service_name,
      project_id,
      location,
      collection,
      membership,
      namespace=None,
  ):
    """Writes a Gateway context, user and cluster into the default kubeconfig.

    The new context is set as the current context and the kubeconfig is saved
    to file.

    Args:
      service_name: The Connect Gateway service name used as the server host.
      project_id: The project ID that owns the membership.
      location: The location of the membership.
      collection: The resource type segment of the server URL.
      membership: The short name of the membership.
      namespace: Optional namespace to set on the context.

    Returns:
      The updated kubeconfig object.
    """
    project_number = project_util.GetProjectNumber(project_id)
    server = SERVER_FORMAT.format(
        service_name=service_name,
        version=self.GetVersion(),
        project_number=project_number,
        location=location,
        collection=collection,
        membership=membership,
    )

    # The context and user share one key, which includes the namespace suffix
    # when a namespace is given. The cluster key never includes the namespace.
    context = self.KubeContext(project_id, location, membership, namespace)
    cluster = self.KubeContext(project_id, location, membership)
    kubeconfig = kconfig.Kubeconfig.Default()
    kubeconfig.contexts[context] = kconfig.Context(
        context, cluster, context, namespace
    )
    kubeconfig.users[context] = kconfig.User(context, auth_provider='gcp')
    kubeconfig.clusters[cluster] = kconfig.Cluster(cluster, server)
    kubeconfig.SetCurrentContext(context)
    kubeconfig.SaveToFile()
    return kubeconfig

  @classmethod
  def GetVersion(cls):
    """Returns the API version string matching the command's release track.

    Returns:
      'v1alpha1' for ALPHA, 'v1beta1' for BETA, 'v1' for GA, and an empty
      string for any other release track.
    """
    track = cls.ReleaseTrack()
    if track is base.ReleaseTrack.ALPHA:
      return 'v1alpha1'
    if track is base.ReleaseTrack.BETA:
      return 'v1beta1'
    if track is base.ReleaseTrack.GA:
      return 'v1'
    return ''


def CheckGatewayApiEnablement(project_id, service_name):
  """Ensures the Connect Gateway API is enabled, prompting to enable it if not.

  Returns immediately when the service is already enabled. Otherwise the user
  is prompted to enable it; the prompt defaults to "No", and declining results
  in an error.

  Args:
    project_id: The ID of the project on which to check/enable the API.
    service_name: The name of the service to check/enable the API.

  Raises:
    memberships_errors.ServiceNotEnabledError: if the user declines to attempt
      to enable the API.
    exceptions.GetServicesPermissionDeniedException: if a 403 or 404 error is
      returned by the Get request.
    apitools_exceptions.HttpError: Another miscellaneous error with the
      listing service.
    api_exceptions.HttpException: API not enabled error if the user chooses to
      not enable the API.
  """
  if enable_api.IsServiceEnabled(project_id, service_name):
    return

  not_enabled_error = memberships_errors.ServiceNotEnabledError(
      'Connect Gateway API', service_name, project_id
  )
  try:
    apis.PromptToEnableApi(project_id, service_name, not_enabled_error)
  except apis.apitools_exceptions.RequestError:
    # Since we are not actually calling the API, there is nothing to retry,
    # so this signal to retry can be ignored
    pass