HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/storage/iam_command_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for storage surface IAM commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.api_lib.storage import gcs_iam_util
from googlecloudsdk.command_lib.iam import iam_util
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import plurality_checkable_iterator
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage import wildcard_iterator
from googlecloudsdk.command_lib.storage.tasks import task
from googlecloudsdk.command_lib.storage.tasks import task_executor
from googlecloudsdk.command_lib.storage.tasks import task_graph_executor
from googlecloudsdk.command_lib.storage.tasks import task_status
from googlecloudsdk.command_lib.storage.tasks import task_util


def get_single_matching_url(url_string):
  """Gets cloud resource, allowing wildcards that match only one resource."""
  if not wildcard_iterator.contains_wildcard(url_string):
    return storage_url.storage_url_from_string(url_string)

  resource_iterator = wildcard_iterator.get_wildcard_iterator(
      url_string, fields_scope=cloud_api.FieldsScope.SHORT)
  plurality_checkable_resource_iterator = (
      plurality_checkable_iterator.PluralityCheckableIterator(resource_iterator)
  )

  if plurality_checkable_resource_iterator.is_plural():
    raise errors.InvalidUrlError(
        'get-iam-policy must match a single cloud resource.')
  return list(plurality_checkable_resource_iterator)[0].storage_url


def execute_set_iam_task_iterator(iterator, continue_on_error):
  """Executes single or multiple set-IAM tasks with different handling.

  Args:
    iterator (iter[set_iam_policy_task._SetIamPolicyTask]): Contains set IAM
      task(s) to execute.
    continue_on_error (bool): If multiple tasks in iterator, determines whether
      to continue executing after an error.

  Returns:
    int: Status code. For multiple tasks, the task executor will return if
      any of the tasks failed.
    object|None: If executing a single task, the newly set IAM policy. This
      is useful for outputting to the terminal.
  """
  plurality_checkable_task_iterator = (
      plurality_checkable_iterator.PluralityCheckableIterator(iterator))

  if not plurality_checkable_task_iterator.is_plural():
    # Underlying iterator should error if no matches, so this means there is a
    # single match. Output its new policy to match other IAM set commands.
    return 0, task_util.get_first_matching_message_payload(
        next(plurality_checkable_task_iterator).execute().messages,
        task.Topic.SET_IAM_POLICY)

  task_status_queue = task_graph_executor.multiprocessing_context.Queue()
  exit_code = task_executor.execute_tasks(
      plurality_checkable_task_iterator,
      parallelizable=True,
      task_status_queue=task_status_queue,
      progress_manager_args=task_status.ProgressManagerArgs(
          increment_type=task_status.IncrementType.INTEGER, manifest_path=None),
      continue_on_error=continue_on_error)
  return exit_code, None


def add_iam_binding_to_resource(args, url, messages, policy, task_type):
  """Extracts new binding from args and applies to existing policy.

  Args:
    args (argparse Args): Contains flags user ran command with.
    url (CloudUrl): URL of target resource, already validated for type.
    messages (object): Must contain IAM data types needed to create new policy.
    policy (object): Existing IAM policy on target to update.
    task_type (set_iam_policy_task._SetIamPolicyTask): The task instance to use
      to execute the iam binding change.

  Returns:
    object: The updated IAM policy set in the cloud.
  """
  condition = iam_util.ValidateAndExtractCondition(args)

  policy.version = gcs_iam_util.IAM_POLICY_VERSION
  iam_util.AddBindingToIamPolicyWithCondition(
      messages.Policy.BindingsValueListEntry, messages.Expr, policy,
      args.member, args.role, condition)

  task_output = task_type(url, policy).execute()
  return task_util.get_first_matching_message_payload(task_output.messages,
                                                      task.Topic.SET_IAM_POLICY)


def remove_iam_binding_from_resource(args, url, policy, task_type):
  """Extracts binding from args and removes it from existing policy.

  Args:
    args (argparse Args): Contains flags user ran command with.
    url (CloudUrl): URL of target resource, already validated for type.
    policy (object): Existing IAM policy on target to update.
    task_type (set_iam_policy_task._SetIamPolicyTask): The task instance to use
      to execute the iam binding change.

  Returns:
    object: The updated IAM policy set in the cloud.
  """
  condition = iam_util.ValidateAndExtractCondition(args)

  policy.version = gcs_iam_util.IAM_POLICY_VERSION
  iam_util.RemoveBindingFromIamPolicyWithCondition(policy, args.member,
                                                   args.role, condition,
                                                   args.all)

  task_output = task_type(url, policy).execute()
  return task_util.get_first_matching_message_payload(task_output.messages,
                                                      task.Topic.SET_IAM_POLICY)