HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/pam/util.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions for `gcloud pam` commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import re

from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.core import log
from googlecloudsdk.core import yaml


def SetForceFieldInDeleteEntitlementRequest(unused_ref, unused_args, req):
  """Modify request hook to set the force field in delete entitlement requests to true."""
  req.force = True
  return req


def ParseEntitlementNameIntoCreateEntitlementRequest(unused_ref, args, req):
  """Modify request hook to parse the entitlement name into a CreateEntitlementRequest."""
  entitlement = args.CONCEPTS.entitlement.Parse()
  req.parent = entitlement.result.Parent().RelativeName()
  req.entitlementId = entitlement.result.Name()
  return req


def SetUpdateMaskInUpdateEntitlementRequest(unused_ref, unused_args, req):
  """Modify request hook to set the update mask field in update entitlement requests to '*'."""
  req.updateMask = '*'
  return req


def FormatWithdrawResponse(response, unused_args):
  """Formats the response of the withdraw command."""
  modified_response = {}
  if response.name:
    modified_response['name'] = response.name
  if not response.metadata:
    return modified_response
  modified_response['metadata'] = {}
  properties = response.metadata.additionalProperties
  for prop in properties:
    if prop.key in ('apiVersion', 'createTime', 'target'):
      modified_response['metadata'][prop.key] = prop.value.string_value

  log.status.Print(
      'Grant withdrawal initiated. The operation will complete in some time. To'
      ' track its status, run:\n`gcloud pam operations wait {}`\nNote that'
      ' the wait command requires you to have the'
      ' `privilegedaccessmanager.operations.get` permission on the resource.'
      .format(response.name)
  )
  return modified_response


def GetApiVersionFromArgs(args):
  """Return API version based on args.

  Args:
    args: The argparse namespace.

  Returns:
    API version (e.g. v1alpha or v1beta).
  """
  release_track = args.calliope_command.ReleaseTrack()
  if release_track == base.ReleaseTrack.ALPHA:
    return 'v1alpha'
  if release_track == base.ReleaseTrack.BETA:
    return 'v1beta'
  if release_track == base.ReleaseTrack.GA:
    return 'v1'


def SetRequestedPrivilegedAccessInCreateGrantRequest(unused_ref, args, req):
  """Modify request hook to populate the requestedPrivilegedAccess field in create grant requests."""
  if not args.requested_resources:
    return req
  messages = apis.GetMessagesModule(
      'privilegedaccessmanager', GetApiVersionFromArgs(args)
  )
  if len(args.requested_resources) > 1:
    raise arg_parsers.ArgumentTypeError(
        'Only one resource is supported for grant scope.'
    )
  for resource in args.requested_resources:
    resource = resource.strip()
    pattern = r'^(projects|organizations|folders)\/.+'
    components = resource.split('/')
    if not re.match(pattern, resource) or len(components) != 2:
      raise arg_parsers.ArgumentTypeError(
          'Invalid resource name: {}. Resource name must be of the form'
          ' (projects|organizations|folders)/<id>.'.format(resource)
      )
    resource_type = (
        'cloudresourcemanager.googleapis.com/' + components[0].capitalize()[:-1]
    )
    full_name = '//cloudresourcemanager.googleapis.com/' + resource
    requested_privileged_access = messages.RequestedPrivilegedAccess()
    requested_privileged_access.gcpIamAccess = (
        messages.RequestedPrivilegedAccessGcpIamAccess()
    )
    requested_privileged_access.gcpIamAccess.resourceType = resource_type
    requested_privileged_access.gcpIamAccess.resource = full_name
    req.grant.requestedPrivilegedAccess.append(requested_privileged_access)

  return req


def LoadGrantScopeFromYaml(stream):
  """Loads a YAML document from a stream.

  This function takes a stream (expected to be a list with a single string
  element) and parses it as a YAML document. It returns the loaded YAML data as
  a Python object (typically a list or dictionary).

  Args:
    stream: The stream to load from.

  Returns:
   The loaded YAML data.
  """
  if not stream or not stream[0]:
    # Return an empty list if no file is provided or the file is empty.
    return []
  return yaml.load(stream[0])


# TODO(b/261183749): Remove modify_request_hook when singleton resource args
# are enabled in declarative.
def UpdateSettingsResource(unused_ref, unused_args, req):
  """Modify request hook to update the resource field in settings requests."""
  req.name = req.name + '/settings'
  return req


def SetUpdateMaskInUpdateSettingsRequest(unused_ref, unused_args, req):
  """Modify request hook to set the update mask field in update settings requests to '*'."""
  req.updateMask = '*'
  return req