HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/iam/iam_util.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""General IAM utilities used by the Cloud SDK."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import binascii
import re
import textwrap

from apitools.base.protorpclite import messages as apitools_messages
from apitools.base.py import encoding

from googlecloudsdk.api_lib.util import apis as core_apis
from googlecloudsdk.api_lib.util import messages as messages_util
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import exceptions as gcloud_exceptions
from googlecloudsdk.command_lib.iam import completers
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import files
import six


# Message classes for the 'iam' API at version 'v1'. The aliases that follow are
# shortcuts to enums declared on those messages.
msgs = core_apis.GetMessagesModule('iam', 'v1')
# KeyTypes enum of the service account keys list request.
MANAGED_BY = (
    msgs.IamProjectsServiceAccountsKeysListRequest.KeyTypesValueValuesEnum)
# PrivateKeyType enum of the create service account key request.
CREATE_KEY_TYPES = (
    msgs.CreateServiceAccountKeyRequest.PrivateKeyTypeValueValuesEnum)
# PrivateKeyType enum of the ServiceAccountKey message.
KEY_TYPES = (msgs.ServiceAccountKey.PrivateKeyTypeValueValuesEnum)
# PublicKeyType enum of the service account keys get request.
PUBLIC_KEY_TYPES = (
    msgs.IamProjectsServiceAccountsKeysGetRequest.PublicKeyTypeValueValuesEnum)
# Stage enum of the Role message.
STAGE_TYPES = (msgs.Role.StageValueValuesEnum)

# Resource collection name for service accounts.
SERVICE_ACCOUNTS_COLLECTION = 'iam.projects.serviceAccounts'

# Table output format for service accounts.
SERVICE_ACCOUNT_FORMAT = ('table(displayName:label="DISPLAY NAME", email, '
                          'disabled)')
# Table output format for service account keys.
SERVICE_ACCOUNT_KEY_FORMAT = """
    table(
        name.scope(keys):label=KEY_ID,
        validAfterTime:label=CREATED_AT,
        validBeforeTime:label=EXPIRES_AT,
        disabled:label=DISABLED,
        disable_reason:label=DISABLE_REASON,
        extended_status:label=EXTENDED_STATUS
    )
"""
# Shared exception instance raised when a `--condition` value is malformed.
CONDITION_FORMAT_EXCEPTION = gcloud_exceptions.InvalidArgumentException(
    'condition',
    'condition must be either `None` or a list of key=value pairs. '
    'If not `None`, `expression` and `title` are required keys.\n'
    'Example: --condition=expression=[expression],title=[title],'
    'description=[description]')

# Shared exception instance describing a malformed `--condition-from-file`
# value. NOTE(review): its raise site is not visible in this part of the file.
CONDITION_FILE_FORMAT_EXCEPTION = gcloud_exceptions.InvalidArgumentException(
    'condition-from-file',
    'condition-from-file must be a path to a YAML or JSON file containing the '
    'condition. `expression` and `title` are required keys. `description` is '
    'optional. To specify a `None` condition, use --condition=None.')

# NOTE(review): presumably the highest IAM policy version this library
# supports; its use is not visible in this part of the file.
MAX_LIBRARY_IAM_SUPPORTED_VERSION = 3

# Sentinel conditions. _ALL_CONDITIONS is returned by the removal prompt when
# the `all conditions` choice is picked, _NEW_CONDITION backs the
# `Specify a new condition` prompt choice, and _NONE_CONDITION is the dict form
# of a `None` condition.
_ALL_CONDITIONS = {'All': None}
_NEW_CONDITION = object()
_NONE_CONDITION = {'None': None}


def _IsAllConditions(condition):
  """Returns whether condition equals the `all conditions` sentinel."""
  matches_sentinel = condition == _ALL_CONDITIONS
  return matches_sentinel


class IamEtagReadError(core_exceptions.Error):
  """Raised when the etag of an IAM policy is badly formatted."""


class IamPolicyBindingNotFound(core_exceptions.Error):
  """Raised when the specified IAM policy binding is not found.

  The binding-removal helpers in this module raise it when no binding matches
  the requested principal, role and (where applicable) condition.
  """


class IamPolicyBindingInvalidError(core_exceptions.Error):
  """Raised when the specified IAM policy binding is invalid.

  For example, when a condition is combined with a basic role.
  """


class IamPolicyBindingIncompleteError(IamPolicyBindingInvalidError):
  """Raised when the specified IAM policy binding is incomplete.

  Subclasses IamPolicyBindingInvalidError, so handlers for that type also
  catch this error.
  """


def AddMemberFlag(parser, verb, hide_special_member_types, required=True):
  """Registers the `--member` flag on the parser.

  Args:
    parser: An argparse.ArgumentParser-like object that receives the flag.
    verb: str, phrase inserted after `The principal` in the help text. When it
      contains `remove`, guidance about deleted principals is included.
    hide_special_member_types: boolean. If true, the help text omits the
      special values `allUsers` and `allAuthenticatedUsers`.
    required: boolean, whether `--member` must be supplied.
  """
  help_sections = ["""\
The principal {verb}. Should be of the form `user|group|serviceAccount:email` or
`domain:domain`.

Examples: `user:test-user@gmail.com`, `group:admins@example.com`,
`serviceAccount:test123@example.domain.com`, or
`domain:example.domain.com`.
      """.format(verb=verb)]
  # Adding role bindings for a deleted principal is a very uncommon use case,
  # so the deleted-principal guidance is limited to removal verbs.
  if 'remove' in verb:
    help_sections.append("""
Deleted principals have an additional `deleted:` prefix and a `?uid=UID` suffix,
where ``UID'' is a unique identifier for the principal. Example:
`deleted:user:test-user@gmail.com?uid=123456789012345678901`.
      """)
  if not hide_special_member_types:
    help_sections.append("""
Some resources also accept the following special values:
* `allUsers` - Special identifier that represents anyone who is on the internet,
   with or without a Google account.
* `allAuthenticatedUsers` - Special identifier that represents anyone who is
   authenticated with a Google account or a service account.
      """)
  parser.add_argument(
      '--member',
      help=''.join(help_sections),
      metavar='PRINCIPAL',
      required=required,
      suggestion_aliases=['--principal'])


def _ConditionArgDict():
  """Builds the ArgDict type used to parse `--condition` values.

  Returns:
    An arg_parsers.ArgDict accepting the keys `expression`, `title`,
    `description` and the key-only `None`.
  """
  spec = {'expression': str, 'title': str, 'description': str, 'None': None}
  return arg_parsers.ArgDict(spec=spec, allow_key_only=True)


def _ConditionHelpText(intro):
  """Builds the help text for the `--condition` flag.

  Args:
    intro: str, introductory paragraph placed ahead of the key descriptions.

  Returns:
    str, the intro followed by the descriptions of the `expression`, `title`
    and `description` keys.
  """
  template = """\
{intro}

When using the `--condition` flag, include the following key-value pairs:

*expression*::: (Required) Condition expression that evaluates to True or False.
This uses a subset of Common Expression Language syntax.

If the condition expression includes a comma, use a different delimiter to
separate the key-value pairs. Specify the delimiter before listing the
key-value pairs. For example, to specify a colon (`:`) as the delimiter, do the
following: `--condition=^:^title=TITLE:expression=EXPRESSION`. For more
information, see https://cloud.google.com/sdk/gcloud/reference/topic/escaping.

*title*::: (Required) A short string describing the purpose of the expression.

*description*::: (Optional) Additional description for the expression.
      """
  return template.format(intro=intro)


def _AddConditionFlagsForAddBindingToIamPolicy(parser):
  """Adds the mutually exclusive condition flags used when adding a binding.

  Args:
    parser: An argparse.ArgumentParser-like object that receives the
      `--condition` and `--condition-from-file` flags.
  """
  intro = """\
A condition to include in the binding. When the condition is explicitly
specified as `None` (`--condition=None`), a binding without a condition is
added. When the condition is specified and is not `None`, `--role` cannot be a
basic role. Basic roles are `roles/editor`, `roles/owner`, and `roles/viewer`.
For more on conditions, refer to the conditions overview guide:
https://cloud.google.com/iam/docs/conditions-overview"""
  condition_help = _ConditionHelpText(intro)
  from_file_help = """
Path to a local JSON or YAML file that defines the condition.
To see available fields, see the help for `--condition`."""

  # Only one way of supplying the condition may be used per invocation.
  group = parser.add_mutually_exclusive_group()
  group.add_argument(
      '--condition',
      type=_ConditionArgDict(),
      metavar='KEY=VALUE',
      help=condition_help)
  group.add_argument(
      '--condition-from-file',
      type=arg_parsers.FileContents(),
      help=from_file_help)


def _AddConditionFlagsForRemoveBindingFromIamPolicy(parser,
                                                    condition_completer=None):
  """Adds the mutually exclusive condition flags used when removing a binding.

  Args:
    parser: An argparse.ArgumentParser-like object that receives the
      `--condition`, `--condition-from-file` and `--all` flags.
    condition_completer: A completer passed to the `--condition` flag.
  """
  intro = """\
The condition of the binding that you want to remove. When the condition is
explicitly specified as `None` (`--condition=None`), a binding without a
condition is removed. Otherwise, only a binding with a condition that exactly
matches the specified condition (including the optional description) is removed.
For more on conditions, refer to the conditions overview guide:
https://cloud.google.com/iam/docs/conditions-overview"""
  condition_help = _ConditionHelpText(intro)
  from_file_help = """
Path to a local JSON or YAML file that defines the condition.
To see available fields, see the help for `--condition`."""
  all_help = """
Remove all bindings with this role and principal, irrespective of any
conditions."""

  # Only one of the three flags may be used per invocation.
  group = parser.add_mutually_exclusive_group()
  group.add_argument(
      '--condition',
      type=_ConditionArgDict(),
      metavar='KEY=VALUE',
      completer=condition_completer,
      help=condition_help)
  group.add_argument(
      '--condition-from-file',
      type=arg_parsers.FileContents(),
      help=from_file_help)
  group.add_argument('--all', action='store_true', help=all_help)


def ValidateConditionArgument(condition, exception):
  """Checks that a condition dict is well formed.

  A `None` condition must not carry any other key; any other condition must
  have non-empty `expression` and `title` values.

  Args:
    condition: dict, the condition to validate.
    exception: the exception instance to raise when validation fails.

  Raises:
    The given exception, if the condition is malformed.
  """
  if 'None' in condition:
    malformed = any(
        key in condition for key in ('expression', 'description', 'title'))
  else:
    malformed = not (condition.get('expression') and condition.get('title'))
  if malformed:
    raise exception


def ValidateMutexConditionAndPrimitiveRoles(condition, role):
  """Rejects a condition other than `None` combined with a basic role.

  Args:
    condition: the condition of the binding, or None when unspecified.
    role: the role of the binding.

  Raises:
    IamPolicyBindingInvalidError: if a condition other than `None` is given
      together with `roles/editor`, `roles/owner` or `roles/viewer`.
  """
  # Nothing to check when there is no real condition.
  if not _ConditionIsSpecified(condition) or _IsNoneCondition(condition):
    return
  if role in ('roles/editor', 'roles/owner', 'roles/viewer'):
    raise IamPolicyBindingInvalidError(
        'Binding with a condition and a basic role is not allowed. '
        'Basic roles are `roles/editor`, `roles/owner`, '
        'and `roles/viewer`.')


def ValidateAndExtractConditionMutexRole(args):
  """Extracts the IAM condition from args and validates it against the role.

  Args:
    args: parsed arguments carrying the condition flags and `role`.

  Returns:
    The extracted condition, or None when no condition flag was specified.
  """
  extracted = ValidateAndExtractCondition(args)
  ValidateMutexConditionAndPrimitiveRoles(extracted, args.role)
  return extracted


def ValidateAndExtractCondition(args):
  """Returns the IAM condition described by the parsed arguments, if any.

  Args:
    args: parsed arguments object providing `IsSpecified` plus the `condition`
      and `condition_from_file` values.

  Returns:
    The validated `--condition` value, the condition parsed from
    `--condition-from-file`, or None when neither flag was specified.
  """
  extracted = None
  if args.IsSpecified('condition'):
    extracted = args.condition
    ValidateConditionArgument(extracted, CONDITION_FORMAT_EXCEPTION)
  if args.IsSpecified('condition_from_file'):
    extracted = ParseYamlOrJsonCondition(args.condition_from_file)
  return extracted


def AddArgForPolicyFile(parser):
  """Adds the positional IAM policy file argument to the given parser.

  Args:
    parser: An argparse.ArgumentParser-like object to which we add the args.

  Raises:
    ArgumentError if one of the arguments is already defined in the parser.
  """
  policy_file_help = """\
        Path to a local JSON or YAML formatted file containing a valid policy.

        The output of the `get-iam-policy` command is a valid file, as is any
        JSON or YAML file conforming to the structure of a
        [Policy](https://cloud.google.com/iam/reference/rest/v1/Policy).
        """
  parser.add_argument(
      'policy_file', help=policy_file_help, metavar='POLICY_FILE')


def AddArgsForAddIamPolicyBinding(parser,
                                  role_completer=None,
                                  add_condition=False,
                                  hide_special_member_types=False):
  """Adds the role, member and optional condition flags for adding a binding.

  Args:
    parser: An argparse.ArgumentParser-like object to which we add the args.
    role_completer: A command_lib.iam.completers.IamRolesCompleter class to
      complete the `--role` flag value.
    add_condition: boolean, If true, add the flags for condition.
    hide_special_member_types: boolean. If true, help text for member does not
      include special values `allUsers` and `allAuthenticatedUsers`.

  Raises:
    ArgumentError if one of the arguments is already defined in the parser.
  """
  role_help = """
    Role name to assign to the principal. The role name is the complete path of
    a predefined role, such as `roles/logging.viewer`, or the role ID for a
    custom role, such as `organizations/{ORGANIZATION_ID}/roles/logging.viewer`.
  """
  parser.add_argument(
      '--role',
      required=True,
      completer=role_completer,
      help=role_help)
  AddMemberFlag(parser, 'to add the binding for', hide_special_member_types)
  if not add_condition:
    return
  _AddConditionFlagsForAddBindingToIamPolicy(parser)


# TODO(b/114447521): implement a completer for condition
def AddArgsForRemoveIamPolicyBinding(parser,
                                     role_completer=None,
                                     add_condition=False,
                                     condition_completer=None,
                                     hide_special_member_types=False):
  """Adds the role, member and optional condition flags for removing a binding.

  Args:
    parser: An argparse.ArgumentParser-like object to which we add the args.
    role_completer: A command_lib.iam.completers.IamRolesCompleter class to
      complete the --role flag value.
    add_condition: boolean, If true, add the flags for condition.
    condition_completer: A completer to complete the condition flag value.
    hide_special_member_types: boolean. If true, help text for member does not
      include special values `allUsers` and `allAuthenticatedUsers`.

  Raises:
    ArgumentError if one of the arguments is already defined in the parser.
  """
  role_help = 'The role to remove the principal from.'
  parser.add_argument(
      '--role', required=True, completer=role_completer, help=role_help)
  AddMemberFlag(parser, 'to remove the binding for', hide_special_member_types)
  if not add_condition:
    return
  _AddConditionFlagsForRemoveBindingFromIamPolicy(
      parser, condition_completer=condition_completer)


def AddBindingToIamPolicy(binding_message_type, policy, member, role):
  """Adds a role/member binding to an IAM policy unless it already exists.

  An IAM binding is a pair of role and member. The member is appended to the
  first existing binding for the role, or a new binding is created when the
  policy has no binding for the role yet.

  Args:
    binding_message_type: The protorpc.Message of the Binding to create
    policy: IAM policy to which we want to add the bindings.
    member: The member to add to IAM policy.
    role: The role the member should have.

  Returns:
    boolean, whether or not the policy was updated.
  """
  # A policy can have multiple bindings with the same role, so every binding
  # for the role is inspected before deciding that the member is missing. The
  # first binding seen for the role is remembered as the place to add it.
  first_with_role = None
  for binding in policy.bindings:
    if binding.role != role:
      continue
    if member in binding.members:
      return False  # Nothing to do. Member already has the role.
    if first_with_role is None:
      first_with_role = binding

  if first_with_role is not None:
    # Reuse the existing binding so no duplicate binding for the role appears.
    first_with_role.members.append(member)
    return True

  # No binding has this role yet. Create a new one.
  new_binding = binding_message_type(members=[member], role='{0}'.format(role))
  policy.bindings.append(new_binding)
  return True


def _IsNoneCondition(condition):
  """Returns True when the user specified `--condition=None`."""
  if condition is None:
    return False
  return 'None' in condition


def _ConditionIsSpecified(condition):
  """Returns True when a condition was provided, i.e. it is not None."""
  unspecified = condition is None
  return not unspecified


def AddBindingToIamPolicyWithCondition(binding_message_type,
                                       condition_message_type, policy, member,
                                       role, condition):
  """Adds a role/member binding with an optional condition to an IAM policy.

  An IAM binding is a pair of role and member with an optional condition.
  When the policy already contains conditions and no condition was given, the
  user is prompted for one. When a real condition is added to a policy that has
  none, a warning is logged.

  Args:
    binding_message_type: The protorpc.Message of the Binding to create.
    condition_message_type: the protorpc.Message of the Expr.
    policy: IAM policy to which we want to add the bindings.
    member: The member of the binding.
    role: The role the member should have.
    condition: The condition of the role/member binding.

  Raises:
    IamPolicyBindingIncompleteError: when user adds a binding without specifying
      --condition to a policy containing conditions in the non-interactive mode.
  """
  if _PolicyContainsCondition(policy):
    if not _ConditionIsSpecified(condition):
      if not console_io.CanPrompt():
        raise IamPolicyBindingIncompleteError(
            'Adding a binding without specifying a condition to a '
            'policy containing conditions is prohibited in non-interactive '
            'mode. Run the command again with `--condition=None`')
      # A prompted condition gets the same validation as a flag value.
      condition = _PromptForConditionAddBindingToIamPolicy(policy)
      ValidateConditionArgument(condition, CONDITION_FORMAT_EXCEPTION)
      ValidateMutexConditionAndPrimitiveRoles(condition, role)
  elif _ConditionIsSpecified(condition) and not _IsNoneCondition(condition):
    log.warning('Adding binding with condition to a policy without condition '
                'will change the behavior of add-iam-policy-binding and '
                'remove-iam-policy-binding commands.')

  # An explicit `None` condition is stored as a binding without a condition.
  if _IsNoneCondition(condition):
    condition = None
  _AddBindingToIamPolicyWithCondition(binding_message_type,
                                      condition_message_type, policy, member,
                                      role, condition)


def _ConditionsInPolicy(policy, member=None, role=None):
  """Collects the distinct conditions of the bindings matching member and role.

  A binding is selected when it contains the member (if given) and has the
  role (if given). With neither given, every binding is selected. Conditions
  with the same string form are returned once.

  Args:
    policy: IAM policy to collect conditions
    member: member which should appear in the binding to select its condition
    role: role which should be the role of binding to select its condition

  Returns:
    A list of (condition string, condition) tuples sorted by condition string.
    If any selected binding has no condition, ('None', _NONE_CONDITION) is
    appended as the last entry.
  """
  by_label = {}
  for binding in policy.bindings:
    if member is not None and member not in binding.members:
      continue
    if role is not None and role != binding.role:
      continue
    by_label[_ConditionToString(binding.condition)] = binding.condition

  # The `None` entry is kept out of the sort so that it always comes last.
  has_unconditional = 'None' in by_label
  if has_unconditional:
    del by_label['None']
  selected = sorted(by_label.items(), key=lambda pair: pair[0])
  if has_unconditional:
    selected.append(('None', _NONE_CONDITION))
  return selected


def _ConditionToString(condition):
  """Renders a condition as `KEY=value` pairs joined by `, `.

  Args:
    condition: object with `expression`, `title` and `description` attributes,
      or None.

  Returns:
    'None' when condition is None, otherwise the comma-separated pairs for the
    attributes that are not None.
  """
  if condition is None:
    return 'None'
  pairs = []
  for field in ('expression', 'title', 'description'):
    field_value = getattr(condition, field)
    if field_value is None:
      continue
    pairs.append('{key}={value}'.format(key=field.upper(), value=field_value))
  return ', '.join(pairs)


def PromptChoicesForAddBindingToIamPolicy(policy):
  """Builds the condition choices offered when adding a binding to a policy.

  All conditions in the policy are offered, followed by the choices `None`
  and `Specify a new condition`.

  Args:
    policy: the IAM policy which the binding is added to.

  Returns:
    a list of (label, condition) tuples for the conditions appearing in policy
    plus the choices of `None` and `Specify a new condition`.
  """
  choices = _ConditionsInPolicy(policy)
  if choices:
    # `None` is already the last entry when the policy has a binding without a
    # condition; only add it when it is missing.
    last_label = choices[-1][0]
    if last_label != 'None':
      choices.append(('None', _NONE_CONDITION))
  choices.append(('Specify a new condition', _NEW_CONDITION))
  return choices


def PromptChoicesForRemoveBindingFromIamPolicy(policy, member, role):
  """Builds the condition choices offered when removing a binding.

  Args:
    policy: the IAM policy which the binding is removed from.
    member: the member of the binding to be removed.
    role: the role of the binding to be removed.

  Returns:
    a list of (label, condition) tuples for the bindings that contain the given
    member and role, followed by an `all conditions` choice. The list is empty
    when no binding matches.
  """
  choices = _ConditionsInPolicy(policy, member, role)
  if not choices:
    return choices
  choices.append(('all conditions', _ALL_CONDITIONS))
  return choices


def _ToDictCondition(condition):
  """Converts a condition to its dict form.

  Args:
    condition: a dict, or an object with `expression`, `title` and
      `description` attributes.

  Returns:
    The same dict when a dict is given, otherwise a new dict holding the three
    attributes.
  """
  if isinstance(condition, dict):
    return condition
  return {
      field: getattr(condition, field)
      for field in ('expression', 'title', 'description')
  }


def _PromptForConditionAddBindingToIamPolicy(policy):
  """Prompts the user to pick or enter a condition when adding a binding.

  Args:
    policy: the IAM policy which the binding is added to.

  Returns:
    The chosen condition in dict form, or the newly entered condition.
  """
  choices = PromptChoicesForAddBindingToIamPolicy(policy)
  labels = [label for label, _ in choices]
  chosen_index = console_io.PromptChoice(
      labels,
      prompt_string=('The policy contains bindings with conditions, '
                     'so specifying a condition is required when adding a '
                     'binding. Please specify a condition.'))
  # The last choice is always `Specify a new condition`.
  if chosen_index == len(choices) - 1:
    return _PromptForNewCondition()
  return _ToDictCondition(choices[chosen_index][1])


def _PromptForConditionRemoveBindingFromIamPolicy(policy, member, role):
  """Prompts the user to pick a condition when removing a binding.

  Args:
    policy: the IAM policy which the binding is removed from.
    member: the member of the binding to be removed.
    role: the role of the binding to be removed.

  Returns:
    The chosen condition in dict form, or _ALL_CONDITIONS when the last choice
    (`all conditions`) is picked.

  Raises:
    IamPolicyBindingNotFound: if no binding has the given member and role.
  """
  choices = PromptChoicesForRemoveBindingFromIamPolicy(policy, member, role)
  if not choices:
    raise IamPolicyBindingNotFound(
        'Policy binding with the specified principal '
        'and role not found!')
  labels = [label for label, _ in choices]
  chosen_index = console_io.PromptChoice(
      labels,
      prompt_string=('The policy contains bindings with conditions, '
                     'so specifying a condition is required when removing a '
                     'binding. Please specify a condition.'))
  # The last choice is always `all conditions`.
  if chosen_index == len(choices) - 1:
    return _ALL_CONDITIONS
  return _ToDictCondition(choices[chosen_index][1])


def _PromptForNewCondition():
  """Prompts for a condition string and parses it like a `--condition` value.

  Returns:
    The result of parsing the entered text with the `--condition` ArgDict.
  """
  entered_text = console_io.PromptWithDefault(
      'Condition is either `None` or a list of key=value pairs. '
      'If not `None`, `expression` and `title` are required keys.\n'
      'Example: --condition=expression=[expression],title=[title],'
      'description=[description].\nSpecify the condition')
  parse_condition = _ConditionArgDict()
  return parse_condition(entered_text)


def _EqualConditions(binding_condition, input_condition):
  """Compares a binding's condition message with a condition dict.

  Args:
    binding_condition: object with `expression`, `title` and `description`
      attributes, or None.
    input_condition: dict with the same keys (missing keys count as None), or
      None.

  Returns:
    True when both are None, or when neither is None and all three fields
    match. False otherwise.
  """
  if binding_condition is None or input_condition is None:
    # Equal only when both sides are absent.
    return binding_condition is None and input_condition is None
  return all(
      getattr(binding_condition, field) == input_condition.get(field)
      for field in ('expression', 'title', 'description'))


def _AddBindingToIamPolicyWithCondition(binding_message_type,
                                        condition_message_type, policy, member,
                                        role, condition):
  """Adds a role/member binding with the given condition to an IAM policy.

  The member joins the first binding that has the same role and an equal
  condition. If no such binding exists, a new one is appended.

  Args:
    binding_message_type: The protorpc.Message of the Binding to create.
    condition_message_type: the protorpc.Message of the Expr.
    policy: IAM policy to which we want to add the binding.
    member: The member of the binding.
    role: The role the member should have.
    condition: dict form of the condition, or None for no condition.
  """
  for existing in policy.bindings:
    if existing.role != role:
      continue
    if not _EqualConditions(
        binding_condition=existing.condition, input_condition=condition):
      continue
    if member not in existing.members:
      existing.members.append(member)
    return

  if condition is None:
    new_condition = None
  else:
    new_condition = condition_message_type(
        expression=condition.get('expression'),
        title=condition.get('title'),
        description=condition.get('description'))
  new_binding = binding_message_type(
      members=[member], role='{}'.format(role), condition=new_condition)
  policy.bindings.append(new_binding)


def RemoveBindingFromIamPolicyWithCondition(policy,
                                            member,
                                            role,
                                            condition,
                                            all_conditions=False):
  """Removes a role/member binding with the given condition from an IAM policy.

  An IAM binding is a pair of role and member with an optional condition.
  When the policy contains conditions and neither a condition nor
  `all_conditions` is given, the user is prompted for the condition.

  Args:
    policy: IAM policy from which we want to remove bindings.
    member: The member to remove from the IAM policy.
    role: The role the member should be removed from.
    condition: The condition of the binding to be removed.
    all_conditions: If true, all bindings with the specified member and role
      will be removed, regardless of the condition.

  Raises:
    IamPolicyBindingNotFound: If specified binding is not found.
    IamPolicyBindingIncompleteError: when user removes a binding without
      specifying --condition to a policy containing conditions in the
      non-interactive mode.
  """
  needs_prompt = (
      not all_conditions and _PolicyContainsCondition(policy) and
      not _ConditionIsSpecified(condition))
  if needs_prompt:
    if not console_io.CanPrompt():
      raise IamPolicyBindingIncompleteError(
          'Removing a binding without specifying a condition from a '
          'policy containing conditions is prohibited in non-interactive '
          'mode. Run the command again with `--condition=None` to remove a '
          'binding without condition or run command with `--all` to remove all '
          'bindings of the specified principal and role.')
    condition = _PromptForConditionRemoveBindingFromIamPolicy(
        policy, member, role)

  if all_conditions or _IsAllConditions(condition):
    _RemoveBindingFromIamPolicyAllConditions(policy, member, role)
    return

  # An explicit `None` condition targets the binding without a condition.
  if _IsNoneCondition(condition):
    condition = None
  _RemoveBindingFromIamPolicyWithCondition(policy, member, role, condition)


def _RemoveBindingFromIamPolicyAllConditions(policy, member, role):
  """Removes the member from every binding with the role, whatever its condition.

  Args:
    policy: IAM policy from which we want to remove bindings.
    member: The member to remove from the IAM policy.
    role: The role the member should be removed from.

  Raises:
    IamPolicyBindingNotFound: if no binding has the given member and role.
  """
  removal_count = 0
  for binding in policy.bindings:
    if role != binding.role or member not in binding.members:
      continue
    binding.members.remove(member)
    removal_count += 1
  if removal_count == 0:
    raise IamPolicyBindingNotFound(
        'Policy bindings with the specified principal '
        'and role not found!')
  # Drop bindings left without members, keeping the same list object.
  policy.bindings[:] = [
      binding for binding in policy.bindings if binding.members]


def _RemoveBindingFromIamPolicyWithCondition(policy, member, role, condition):
  """Removes the member from the first binding with the role and condition.

  Args:
    policy: IAM policy from which we want to remove the binding.
    member: The member to remove from the IAM policy.
    role: The role the member should be removed from.
    condition: dict form of the binding's condition, or None for no condition.

  Raises:
    IamPolicyBindingNotFound: if no binding has the given member, role and
      condition.
  """
  target = None
  for binding in policy.bindings:
    if (role == binding.role and _EqualConditions(
        binding_condition=binding.condition, input_condition=condition) and
        member in binding.members):
      target = binding
      break
  if target is None:
    raise IamPolicyBindingNotFound(
        'Policy binding with the specified principal, '
        'role, and condition not found!')
  target.members.remove(member)
  # Drop bindings left without members, keeping the same list object.
  policy.bindings[:] = [
      binding for binding in policy.bindings if binding.members]


def _PolicyContainsCondition(policy):
  """Reports whether any binding of the policy has a condition.

  Args:
    policy: IAM policy.

  Returns:
    True if policy has bindings with conditions, otherwise False.
  """
  return any(binding.condition for binding in policy.bindings)


def BindingInPolicy(policy, member, role):
  """Returns True if policy contains a binding of the member to the role."""
  return any(
      binding.role == role and member in binding.members
      for binding in policy.bindings)


def RemoveBindingFromIamPolicy(policy, member, role):
  """Removes a role/member binding from an IAM policy.

  An IAM binding is a pair of role and member. The member is removed from the
  first binding that has the role and contains the member, and bindings left
  without members are dropped from the policy.

  Args:
    policy: IAM policy from which we want to remove bindings.
    member: The member to remove from the IAM policy.
    role: The role the member should be removed from.

  Raises:
    IamPolicyBindingNotFound: If specified binding is not found.
  """
  # A server policy can have duplicates; only the first match is edited.
  target = next(
      (binding for binding in policy.bindings
       if binding.role == role and member in binding.members),
      None)
  if target is None:
    raise IamPolicyBindingNotFound(
        'Policy binding with the specified principal and role not found!')
  target.members.remove(member)

  # Remove any empty bindings, keeping the same list object.
  policy.bindings[:] = [
      binding for binding in policy.bindings if binding.members]


def ConstructUpdateMaskFromPolicy(policy_file_path):
  """Builds a FieldMask from the top-level fields of a policy file.

  Args:
    policy_file_path: Path to the JSON or YAML IAM policy file.

  Returns:
    a FieldMask containing policy fields to be modified, based on which fields
    are present in the input file: the sorted top-level keys joined by commas.
  """
  # Since json is a subset of yaml, parse file as yaml.
  parsed_policy = yaml.load(files.ReadFileContents(policy_file_path))

  # The IAM update mask should only contain top level fields. Sort the fields
  # for testing purposes.
  top_level_fields = sorted(parsed_policy.keys())
  return ','.join(top_level_fields)


def ParsePolicyFile(policy_file_path, policy_message_type):
  """Constructs an IAM Policy protorpc.Message from a JSON/YAML formatted file.

  When the parsed policy has no etag, the user is asked to confirm before the
  policy is returned.

  Args:
    policy_file_path: Path to the JSON or YAML IAM policy file.
    policy_message_type: Policy message type to convert JSON or YAML to.

  Returns:
    a protorpc.Message of type policy_message_type filled in from the JSON or
    YAML policy file.
  Raises:
    BadFileException if the JSON or YAML file is malformed.
  """
  parsed_policy, _ = ParseYamlOrJsonPolicyFile(policy_file_path,
                                               policy_message_type)
  if parsed_policy.etag:
    return parsed_policy

  # Without an etag the replacement is unconditional, so ask for confirmation.
  console_io.PromptContinue(
      message=('The specified policy does not contain an "etag" field '
               'identifying a specific version to replace. Changing a '
               'policy without an "etag" can overwrite concurrent policy '
               'changes.'),
      prompt_string='Replace existing policy',
      cancel_on_no=True)
  return parsed_policy


def ParsePolicyFileWithUpdateMask(policy_file_path, policy_message_type):
  """Load an IAM Policy protorpc.Message and its FieldMask from a file.

  The file may be JSON or YAML. When the parsed policy carries no etag,
  console_io.PromptContinue is invoked (with cancel_on_no=True) before the
  result is returned.

  Args:
    policy_file_path: Path to the JSON or YAML IAM policy file.
    policy_message_type: Policy message type to convert JSON or YAML to.

  Returns:
    a tuple of (policy, updateMask) where policy is a protorpc.Message of type
    policy_message_type filled in from the JSON or YAML policy file and
    updateMask is a FieldMask containing policy fields to be modified, based on
    which fields are present in the input file.
  Raises:
    BadFileException if the JSON or YAML file is malformed.
    IamEtagReadError if the etag is badly formatted.
  """
  parsed = ParseYamlOrJsonPolicyFile(policy_file_path, policy_message_type)
  policy, update_mask = parsed

  if policy.etag:
    return (policy, update_mask)

  missing_etag_warning = (
      'The specified policy does not contain an "etag" field '
      'identifying a specific version to replace. Changing a '
      'policy without an "etag" can overwrite concurrent policy '
      'changes.')
  console_io.PromptContinue(
      message=missing_etag_warning,
      prompt_string='Replace existing policy',
      cancel_on_no=True)
  return (policy, update_mask)


def ParseYamlOrJsonPolicyFile(policy_file_path, policy_message_type):
  """Parse a YAML or JSON file into an IAM Policy message plus a FieldMask.

  Args:
    policy_file_path: Path to the YAML or JSON IAM policy file.
    policy_message_type: Policy message type to convert YAML to.

  Returns:
    a tuple of (policy, updateMask) where policy is a protorpc.Message of type
    policy_message_type filled in from the JSON or YAML policy file and
    updateMask is a comma-separated string of the sorted top-level keys found
    in the input file.
  Raises:
    BadFileException if the YAML or JSON file is malformed.
    IamEtagReadError if the etag is badly formatted.
  """
  raw_policy = yaml.load_path(policy_file_path)
  try:
    policy = encoding.PyValueToMessage(policy_message_type, raw_policy)
    top_level_fields = sorted(raw_policy.keys())
    update_mask = ','.join(top_level_fields)
  except AttributeError as err:
    # The loaded content does not have the shape of a policy mapping.
    bad_file_message = (
        'Policy file [{0}] is not a properly formatted YAML or JSON '
        'policy file. {1}').format(policy_file_path, six.text_type(err))
    raise gcloud_exceptions.BadFileException(bad_file_message)
  except (apitools_messages.DecodeError, binascii.Error) as err:
    # DecodeError is raised when etag is badly formatted (not proper Base64)
    bad_etag_message = (
        'The etag of policy file [{0}] is not properly formatted. {1}').format(
            policy_file_path, six.text_type(err))
    raise IamEtagReadError(bad_etag_message)
  return (policy, update_mask)


def ParseYamlOrJsonCondition(
    condition_file_content,
    file_format_exception=CONDITION_FILE_FORMAT_EXCEPTION):
  """Parse and validate an IAM binding condition from YAML or JSON text.

  Args:
    condition_file_content: string, the content of a YAML or JSON file
      containing a condition.
    file_format_exception: InvalidArgumentException, the exception to throw when
      condition file is incorrectly formatted.

  Returns:
    a dictionary representation of the condition.
  """
  parsed_condition = yaml.load(condition_file_content)
  # The validator is handed the exception to use for a malformed condition.
  ValidateConditionArgument(parsed_condition, file_format_exception)
  return parsed_condition


def ParseYamlToRole(file_path, role_message_type):
  """Construct an IAM Role protorpc.Message from a Yaml formatted file.

  A 'stage' entry, when present, is upper-cased before conversion.

  Args:
    file_path: Path to the Yaml IAM Role file.
    role_message_type: Role message type to convert Yaml to.

  Returns:
    a protorpc.Message of type role_message_type filled in from the Yaml
    role file.
  Raises:
    BadFileException if the Yaml file is malformed or does not exist.
    IamEtagReadError if the etag is badly formatted.
  """
  bad_file_template = (
      'Role file {0} is not a properly formatted YAML role file. {1}')
  role_to_parse = yaml.load_path(file_path)
  try:
    if 'stage' in role_to_parse:
      role_to_parse['stage'] = role_to_parse['stage'].upper()
  except (AttributeError, TypeError) as e:
    # TypeError: the loaded content does not support the lookup (e.g. None,
    # presumably what an empty file loads as). AttributeError: the stage value
    # is not a string. Both mean the role file is malformed, so surface the
    # documented BadFileException rather than a raw traceback.
    raise gcloud_exceptions.BadFileException(
        bad_file_template.format(file_path, six.text_type(e)))
  try:
    role = encoding.PyValueToMessage(role_message_type, role_to_parse)
  except (AttributeError) as e:
    # Raised when the YAML file is not properly formatted YAML role file.
    raise gcloud_exceptions.BadFileException(
        bad_file_template.format(file_path, six.text_type(e)))
  except (apitools_messages.DecodeError, binascii.Error) as e:
    # DecodeError is raised when etag is badly formatted (not proper Base64)
    raise IamEtagReadError(
        'The etag of role file {0} is not properly formatted. {1}'.format(
            file_path, six.text_type(e)))
  return role


def ParseYamlToTrustStore(yaml_dict):
  """Extract the TrustStore message from parsed Yaml content.

  The content is converted into an msgs.X509 message and that message's
  trustStore field is returned.

  Args:
    yaml_dict: YAML file content to parse.

  Returns:
    a TrustStore from the parsed YAML file.
  Raises:
    DecodeError if the Yaml file content could not be parsed.
  """
  return messages_util.DictToMessageWithErrorCheck(
      yaml_dict, msgs.X509).trustStore


def ParseYamlOrJsonToInlineCertificateIssuanceConfig(yaml_dict):
  """Extract the InlineCertificateIssuanceConfig from parsed file content.

  The content is converted into an msgs.WorkloadIdentityPool message and that
  message's inlineCertificateIssuanceConfig field is returned.

  Args:
    yaml_dict: YAML file content to parse.

  Returns:
    a InlineCertificateIssuanceConfig from the parsed YAML file.
  Raises:
    DecodeError if the Yaml file content could not be parsed.
  """
  pool_message = messages_util.DictToMessageWithErrorCheck(
      yaml_dict, msgs.WorkloadIdentityPool)
  return pool_message.inlineCertificateIssuanceConfig


def ParseYamlOrJsonToInlineTrustConfig(yaml_dict):
  """Extract the InlineTrustConfig from parsed file content.

  The content is converted into an msgs.WorkloadIdentityPool message and that
  message's inlineTrustConfig field is returned.

  Args:
    yaml_dict: YAML file content to parse.

  Returns:
    a InlineTrustConfig from the parsed YAML file.
  Raises:
    DecodeError if the Yaml file content could not be parsed.
  """
  pool_message = messages_util.DictToMessageWithErrorCheck(
      yaml_dict, msgs.WorkloadIdentityPool)
  return pool_message.inlineTrustConfig


def GetDetailedHelpForSetIamPolicy(
    collection,
    example_id='',
    example_see_more='',
    additional_flags='',
    use_an=False,
):
  """Builds the detailed_help dict for a set-iam-policy command.

  Args:
    collection: Name of the command collection (ex: "project", "dataset")
    example_id: Collection identifier to display in a sample command (ex:
      "my-project", '1234'). Defaults to 'example-' followed by the collection.
    example_see_more: Optional "See ... for details" message. If not specified,
      includes a default reference to IAM managing-policies documentation
    additional_flags: str, additional flags to include in the example command
      (after the command name and before the ID of the resource).
    use_an: If True, uses "an" instead of "a" for the article preceding uses of
      the collection.

  Returns:
    a dict with boilerplate help text for the set-iam-policy command
  """
  example_id = example_id or 'example-' + collection
  example_see_more = example_see_more or """
          See https://cloud.google.com/iam/docs/managing-policies for details
          of the policy file format and contents."""

  # Flags, when given, are followed by a single separating space.
  flags_prefix = ''
  if additional_flags:
    flags_prefix = additional_flags + ' '

  if use_an:
    article = 'an'
  else:
    article = 'a'

  examples_template = """\
          The following command will read an IAM policy from 'policy.json' and
          set it for {a} {collection} with '{id}' as the identifier:

            $ {{command}} {flags}{id} policy.json

          {see_more}"""
  # Substitute first, then dedent, so the "see more" text is dedented too.
  examples = textwrap.dedent(
      examples_template.format(
          collection=collection,
          id=example_id,
          see_more=example_see_more,
          flags=flags_prefix,
          a=article))

  return {
      'brief': 'Set IAM policy for {0} {1}.'.format(article, collection),
      'DESCRIPTION': '{description}',
      'EXAMPLES': examples,
  }


def GetDetailedHelpForAddIamPolicyBinding(collection,
                                          example_id,
                                          role='roles/editor',
                                          use_an=False,
                                          condition=False):
  """Builds the detailed_help dict for an add-iam-policy-binding command.

  Args:
    collection: Name of the command collection (ex: "project", "dataset")
    example_id: Collection identifier to display in a sample command
        (ex: "my-project", '1234')
    role: The sample role to use in the documentation. The default of
      'roles/editor' is usually sufficient, but if your command group's users
      would more likely use a different role, you can override it here.
    use_an: If True, uses "an" instead of "a" for the article preceding uses of
      the collection.
    condition: If True, add help text for condition.

  Returns:
    a dict with boilerplate help text for the add-iam-policy-binding command
  """
  if use_an:
    article = 'an'
  else:
    article = 'a'
  substitutions = dict(
      collection=collection, example_id=example_id, role=role, a=article)

  examples = """To add an IAM policy binding for the role of `{role}` for the user
`test-user@gmail.com` on {a} {collection} with identifier
`{example_id}`, run:

  $ {{command}} {example_id} --member='user:test-user@gmail.com' --role='{role}'

To add an IAM policy binding for the role of `{role}` to the service
account `test-proj1@example.domain.com`, run:

  $ {{command}} {example_id} --member='serviceAccount:test-proj1@example.domain.com' --role='{role}'

To add an IAM policy binding for the role of `{role}` for all
authenticated users on {a} {collection} with identifier
`{example_id}`, run:

  $ {{command}} {example_id} --member='allAuthenticatedUsers' --role='{role}'
  """.format(**substitutions)

  if condition:
    # The conditional example is appended after the unconditional ones.
    examples += """\n
To add an IAM policy binding that expires at the end of the year 2018 for the
role of `{role}` and the user `test-user@gmail.com` on {a} {collection} with
identifier `{example_id}`, run:

  $ {{command}} {example_id} --member='user:test-user@gmail.com' --role='{role}' --condition='expression=request.time < timestamp("2019-01-01T00:00:00Z"),title=expires_end_of_2018,description=Expires at midnight on 2018-12-31'
  """.format(**substitutions)

  see_also = (
      'See https://cloud.google.com/iam/docs/managing-policies for details '
      'of policy role and principal types.')
  return {
      'brief': 'Add IAM policy binding for {0} {1}.'.format(
          article, collection),
      'DESCRIPTION': '{description}',
      'EXAMPLES': '\n'.join([examples, see_also]),
  }


def GetDetailedHelpForRemoveIamPolicyBinding(collection,
                                             example_id,
                                             role='roles/editor',
                                             use_an=False,
                                             condition=False):
  """Builds the detailed_help dict for a remove-iam-policy-binding command.

  Args:
    collection: Name of the command collection (ex: "project", "dataset")
    example_id: Collection identifier to display in a sample command
        (ex: "my-project", '1234')
    role: The sample role to use in the documentation. The default of
      'roles/editor' is usually sufficient, but if your command group's users
      would more likely use a different role, you can override it here. It is
      not used by the condition examples (see `condition`).
    use_an: If True, uses "an" instead of "a" for the article in the brief.
    condition: If True, add help text for condition. These examples always
      show 'roles/browser', regardless of `role`.

  Returns:
    a dict with boilerplate help text for the remove-iam-policy-binding command
  """
  if use_an:
    article = 'an'
  else:
    article = 'a'

  examples = """\
To remove an IAM policy binding for the role of `{role}` for the
user `test-user@gmail.com` on {collection} with identifier
`{example_id}`, run:

  $ {{command}} {example_id} --member='user:test-user@gmail.com' --role='{role}'

To remove an IAM policy binding for the role of `{role}` from all
authenticated users on {collection} `{example_id}`, run:

  $ {{command}} {example_id} --member='allAuthenticatedUsers' --role='{role}'
  """.format(collection=collection, example_id=example_id, role=role)

  if condition:
    # NOTE(review): the conditional examples hard-code 'roles/browser' instead
    # of `role`; presumably because the default sample role cannot carry a
    # condition -- confirm before changing.
    conditional_substitutions = dict(
        collection=collection, example_id=example_id, role='roles/browser')
    examples += """\n
To remove an IAM policy binding with a condition of
`expression='request.time < timestamp("2019-01-01T00:00:00Z"),
title='expires_end_of_2018'`, and
description=`Expires at midnight on 2018-12-31` for the role of `{role}` for
the user `test-user@gmail.com` on {collection}
with identifier `{example_id}`, run:

  $ {{command}} {example_id} --member='user:test-user@gmail.com' --role='{role}' --condition='expression=request.time < timestamp("2019-01-01T00:00:00Z"),title=expires_end_of_2018,description=Expires at midnight on 2018-12-31'

To remove all IAM policy bindings regardless of the condition for the role of
`{role}` and for the user `test-user@gmail.com` on {collection} with
identifier `{example_id}`, run:

  $ {{command}} {example_id} --member='user:test-user@gmail.com' --role='{role}' --all
  """.format(**conditional_substitutions)

  see_also = (
      'See https://cloud.google.com/iam/docs/managing-policies for details'
      ' of policy role and member types.'
  )
  return {
      'brief': 'Remove IAM policy binding for {0} {1}.'.format(
          article, collection),
      'DESCRIPTION': '{description}',
      'EXAMPLES': '\n'.join([examples, see_also]),
  }


def GetHintForServiceAccountResource(action='act on'):
  """Builds the hint shown by commands that treat a service account as a resource.

  Args:
    action: the action to take on the service account resource (with necessary
      prepositions), such as 'add iam policy bindings to'.

  Returns:
    The hint text with the action substituted in.
  """
  hint_template = (
      'When managing IAM roles, you can treat a service account either as '
      'a resource or as an identity. This command is to {action} a '
      'service account resource. There are other gcloud commands to '
      'manage IAM policies for other types of resources. For example, to '
      'manage IAM policies on a project, use the `$ gcloud projects` '
      'commands.')
  return hint_template.format(action=action)


def ManagedByFromString(managed_by):
  """Translates a string into a list of MANAGED_BY enum values.

  MANAGED_BY is an enum of who manages a service account key resource. IAM
  will rotate any SYSTEM_MANAGED keys by default.

  Args:
    managed_by: A string representation of a MANAGED_BY. Can be one of *user*,
      *system* or *any*.

  Returns:
    A list of MANAGED_BY values: a single-element list for *user* and
    *system*, an empty list for *any*, and a list holding
    KEY_TYPE_UNSPECIFIED for any other input.
  """
  if managed_by == 'any':
    return []
  if managed_by == 'user':
    return [MANAGED_BY.USER_MANAGED]
  if managed_by == 'system':
    return [MANAGED_BY.SYSTEM_MANAGED]
  return [MANAGED_BY.KEY_TYPE_UNSPECIFIED]


def KeyTypeFromString(key_str):
  """Translates a string into a KEY_TYPES enum value.

  Args:
    key_str: A string representation of a KeyType. Can be either *p12* or
      *json*.

  Returns:
    A PrivateKeyTypeValueValuesEnum value; TYPE_UNSPECIFIED for any string
    other than *p12* or *json*.
  """
  if key_str == 'p12':
    return KEY_TYPES.TYPE_PKCS12_FILE
  if key_str == 'json':
    return KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE
  return KEY_TYPES.TYPE_UNSPECIFIED


def KeyTypeToString(key_type):
  """Returns the string name of a key type enum value.

  Args:
    key_type: An enum of either KEY_TYPES or CREATE_KEY_TYPES.

  Returns:
    'p12' for a PKCS12 key type, 'json' for a Google credentials file key
    type, and 'unspecified' for anything else. The first two are the strings
    KeyTypeFromString accepts.
  """
  is_p12 = (key_type == KEY_TYPES.TYPE_PKCS12_FILE or
            key_type == CREATE_KEY_TYPES.TYPE_PKCS12_FILE)
  if is_p12:
    return 'p12'

  is_json = (key_type == KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE or
             key_type == CREATE_KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE)
  if is_json:
    return 'json'

  return 'unspecified'


def KeyTypeToCreateKeyType(key_type):
  """Maps a KEY_TYPES value onto the matching CREATE_KEY_TYPES value.

  Args:
    key_type: A ServiceAccountKey.PrivateKeyTypeValueValuesEnum value.

  Returns:
    A IamProjectsServiceAccountKeysCreateRequest.PrivateKeyTypeValueValuesEnum
    value; TYPE_UNSPECIFIED when key_type is neither the PKCS12 nor the Google
    credentials file type.
  """
  # The generated API messages declare a separate enum type for each place the
  # proto enum is used, and values of those types do not compare equal to one
  # another, so the translation has to be spelled out by hand.
  if key_type == KEY_TYPES.TYPE_PKCS12_FILE:
    return CREATE_KEY_TYPES.TYPE_PKCS12_FILE
  if key_type == KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE:
    return CREATE_KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE
  return CREATE_KEY_TYPES.TYPE_UNSPECIFIED


def KeyTypeFromCreateKeyType(key_type):
  """Maps a CREATE_KEY_TYPES value onto the matching KEY_TYPES value.

  This is the inverse of KeyTypeToCreateKeyType.

  Args:
    key_type: A CREATE_KEY_TYPES enum value.

  Returns:
    The corresponding KEY_TYPES value; TYPE_UNSPECIFIED when key_type is
    neither the PKCS12 nor the Google credentials file type.
  """
  if key_type == CREATE_KEY_TYPES.TYPE_PKCS12_FILE:
    return KEY_TYPES.TYPE_PKCS12_FILE
  if key_type == CREATE_KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE:
    return KEY_TYPES.TYPE_GOOGLE_CREDENTIALS_FILE
  return KEY_TYPES.TYPE_UNSPECIFIED


def ProjectToProjectResourceName(project):
  """Builds the 'projects/<id>' resource name for a project id."""
  name_template = 'projects/{0}'
  return name_template.format(project)


def EmailToAccountResourceName(email):
  """Builds a service account resource name from an email.

  The project segment of the name is always the '-' placeholder.
  """
  name_template = 'projects/-/serviceAccounts/{0}'
  return name_template.format(email)


def EmailAndKeyToResourceName(email, key):
  """Builds a service account key resource name from an email and a key id.

  The project segment of the name is always the '-' placeholder.
  """
  name_template = 'projects/-/serviceAccounts/{0}/keys/{1}'
  return name_template.format(email, key)


def EmailAndIdentityBindingToResourceName(email, identity_binding):
  """Builds an identity binding resource name from an email and binding id.

  The project segment of the name is always the '-' placeholder.
  """
  name_template = 'projects/-/serviceAccounts/{0}/identityBindings/{1}'
  return name_template.format(email, identity_binding)


def GetKeyIdFromResourceName(name):
  """Returns the sixth '/'-separated segment of a resource name.

  No validation is done: for a name shaped like
  'projects/-/serviceAccounts/<email>/keys/<key>' that segment is the key id,
  and a name with fewer than six segments raises IndexError.
  """
  segments = name.split('/')
  return segments[5]


def PublicKeyTypeFromString(key_str):
  """Translates a string into a PUBLIC_KEY_TYPES enum value.

  Args:
    key_str: A string representation of a PublicKeyType. Can be either *pem* or
      *raw*.

  Returns:
    A PublicKeyTypeValueValuesEnum value: TYPE_X509_PEM_FILE for *pem*, and
    TYPE_RAW_PUBLIC_KEY for every other string.
  """
  return (PUBLIC_KEY_TYPES.TYPE_X509_PEM_FILE
          if key_str == 'pem' else PUBLIC_KEY_TYPES.TYPE_RAW_PUBLIC_KEY)


def StageTypeFromString(stage_str):
  """Translates a stage name into a STAGE_TYPES enum value.

  The lookup is case-insensitive.

  Args:
    stage_str: A string representation of a StageType. Can be *alpha* or *beta*
      or *ga* or *deprecated* or *disabled*.

  Returns:
    A StageValueValuesEnum value.

  Raises:
    InvalidArgumentException if stage_str is not one of the known stage names.
  """
  normalized = stage_str.lower()
  stages_by_name = {
      'alpha': STAGE_TYPES.ALPHA,
      'beta': STAGE_TYPES.BETA,
      'ga': STAGE_TYPES.GA,
      'deprecated': STAGE_TYPES.DEPRECATED,
      'disabled': STAGE_TYPES.DISABLED
  }
  if normalized in stages_by_name:
    return stages_by_name[normalized]

  valid_names = ','.join(sorted(stages_by_name))
  raise gcloud_exceptions.InvalidArgumentException(
      'stage', 'The stage should be one of ' + valid_names + '.')


def VerifyParent(organization, project, attribute='custom roles'):
  """Checks that exactly one of organization and project was supplied.

  Args:
    organization: The organization name, or None.
    project: The project name, or None.
    attribute: Text describing what the parent is for, used in the error
      message.

  Raises:
    ConflictingArgumentsException if both organization and project are set.
    RequiredArgumentException if both organization and project are None.
  """
  # The two failure conditions are mutually exclusive, so their order does not
  # matter.
  if organization and project:
    raise gcloud_exceptions.ConflictingArgumentsException(
        'organization', 'project')
  if organization is None and project is None:
    missing_parent_message = (
        'Should specify the project or organization name for {0}.'.format(
            attribute))
    raise gcloud_exceptions.RequiredArgumentException(
        '--organization or --project', missing_parent_message)


def GetRoleName(organization,
                project,
                role,
                attribute='custom roles',
                parameter_name='ROLE_ID'):
  """Builds the full role name from a parent and a role id.

  A role id starting with 'roles/' is treated as a predefined role and returned
  unchanged; it must not be combined with a project or organization. Any other
  role id is placed under the given organization or project.

  Args:
    organization: The organization id, if the role belongs to an organization.
    project: The project id, if the role belongs to a project.
    role: The role id.
    attribute: Text describing what the parent is for, passed to VerifyParent.
    parameter_name: Name of the parameter reported in raised exceptions.

  Returns:
    The role name.

  Raises:
    InvalidArgumentException if the role id is predefined but a parent was
    given, or if a non-predefined role id contains a '/'.
  """
  if role.startswith('roles/'):
    if not (project or organization):
      return role
    raise gcloud_exceptions.InvalidArgumentException(
        parameter_name,
        'The role id that starts with \'roles/\' only stands for predefined '
        'role. Should not specify the project or organization for predefined '
        'roles')

  if role.startswith(('projects/', 'organizations/')):
    raise gcloud_exceptions.InvalidArgumentException(
        parameter_name, 'The role id should not include any \'projects/\' or '
        '\'organizations/\' prefix.')
  if '/' in role:
    raise gcloud_exceptions.InvalidArgumentException(
        parameter_name, 'The role id should not include any \'/\' character.')

  VerifyParent(organization, project, attribute)
  if organization:
    name_template, parent_id = 'organizations/{0}/roles/{1}', organization
  else:
    name_template, parent_id = 'projects/{0}/roles/{1}', project
  return name_template.format(parent_id, role)


def GetParentName(organization, project, attribute='custom roles'):
  """Builds the parent resource name from an organization or a project.

  Args:
    organization: The organization name, or None.
    project: The project name, or None.
    attribute: Text describing what the parent is for, passed to VerifyParent.

  Returns:
    'organizations/<organization>' when an organization is given, otherwise
    'projects/<project>'.
  """
  VerifyParent(organization, project, attribute)
  if not organization:
    return 'projects/{0}'.format(project)
  return 'organizations/{0}'.format(organization)


def GetFullResourceName(resource_ref):
  """Convert a full resource URL to a full resource name (FRN).

  See https://cloud.google.com/iam/docs/full-resource-names.

  The self link of the resource is rewritten in four steps: the protocol is
  dropped, the API version segment is removed, a non-default universe domain is
  replaced with the default one, and a '//www.' host is rewritten to a
  service-specific host.

  Args:
    resource_ref: googlecloudsdk.core.resources.Resource.

  Returns:
    str: Full resource name of the resource
  """
  full_name = resource_ref.SelfLink()
  full_name = re.sub(r'\w+://', '//', full_name)  # no protocol at the start
  # Strip only the first version-like segment (the API version). Later path
  # segments that happen to look like a version, such as a resource ID
  # 'v1prod', belong to the resource name and must be kept.
  full_name = re.sub(r'/v[0-9]+[0-9a-zA-Z]*/', '/', full_name, count=1)

  universe_domain_property = properties.VALUES.core.universe_domain
  universe_domain = universe_domain_property.Get()
  if universe_domain_property.default != universe_domain:
    # FRNs use the same format in all universes.
    full_name = full_name.replace(universe_domain,
                                  universe_domain_property.default, 1)

  if full_name.startswith('//www.'):
    # Convert '//www.googleapis.com/compute/' to '//compute.googleapis.com/'
    segments = full_name.split('/')
    # Index 3 is the first path segment after the host: the service name.
    service = segments.pop(3)
    full_name = '/'.join(segments)
    full_name = full_name.replace('//www.', '//{0}.'.format(service))
  return full_name


def ServiceAccountsUriFunc(resource):
  """Builds the URL of a service account resource.

  The account's uniqueId is parsed against SERVICE_ACCOUNTS_COLLECTION with its
  projectId as the 'projectsId' parameter.

  Args:
    resource: The ServiceAccount object

  Returns:
    URL to the service account
  """
  parse_params = {'projectsId': resource.projectId}
  account_ref = resources.REGISTRY.Parse(
      resource.uniqueId, parse_params, collection=SERVICE_ACCOUNTS_COLLECTION)
  return account_ref.SelfLink()


def AddServiceAccountNameArg(parser, action='to act on'):
  """Registers the positional service account argument with tab completion.

  Args:
    parser: An argparse.ArgumentParser-like object to which we add the args.
    action: Action to display in the help message. Should be something like 'to
      act on' or a relative phrase like 'whose policy to get'.

  Raises:
    ArgumentError if one of the arguments is already defined in the parser.
  """
  help_template = ('The service account {}. The account should be '
                   'formatted either as a numeric service account ID '
                   'or as an email, like this: '
                   '123456789876543212345 or '
                   'my-iam-account@somedomain.com.')
  parser.add_argument(
      'service_account',
      metavar='SERVICE_ACCOUNT',
      type=GetIamAccountFormatValidator(),
      completer=completers.IamServiceAccountCompleter,
      help=help_template.format(action))


def AddServiceAccountRecommendArg(parser, action):
  """Registers the optional boolean --recommend flag, defaulting to False.

  Args:
    parser: An argparse.ArgumentParser-like object to which we add the args.
    action: Action to display in the help message. Should be something like
      'deletion' or a noun that describes the action being performed.

  Raises:
    ArgumentError if the argument is already defined in the parser.
  """
  help_template = (
      'If true, checks Active Assist recommendation for the risk level of '
      'service account {}, and issues a warning in the prompt. Optional '
      'flag is set to false by default. For details see '
      'https://cloud.google.com/recommender/'
      'docs/change-risk-recommendations'
  )
  parser.add_argument(
      '--recommend',
      metavar='BOOLEAN_VALUE',
      type=arg_parsers.ArgBoolean(),
      default=False,
      required=False,
      help=help_template.format(action),
  )


def LogSetIamPolicy(name, kind):
  """Prints a status message saying an IAM policy was updated.

  Args:
    name: Identifier of the resource, shown in brackets.
    kind: Kind of the resource, shown before the identifier.
  """
  status_line = 'Updated IAM policy for {} [{}].'.format(kind, name)
  log.status.Print(status_line)


def GetIamAccountFormatValidator():
  """Returns a regexp validator for IAM service account identifiers.

  The accepted forms are an all-digit string or something shaped like an email
  address.
  """
  # Overly broad on purpose but catches most common issues.
  identifier_pattern = r'^(.+@.+\..+|[0-9]+)$'
  invalid_identifier_message = (
      'Not a valid service account identifier. It should be either a '
      'numeric string representing the unique_id or an email of the form: '
      'my-iam-account@somedomain.com or '
      'my-iam-account@PROJECT_ID.iam.gserviceaccount.com')
  return arg_parsers.RegexpValidator(identifier_pattern,
                                     invalid_identifier_message)


def GetIamOutputFileValidator():
  """Returns a callable that checks whether an output file is writable."""

  def IsWritable(value):
    """Returns value if it is '-' or can be opened for writing.

    Args:
      value: The output path, or '-' for stdout.

    Returns:
      value, unchanged.

    Raises:
      BadFileException if opening the file for writing fails with files.Error.
    """
    # Anything other than stdout ('-') is probed by opening it for writing.
    if value != '-':
      try:
        with files.FileWriter(value, private=True) as f:
          f.close()
      except files.Error as e:
        raise gcloud_exceptions.BadFileException(e)
    return value

  return IsWritable


def SetRoleStageIfAlpha(role):
  """Defaults the stage of a role to alpha when no stage is set.

  A role whose stage is not None is left untouched.

  Args:
    role: A protorpc.Message of type Role.
  """
  if role.stage is not None:
    return
  role.stage = StageTypeFromString('alpha')


def GetResourceReference(project, organization):
  """Parses a project or organization into a resource reference.

  The project takes precedence: the organization is only used when project is
  empty or None.

  Args:
    project: A project name string.
    organization: An organization id string.

  Returns:
    The resource reference of the given project or organization.
  """
  if project:
    identifier = project
    collection = 'cloudresourcemanager.projects'
  else:
    identifier = organization
    collection = 'cloudresourcemanager.organizations'
  return resources.REGISTRY.Parse(identifier, collection=collection)


def TestingPermissionsWarning(permissions):
  """Asks the user to confirm the use of permissions in the TESTING stage.

  Nothing happens when permissions is empty. Otherwise
  console_io.PromptContinue is invoked with cancel_on_no=True.

  Args:
    permissions: A list of permissions that need to be warned.
  """
  if not permissions:
    return

  permission_list = ', '.join(permissions)
  warning = ('Note: permissions [' + permission_list +
             '] are in \'TESTING\' stage which means '
             'the functionality is not mature and they can go away in the '
             'future. This can break your workflows, so do not use them in '
             'production systems!')
  console_io.PromptContinue(
      message=warning,
      prompt_string='Are you sure you want to make this change?',
      cancel_on_no=True)


def ApiDisabledPermissionsWarning(permissions):
  """Logs a warning for permissions whose API is disabled.

  Nothing is logged when permissions is empty.

  Args:
    permissions: A list of permissions that need to be warned.
  """
  if not permissions:
    return

  permission_list = ', '.join(permissions)
  warning = (
      'API is not enabled for permissions: [' + permission_list +
      ']. Please enable the corresponding APIs to use those permissions.\n')
  log.warning(warning)