HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/container/fleet/rbac_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for GKE Connect generate gateway RBAC policy files."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import re

from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.command_lib.container.fleet import format_util
from googlecloudsdk.command_lib.container.fleet import invalid_args_error
from googlecloudsdk.command_lib.container.fleet import util as hub_util
from googlecloudsdk.command_lib.container.fleet.memberships import errors as memberships_errors
from googlecloudsdk.command_lib.projects import util as projects_util
from googlecloudsdk.core import log

CLUSTER_ROLE = 'clusterrole'
NAMESPACE_ROLE = 'role'
ANTHOS_SUPPORT_USER = 'service-{project_number}@gcp-sa-{instance_name}anthossupport.iam.gserviceaccount.com'
IMPERSONATE_POLICY_FORMAT = """\
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {impersonate_metadata_name}
  labels:
    connect.gke.io/owner-feature: connect-gateway
rules:
- apiGroups:
  - ""
  resourceNames:{user_account}
  resources:
  - users
  verbs:
  - impersonate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {impersonate_metadata_name}
  labels:
    connect.gke.io/owner-feature: connect-gateway
roleRef:
  kind: ClusterRole
  name: {impersonate_metadata_name}
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: connect-agent-sa
  namespace: gke-connect
"""
PERMISSION_POLICY_CLUSTER_FORMAT = """\
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {permission_metadata_name}
  labels:
    connect.gke.io/owner-feature: connect-gateway
subjects:{users}
roleRef:
  kind: ClusterRole
  name: {permission}
  apiGroup: rbac.authorization.k8s.io
---
"""
PERMISSION_POLICY_NAMESPACE_FORMAT = """\
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {permission_metadata_name}
  labels:
    connect.gke.io/owner-feature: connect-gateway
  namespace: {namespace}
subjects:{users}
roleRef:
  kind: Role
  name: {permission}
  apiGroup: rbac.authorization.k8s.io
---
"""
PERMISSION_POLICY_ANTHOS_SUPPORT_FORMAT = """\
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: anthos-support-reader
  labels:
    connect.gke.io/owner-feature: connect-gateway
rules:
- apiGroups: ["acme.cert-manager.io"]
  resources: ["challenges", "orders"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["addon.baremetal.cluster.gke.io"]
  resources: ["addonmanifests", "addonoverrides", "addons", "addonsets", "addonsettemplates"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["addons.gke.io"]
  resources: ["metricsserver", "monitoring", "stackdrivers"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["admissionregistration.k8s.io"]
  resources: ["mutatingwebhookconfigurations", "validatingwebhookconfigurations"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["anthos.gke.io"]
  resources: ["entitlements"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apiextensions.k8s.io"]
  resources: ["customresourcedefinitions"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apiregistration.k8s.io"]
  resources: ["apiservices"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apiserver.k8s.io"]
  resources: ["flowschemas", "prioritylevelconfigurations"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
  resources: ["controllerrevisions", "daemonsets", "deployments", "replicasets", "statefulset"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps.k8s.io"]
  resources: ["applications"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["baremetal.cluster.gke.io"]
  resources: ["addonconfigurations", "clustercidrconfigs", "clustercredentials", "clustermanifestdeployments", "clusters", "inventorymachines", "machineclasses", "machinecredentials", "machines", "nodepools"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["batch"]
  resources: ["cronjobs", "jobs"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["bootstrap.cluster.x-k8s.io"]
  resources: ["kubeadmconfigs", "kubeadmconfigtemplates"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["bundle.gke.io"]
  resources: ["bundlebuilders", "bundles", "clusterbundles", "componentbuilders", "componentlists", "components", "componentsets", "gkeonprembundles", "packagedeploymentclasses", "packagedeployments", "patchtemplatebuilders", "patchtemplates", "requirements"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["bundleext.gke.io"]
  resources: ["nodeconfigs"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["certificates.k8s.io"]
  resources: ["certificatesigningrequests"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["cert-manager.io"]
  resources: ["certificaterequests", "certificates", "clusterissuers", "issuers"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["cilium.io"]
  resources: ["ciliumnodes", "ciliumendpoints", "ciliumidentities"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["configmanagement.gke.io"]
  resources: ["configmanagements"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["cluster.k8s.io"]
  resources: ["clusters", "controlplanes", "machineclasses", "machinedeployments", "machines", "machinesets"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["cluster.x-k8s.io"]
  resources: ["clusters", "controlplanes", "machineclasses", "machinedeployments", "machines", "machinesets"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["crd.projectcalico.org"]
  resources: ["bgpconfigurations", "bgppeers", "blockaffinities", "clusterinformations", "felixconfigurations", "globalnetworkpolicies", "globalnetworksets", "hostendpoints", "ipamblocks", "ipamconfigs", "ipamhandles", "ippools", "networkpolicies", "networksets"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["discovery.k8s.io"]
  resources: ["endpointslices"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["infrastructure.baremetal.cluster.gke.io"]
  resources: ["baremetalclusters", "baremetalmachines"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["networking.gke.io"]
  resources: ["bgpadvertisedroutes", "bgploadbalancers", "bgppeers", "bgpreceivedroutes", "bgpsessions", "clustercidrconfigs", "clusterdns", "egressnatpolicies"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingressclasses", "ingresses"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["node.k8s.io"]
  resources: ["runtimeclasses"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["onprem.cluster.gke.io"]
  resources: ["onpremadminclusters", "onpremnodepools", "onpremuserclusters", "validations", "onpremplatforms", "onprembundles", "clusterstates"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["policy"]
  resources: ["poddisruptionbudgets", "podsecuritypolicies"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["rbac.authorization.k8s.io"]
  resources: ["clusterroles", "clusterrolebindings", "roles", "rolebindings"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
  resources: ["csidrivers", "csinodes", "csistoragecapacities", "storageclasses", "volumeattachments"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["sriovnetwork.k8s.cni.cncf.io"]
  resources: ["sriovnetworknodepolicies", "sriovnetworknodestates", "sriovoperatorconfigs"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["vm.cluster.gke.io"]
  resources: ["gpuallocation", "guestenvironmentdata", "virtualmachineaccessrequest", "virtualmachinedisk", "virtualmachinepasswordresetrequest", "virtualmachinetype", "vmhighavailabilitypolicy", "vmruntime"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["vsphereproviderconfig.k8s.io"]
  resources: ["vsphereclusterproviderconfigs", "vspheremachineproviderconfigs"]
  verbs: ["get", "list", "watch"]
- apiGroups:
  - '*'
  resources: ["componentstatuses", "configmaps", "endpoints", "events", "horizontalpodautoscalers", "limitranges", "namespaces", "nodes", "persistentvolumeclaims", "persistentvolumes", "pods", "pods/log", "podtemplates", "replicationcontrollers", "resourcequotas", "serviceaccounts", "services"]
  verbs: ["get", "list", "watch"]
- nonResourceURLs:
  - '*'
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {anthos_metadata_name}
  labels:
    connect.gke.io/owner-feature: connect-gateway
subjects:{users}
roleRef:
  kind: ClusterRole
  name: anthos-support-reader
  apiGroup: rbac.authorization.k8s.io
---
"""


def ValidateRole(role):
  """Validation for the role in correct format."""
  cluster_pattern = re.compile('^clusterrole/')
  namespace_pattern = re.compile('^role/')
  if cluster_pattern.match(role.lower()):
    log.status.Print('Specified Cluster Role is:', role)
    if len(role.split('/')) != 2:
      raise invalid_args_error.InvalidArgsError(
          'Cluster role is not specified in correct format. Please specify the '
          'cluster role as: clusterrole/cluster-permission.'
      )
  elif namespace_pattern.match(role.lower()):
    log.status.Print('Specified Namespace Role is:', role)
    if len(role.split('/')) != 3:
      raise invalid_args_error.InvalidArgsError(
          'Namespace role is not specified in correct format. Please specify '
          'the namespace role as: role/namespace/namespace-permission'
      )
  else:
    raise invalid_args_error.InvalidArgsError(
        'The required role is not a cluster role or a namespace role.'
    )


def ValidateArgs(args):
  """Validate Args in correct format."""
  # Validate the confliction between '--anthos-support' and '--role'.
  if not args.revoke and (
      (args.anthos_support and args.role)
      or (not args.anthos_support and not args.role)
  ):
    raise invalid_args_error.InvalidArgsError(
        'Please specify either --role or --anthos-support in the flags.'
    )
  if args.role:
    ValidateRole(args.role)
  # Validate either users/anthos-support or groups is set.
  if not args.users and not args.groups and not args.anthos_support:
    raise invalid_args_error.InvalidArgsError(
        'Please specify --groups: (--anthos-support --users) in flags.'
    )
  # Validate required flags when apply RBAC policy to cluster.
  if args.apply:
    if not args.membership:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --membership in flags.'
      )
    if not args.kubeconfig:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --kubeconfig in flags.'
      )
    if not args.context:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --context in flags.'
      )
  if args.revoke and args.apply:
    # Validate confliction between --apply and --revoke.
    raise invalid_args_error.InvalidArgsError(
        'Please specify either --apply or --revoke in flags.'
    )
  if args.revoke:
    # Validate required flags when revoke RBAC policy for specified user from
    # from cluster.
    if not args.membership:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --membership in flags.'
      )
    if not args.kubeconfig:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --kubeconfig in flags.'
      )
    if not args.context:
      raise invalid_args_error.InvalidArgsError(
          'Please specify the --context in flags.'
      )


def GetAnthosSupportUser(project_id):
  """Get P4SA account name for Anthos Support user."""
  project_number = projects_api.Get(
      projects_util.ParseProject(project_id)
  ).projectNumber
  hub_endpoint_override = hub_util.APIEndpoint()
  if hub_endpoint_override == hub_util.PROD_API:
    return ANTHOS_SUPPORT_USER.format(
        project_number=project_number, instance_name=''
    )
  elif hub_endpoint_override == hub_util.STAGING_API:
    return ANTHOS_SUPPORT_USER.format(
        project_number=project_number, instance_name='staging-'
    )
  elif hub_endpoint_override == hub_util.AUTOPUSH_API:
    return ANTHOS_SUPPORT_USER.format(
        project_number=project_number, instance_name='autopush-'
    )
  else:
    raise memberships_errors.UnknownApiEndpointOverrideError('gkehub')


def GenerateRBAC(args, project_id):
  """Returns the generated RBAC policy file with args provided."""
  generated_rbac = {}
  cluster_pattern = re.compile('^clusterrole/')
  namespace_pattern = re.compile('^role/')
  role_permission = ''
  rbac_policy_format = ''
  namespace = ''
  users_list = list()
  groups_list = list()

  if args.anthos_support:
    rbac_policy_format = (
        IMPERSONATE_POLICY_FORMAT + PERMISSION_POLICY_ANTHOS_SUPPORT_FORMAT
    )
  elif cluster_pattern.match(args.role.lower()):
    role_permission = args.role.split('/')[1]
    if args.users:
      rbac_policy_format = (
          IMPERSONATE_POLICY_FORMAT + PERMISSION_POLICY_CLUSTER_FORMAT
      )
    elif args.groups:
      rbac_policy_format = PERMISSION_POLICY_CLUSTER_FORMAT
  elif namespace_pattern.match(args.role.lower()):
    namespace = args.role.split('/')[1]
    role_permission = args.role.split('/')[2]
    if args.users:
      rbac_policy_format = (
          IMPERSONATE_POLICY_FORMAT + PERMISSION_POLICY_NAMESPACE_FORMAT
      )
    elif args.groups:
      rbac_policy_format = PERMISSION_POLICY_NAMESPACE_FORMAT
  else:
    raise invalid_args_error.InvalidArgsError(
        'Invalid flags, please specify either the --role or --anthos-support in'
        'your flags.'
    )

  if args.users:
    users_list = args.users.split(',')
  elif args.anthos_support:
    users_list.append(GetAnthosSupportUser(project_id))
  for user in users_list:
    impersonate_users = os.linesep + '  - {user}'.format(user=user)
    permission_users = os.linesep + '- kind: User'
    permission_users += os.linesep + '  name: {user}'.format(user=user)
    impersonate_metadata_name = format_util.RbacPolicyName(
        'impersonate', project_id, args.membership, user, True
    )
    permission_metadata_name = format_util.RbacPolicyName(
        'permission', project_id, args.membership, user, True
    )
    anthos_metadata_name = format_util.RbacPolicyName(
        'anthos', project_id, args.membership, user, True
    )

    # Assign value to the RBAC file templates.
    single_generated_rbac = rbac_policy_format.format(
        impersonate_metadata_name=impersonate_metadata_name,
        permission_metadata_name=permission_metadata_name,
        anthos_metadata_name=anthos_metadata_name,
        namespace=namespace,
        user_account=impersonate_users,
        users=permission_users,
        permission=role_permission,
    )
    # Map the user and is_user to the generated RBAC policy.
    generated_rbac[(user, True)] = single_generated_rbac

  if args.groups:
    groups_list = args.groups.split(',')
  for group in groups_list:
    permission_users = os.linesep + '- kind: Group'
    permission_users += os.linesep + '  name: {group}'.format(group=group)
    permission_metadata_name = format_util.RbacPolicyName(
        'permission', project_id, args.membership, group, False
    )

    # Assign value to the RBAC file templates.
    single_generated_rbac = rbac_policy_format.format(
        permission_metadata_name=permission_metadata_name,
        namespace=namespace,
        users=permission_users,
        permission=role_permission,
    )
    # Map the group and is_user to the generated RBAC policy.
    generated_rbac[(group, False)] = single_generated_rbac

  return generated_rbac