HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/container/fleet/scopes/util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for Fleet scopes commands."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import encoding
from googlecloudsdk.api_lib.container.fleet import client
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import exceptions


class ScopeLogViewCondition:
  """Helper class for creating a scope log view iam condition.

  This class defines a `get` object method that can be used by the iam util
  lib to get the iam condition spec.
  """

  def __init__(self, project_id, scope_id):
    self.project_id = project_id
    self.scope_id = scope_id

  # The condition should be iterable.
  def __iter__(self):
    return self

  def __next__(self):
    # There is only one condition.
    raise StopIteration

  def IsSpecified(self):
    return True

  def get(self, condition_spec):  # pylint: disable=invalid-name
    # This method is called by the iam util lib.
    if condition_spec == 'title':
      return 'conditional log view access'
    if condition_spec == 'description':
      return 'log view access for scope {}'.format(self.scope_id)
    if condition_spec == 'expression':
      return (
          'resource.name =='
          f' "projects/{self.project_id}/locations/global/buckets/fleet-o11y-scope-{self.scope_id}/views/fleet-o11y-scope-{self.scope_id}-k8s_container"'
          ' || resource.name =='
          f' "projects/{self.project_id}/locations/global/buckets/fleet-o11y-scope-{self.scope_id}/views/fleet-o11y-scope-{self.scope_id}-k8s_pod"'
      )


class AppOperatorBinding:
  """Helper class for containing a principal with their project-level IAM role, fleet scope-level role, and fleet scope RBAC role.
  """

  def __init__(self, principal, overall_role, scope_rrb_role, scope_iam_role, project_iam_role, log_view_access):
    # The principal in the IAM format, e.g., "user:person@google.com".
    self.principal = principal
    # Overall role can be "view", "edit", or "admin" if the IAM and RBAC roles
    # are known and consistent. If inconsistent, it will be "custom". Otherwise,
    # it will be "unknown".
    self.overall_role = overall_role
    # Scope RBAC role can be "view", "edit", "admin", "not found", or
    # "permission denied".
    self.scope_rrb_role = scope_rrb_role
    # Scope-level IAM role can be "roles/gkehub.scopeViewer",
    # "roles/gkehub.scopeEditor", "roles/gkehub.scopeAdmin", "not found", or
    # "permission denied".
    self.scope_iam_role = scope_iam_role
    # Project-level IAM role can be "roles/gkehub.scopeViewerProjectLevel",
    # "roles/gkehub.scopeEditorProjectLevel", "not found", or
    # "permission denied".
    self.project_iam_role = project_iam_role
    # Log view access can be "granted", "not found", or "permission denied".
    self.log_view_access = log_view_access


def SetParentCollection(ref, args, request):
  """Set parent collection with location for created resources.

  Args:
    ref: reference to the scope object.
    args: command line arguments.
    request: API request to be issued

  Returns:
    modified request
  """
  del ref, args  # Unused.
  request.parent = request.parent + '/locations/-'
  return request


def CheckUpdateArguments(ref, args, request):
  del ref, args  # Unused.
  if request.updateMask is None or not request.updateMask:
    request.updateMask = 'name'
  return request


def HandleNamespaceLabelsUpdateRequest(ref, args):
  """Add namespace labels to update request.

  Args:
    ref: reference to the scope object.
    args: command line arguments.

  Returns:
    response

  """
  mask = []
  release_track = args.calliope_command.ReleaseTrack()
  fleetclient = client.FleetClient(release_track)

  labels_diff = labels_util.Diff.FromUpdateArgs(args)
  namespace_labels_diff = labels_util.Diff(
      args.update_namespace_labels,
      args.remove_namespace_labels,
      args.clear_namespace_labels,
  )

  current_scope = fleetclient.GetScope(ref.RelativeName())

  # update GCP labels for namespace resource
  new_labels = labels_diff.Apply(
      fleetclient.messages.Scope.LabelsValue, current_scope.labels
  ).GetOrNone()
  if new_labels:
    mask.append('labels')

  # add cluster namespace level labels to resource
  new_namespace_labels = namespace_labels_diff.Apply(
      fleetclient.messages.Scope.NamespaceLabelsValue,
      current_scope.namespaceLabels,
  ).GetOrNone()
  if new_namespace_labels:
    mask.append('namespace_labels')

  # if there are no fields to update, don't make update api call
  if not mask:
    response = fleetclient.messages.Scope(name=ref.RelativeName())
    return response

  return fleetclient.UpdateScope(
      ref.RelativeName(), new_labels, new_namespace_labels, ','.join(mask)
  )


def HandleNamespaceLabelsCreateRequest(ref, args, request):
  """Add namespace labels to create request.

  Args:
    ref: reference to the scope object.
    args: command line arguments.
    request: API request to be issued

  Returns:
    modified request

  """
  del ref
  release_track = args.calliope_command.ReleaseTrack()
  fleetclient = client.FleetClient(release_track)
  namespace_labels_diff = labels_util.Diff(additions=args.namespace_labels)
  ns_labels = namespace_labels_diff.Apply(
      fleetclient.messages.Scope.NamespaceLabelsValue, None
  ).GetOrNone()
  request.scope.namespaceLabels = ns_labels
  return request


def IamMemberFromRbac(user, group):
  """Returns Iam member for the specified RBAC user/group.

  Args:
    user: user email, principal or None
    group: group email, principal set or None

  Returns:
    an Iam member, e.g., "user:person@google.com" or "group:people@google.com"

  Raises:
    a core.Error, if both user and group are None
  """
  if user:
    if user.startswith('principal://'):
      return user
    if user.endswith('gserviceaccount.com'):
      return 'serviceAccount:' + user
    return 'user:' + user
  if group:
    if group.startswith('principalSet://'):
      return group
    return 'group:' + group
  raise exceptions.Error(
      'User or group is required in the args.'
  )


def IamScopeLevelScopeRoleFromRbac(role):
  """Returns Iam scope role (scope-level) based on the specified RBAC role.

  Args:
    role: RBAC role

  Returns:
    a scope-related Iam role, e.g., "roles/gkehub.scopeEditor"

  Raises:
    a core.Error, if the role is not admin, edit, or view
  """
  if role == 'admin':
    return 'roles/gkehub.scopeAdmin'
  elif role == 'edit':
    return 'roles/gkehub.scopeEditor'
  elif role == 'view':
    return 'roles/gkehub.scopeViewer'
  elif role:
    return 'roles/gkehub.scopeViewer'
  raise exceptions.Error(
      'Role is required to be admin, edit, view or a custom role.'
  )


def AllIamScopeLevelScopeRoles():
  """Returns all valid Iam scope roles at scope level.
  """
  return [
      'roles/gkehub.scopeAdmin',
      'roles/gkehub.scopeEditor',
      'roles/gkehub.scopeViewer',
  ]


def IamProjectLevelScopeRoleFromRbac(role):
  """Returns Iam scope role (project-level) based on the specified RBAC role.

  Args:
    role: RBAC role

  Returns:
    a scope-related Iam role, e.g., "roles/gkehub.scopeEditorProjectLevel"

  Raises:
    a core.Error, if the role is not admin, edit, or view
  """
  if role == 'admin':
    # Admin needs the same project-level permissions as Editor.
    return 'roles/gkehub.scopeEditorProjectLevel'
  elif role == 'edit':
    return 'roles/gkehub.scopeEditorProjectLevel'
  elif role == 'view':
    return 'roles/gkehub.scopeViewerProjectLevel'
  elif role:
    # Custom role gives minimal editor project-level permissions.
    return 'roles/gkehub.scopeEditorProjectLevel'
  raise exceptions.Error(
      'Role is required to be admin, edit, or view.'
  )


def AllIamProjectLevelScopeRoles():
  """Returns all valid Iam scope roles at project level.
  """
  return [
      'roles/gkehub.scopeEditorProjectLevel',
      'roles/gkehub.scopeViewerProjectLevel',
  ]


def ScopeRbacRoleString(role):
  """Returns the RBAC role string from the specifiedRBAC role message.

  Args:
    role: RBAC role

  Returns:
    RBAC role string (admin, edit, or view)

  Raises:
    a core.Error, if the role is not admin, edit, or view
  """
  if role.customRole:
    return role.customRole
  elif str(encoding.MessageToPyValue(role)['predefinedRole']) == 'ADMIN':
    return 'admin'
  elif str(encoding.MessageToPyValue(role)['predefinedRole']) == 'EDIT':
    return 'edit'
  elif str(encoding.MessageToPyValue(role)['predefinedRole']) == 'VIEW':
    return 'view'
  raise exceptions.Error(
      'Role is required to be admin, edit, view or a custom role.'
  )


def RbacAndScopeIamRolesMatch(rbac_role, scope_iam_role):
  """Returns true if the specified RBAC role and scope IAM role match.
  """
  if rbac_role == 'admin' and scope_iam_role == 'roles/gkehub.scopeAdmin':
    return True
  if rbac_role == 'edit' and scope_iam_role == 'roles/gkehub.scopeEditor':
    return True
  return rbac_role == 'view' and scope_iam_role == 'roles/gkehub.scopeViewer'