HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/functions/service_account_util.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Service account utils."""
import re
from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_util
from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.command_lib.functions import run_util
from googlecloudsdk.command_lib.projects import util as project_util
from googlecloudsdk.core import log
from googlecloudsdk.core.console import console_io

# Role the build service account is expected to hold; its absence (together
# with the editor role below) triggers the build service account warning.
_BUILDER_ROLE = 'roles/cloudbuild.builds.builder'
# Accepted as an alternative to _BUILDER_ROLE when validating the build
# service account.
_EDITOR_ROLE = 'roles/editor'
# Role named in prompts and warnings about the trigger service account.
_RUN_INVOKER_ROLE = 'roles/run.invoker'
# If the trigger service account is bound to any of these roles at the project
# level, _ShouldBindInvokerRole treats it as already able to invoke and does
# not prompt.
_PREDEFINE_ROLES_WITH_ROUTE_INVOKER_PERMISSION = [
    'roles/run.admin',
    'roles/run.developer',
    _RUN_INVOKER_ROLE,
    'roles/run.servicesInvoker',
    'roles/run.sourceDeveloper',
]

# Permission name quoted in the message shown when the trigger service account
# is bound to a custom role.
_ROUTE_INVOKER_PERMISSION = 'run.routes.invoke'

# Email template of the default Compute Engine service account; formatted with
# the numeric project number.
_GCE_SA = '{project_number}-compute@developer.gserviceaccount.com'


def GetDefaultBuildServiceAccount(project_id, region='global'):
  """Looks up the default Cloud Build service account of a project.

  Args:
    project_id: id of the project to query.
    region: Cloud Build location to query, 'global' unless specified.

  Returns:
    The `serviceAccountEmail` field of the API response. The caller in this
    module passes it through _ExtractServiceAccountEmail, so it is presumably
    a full resource name rather than a bare email -- confirm against the API.
  """
  build_client = cloudbuild_util.GetClientInstance()
  get_default_sa = build_client.projects_locations.GetDefaultServiceAccount
  request_type = (
      build_client.MESSAGES_MODULE.CloudbuildProjectsLocationsGetDefaultServiceAccountRequest
  )
  resource_name = (
      f'projects/{project_id}/locations/{region}/defaultServiceAccount'
  )
  response = get_default_sa(request_type(name=resource_name))
  return response.serviceAccountEmail


def _ExtractServiceAccountEmail(service_account):
  """Returns the last path segment following '/serviceAccounts/'.

  Args:
    service_account: service account resource string, expected to end in
      '/serviceAccounts/<email>'.

  Returns:
    The segment after '/serviceAccounts/' when it is the final, non-empty path
    segment of the input; None otherwise.
  """
  found = re.search(r'/serviceAccounts/([^/]+)$', service_account)
  return found.group(1) if found else None


def ValidateDefaultBuildServiceAccountAndPromptWarning(
    project_id, region, build_service_account=None
):
  """Checks the build service account's roles and prompts when they are missing.

  The check only runs when the build service account is the default Compute
  Engine service account of the project. In that case, if the account holds
  neither cloudbuild.builds.builder nor editor on the project and the console
  can prompt, the user is asked whether to continue.

  Args:
    project_id: id of project
    region: region to deploy the function
    build_service_account: user provided build service account. When None, the
      project's default build service account is looked up instead.
  """
  if build_service_account is None:
    default_sa_resource = GetDefaultBuildServiceAccount(project_id, region)
    build_service_account = _ExtractServiceAccountEmail(default_sa_resource)

  project_number = project_util.GetProjectNumber(project_id)
  gce_default_sa = _GCE_SA.format(project_number=project_number)
  if build_service_account != gce_default_sa:
    # Only the default Compute Engine service account is validated here.
    return

  try:
    project_ref = project_util.ParseProject(project_id)
    iam_policy = projects_api.GetIamPolicy(project_ref)
  except apitools_exceptions.HttpForbiddenError:
    # The policy cannot be read, so tell the user what to check instead.
    log.warning(
        (
            'Your account does not have permission to check or bind IAM'
            ' policies to project [%s]. If the deployment fails, ensure [%s]'
            ' has the role [%s] before retrying.'
        ),
        project_id,
        build_service_account,
        _BUILDER_ROLE,
    )
    return

  member = f'serviceAccount:{build_service_account}'
  granted_roles = {
      policy_binding.role
      for policy_binding in iam_policy.bindings
      if member in policy_binding.members
  }
  has_build_role = (
      _BUILDER_ROLE in granted_roles or _EDITOR_ROLE in granted_roles
  )
  if has_build_role or not console_io.CanPrompt():
    return

  # Previously the default compute engine service account was granted the
  # editor role when it was provisioned, which covered every permission a
  # build needs. That role is no longer granted by default, so suggest
  # roles/cloudbuild.builds.builder to make sure the build can complete.
  missing_role_prompt = (
      f'\nThe default build service account [{build_service_account}]'
      f' is missing the [{_BUILDER_ROLE}] role. This may cause issues'
      ' when deploying a function. You could fix it by running the'
      ' command: \ngcloud projects add-iam-policy-binding'
      f' {project_id} \\\n'
      f' --member=serviceAccount:{project_number}-compute@developer.gserviceaccount.com'
      ' \\\n --role=roles/cloudbuild.builds.builder \nFor more'
      ' information, please refer to:'
      ' https://cloud.google.com/functions/docs/troubleshooting#build-service-account.\n'
      ' Would you like to continue?'
  )
  console_io.PromptContinue(
      default=False,
      cancel_on_no=True,
      prompt_string=missing_role_prompt,
  )


def ValidateAndBindTriggerServiceAccount(
    function,
    project_id,
    trigger_service_account,
    is_gen2=False,
):
  """Ensures the trigger service account can invoke the function.

  Reads the project IAM policy and, when _ShouldBindInvokerRole says so, adds
  an invoker binding for the trigger service account on the function. If the
  caller is forbidden from reading or updating IAM policies, a warning is
  logged instead of failing.

  Args:
    function: the function to add the binding to
    project_id: the project id to validate
    trigger_service_account: the trigger service account to validate; when
      empty, the project's default Compute Engine service account is used
    is_gen2: whether the function is a gen2 function
  """
  project_number = project_util.GetProjectNumber(project_id)
  invoker_sa = trigger_service_account or _GCE_SA.format(
      project_number=project_number
  )
  try:
    project_ref = project_util.ParseProject(project_id)
    project_policy = projects_api.GetIamPolicy(project_ref)
    if not _ShouldBindInvokerRole(project_policy, invoker_sa):
      return
    run_util.AddOrRemoveInvokerBinding(
        function,
        f'serviceAccount:{invoker_sa}',
        add_binding=True,
        is_gen2=is_gen2,
    )
    log.status.Print('Role successfully bound.\n')
  except apitools_exceptions.HttpForbiddenError:
    log.warning(
        'Your account does not have permission to check or bind IAM'
        ' policies to project [%s]. If your function encounters'
        ' authentication errors, ensure [%s] has the role [%s].',
        project_id,
        invoker_sa,
        _RUN_INVOKER_ROLE,
    )


def _ShouldBindInvokerRole(iam_policy, service_account):
  """Decides whether the invoker role should be bound for a service account.

  Args:
    iam_policy: project IAM policy whose bindings are inspected.
    service_account: email of the trigger service account.

  Returns:
    False as soon as the account is found in a binding for one of the
    predefined roles listed in _PREDEFINE_ROLES_WITH_ROUTE_INVOKER_PERMISSION.
    Otherwise the result of prompting the user, or a falsy value when the
    console cannot prompt; in that case the message is logged as a warning.
  """
  member = f'serviceAccount:{service_account}'
  has_custom_role = False
  for policy_binding in iam_policy.bindings:
    if member in policy_binding.members:
      role = policy_binding.role
      if role in _PREDEFINE_ROLES_WITH_ROUTE_INVOKER_PERMISSION:
        return False
      # A custom role starts with "projects/" or "organizations/" while a
      # predefined role starts with "roles/".
      if not role.startswith('roles/'):
        has_custom_role = True

  if has_custom_role:
    # The custom role's permissions are not inspected here, so the message
    # asks the user to verify them.
    prompt_string = (
        f'Your trigger service account [{service_account}] likely'
        f' lacks the [{_ROUTE_INVOKER_PERMISSION}] permission, which will'
        ' cause authentication errors. Since this service account uses a'
        ' custom role, please verify that the custom role includes this'
        " permission. If not, you'll need to either add this permission to"
        f' the custom role, or grant the [{_RUN_INVOKER_ROLE}] role to the'
        ' service account directly.\n'
    )
  else:
    prompt_string = (
        f'Your trigger service account [{service_account}] is missing'
        f' the [{_RUN_INVOKER_ROLE}] role. This will cause authentication'
        ' errors when running the function.\n'
    )

  question = (
      prompt_string
      + ' Do you want to add the invoker binding to the IAM policy of'
      ' your Cloud Run function?'
  )
  should_bind = console_io.CanPrompt() and console_io.PromptContinue(
      default=False,
      cancel_on_no=True,
      prompt_string=question,
  )

  if should_bind:
    return should_bind

  # No binding will be added; surface the problem as a warning instead.
  log.warning(prompt_string)
  return should_bind