HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/functions/v2/util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functionality related to Cloud Functions v2 API clients."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import enum

from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
import frozendict
from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.api_lib.cloudresourcemanager import projects_util as projects_api_util
from googlecloudsdk.api_lib.functions.v2 import exceptions
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base as calliope_base
from googlecloudsdk.command_lib.projects import util as projects_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.util import encoding as encoder
from googlecloudsdk.core.util import retry
import six

_API_NAME = 'cloudfunctions'

_V2_ALPHA = 'v2alpha'
_V2_BETA = 'v2beta'
_V2_GA = 'v2'

_DEFAULT_ABORTED_MESSAGE = (
    'Aborted by user (background API operations may still be in progress).'
)

RELEASE_TRACK_TO_API_VERSION = {
    calliope_base.ReleaseTrack.ALPHA: 'v2alpha',
    calliope_base.ReleaseTrack.BETA: 'v2beta',
    calliope_base.ReleaseTrack.GA: 'v2',
}

MAX_WAIT_MS = 1820000
SLEEP_MS = 1000

# EventArc types
EA_PUBSUB_MESSAGE_PUBLISHED = 'google.cloud.pubsub.topic.v1.messagePublished'
EA_STORAGE_ARCHIVE = 'google.cloud.storage.object.v1.archived'
EA_STORAGE_DELETE = 'google.cloud.storage.object.v1.deleted'
EA_STORAGE_FINALIZE = 'google.cloud.storage.object.v1.finalized'
EA_STORAGE_UPDATE = 'google.cloud.storage.object.v1.metadataUpdated'

EVENTARC_STORAGE_TYPES = (
    EA_STORAGE_ARCHIVE,
    EA_STORAGE_DELETE,
    EA_STORAGE_FINALIZE,
    EA_STORAGE_UPDATE,
)

# EventFlow types
EF_PUBSUB_MESSAGE_PUBLISH = 'google.pubsub.topic.publish'
EF_STORAGE_ARCHIVE = 'google.storage.object.archive'
EF_STORAGE_DELETE = 'google.storage.object.delete'
EF_STORAGE_FINALIZE = 'google.storage.object.finalize'
EF_STORAGE_METADATA_UPDATE = 'google.storage.object.metadataUpdate'

EVENTFLOW_TO_EVENTARC_STORAGE_MAP = frozendict.frozendict({
    EF_STORAGE_ARCHIVE: EA_STORAGE_ARCHIVE,
    EF_STORAGE_DELETE: EA_STORAGE_DELETE,
    EF_STORAGE_FINALIZE: EA_STORAGE_FINALIZE,
    EF_STORAGE_METADATA_UPDATE: EA_STORAGE_UPDATE,
})

# Legacy types
LEGACY_PUBSUB_MESSAGE_PUBLISH = (
    'providers/cloud.pubsub/eventTypes/topic.publish'
)

PUBSUB_MESSAGE_PUBLISH_TYPES = (
    EA_PUBSUB_MESSAGE_PUBLISHED,
    EF_PUBSUB_MESSAGE_PUBLISH,
    LEGACY_PUBSUB_MESSAGE_PUBLISH,
)


class ApiEnv(enum.Enum):
  TEST = 1
  AUTOPUSH = 2
  STAGING = 3
  PROD = 4


def GetProject():
  # type: () -> str
  """Returns the value of the core/project config prooerty.

  Config properties can be overridden with command line flags. If the --project
  flag was provided, this will return the value provided with the flag.
  """
  return properties.VALUES.core.project.Get(required=True)


def GetMessagesModule(release_track):
  """Returns the API messages module for GCFv2."""
  api_version = RELEASE_TRACK_TO_API_VERSION.get(release_track)
  return apis.GetMessagesModule(_API_NAME, api_version)


def GetStage(messages):
  """Returns corresponding GoogleCloudFunctionsV2(alpha|beta|ga)Stage."""
  if messages is apis.GetMessagesModule(_API_NAME, _V2_ALPHA):
    return messages.GoogleCloudFunctionsV2alphaStage
  elif messages is apis.GetMessagesModule(_API_NAME, _V2_BETA):
    return messages.GoogleCloudFunctionsV2betaStage
  else:
    return messages.GoogleCloudFunctionsV2Stage


def GetStateMessage(messages):
  """Returns corresponding GoogleCloudFunctionsV2(alpha|beta|ga)stateMessage."""
  if messages is apis.GetMessagesModule(_API_NAME, _V2_ALPHA):
    return messages.GoogleCloudFunctionsV2alphaStateMessage
  elif messages is apis.GetMessagesModule(_API_NAME, _V2_BETA):
    return messages.GoogleCloudFunctionsV2betaStateMessage
  else:
    return messages.GoogleCloudFunctionsV2StateMessage


def GetApiEndpointOverride():
  # type: () -> str | None
  """Returns the API endpoint override property value for GCF."""
  try:
    return properties.VALUES.api_endpoint_overrides.Property(
        'cloudfunctions'
    ).Get()
  except properties.NoSuchPropertyError:
    return None


def GetClientInstance(release_track):
  """Returns an API client for GCFv2."""
  api_version = RELEASE_TRACK_TO_API_VERSION.get(release_track)
  return apis.GetClientInstance(_API_NAME, api_version)


def GetStateMessagesStrings(state_messages):
  """Returns the list of string representations of the state messages."""
  return map(
      lambda st: '[{}] {}'.format(str(st.severity), st.message), state_messages
  )


def GetOperationMetadata(messages):
  """Returns corresponding GoogleCloudFunctionsV2(alpha|beta|ga)OperationMetadata."""
  if messages is apis.GetMessagesModule(_API_NAME, _V2_ALPHA):
    return messages.GoogleCloudFunctionsV2alphaOperationMetadata
  elif messages is apis.GetMessagesModule(_API_NAME, _V2_BETA):
    return messages.GoogleCloudFunctionsV2betaOperationMetadata
  elif messages is apis.GetMessagesModule(_API_NAME, _V2_GA):
    return messages.GoogleCloudFunctionsV2OperationMetadata
  else:
    raise NotImplementedError('Invalid messages module.')


def _GetOperationMetadata(messages, operation):
  return encoding.PyValueToMessage(
      GetOperationMetadata(messages),
      encoding.MessageToPyValue(operation.metadata),
  )


def _GetStageHeader(name_enum):
  """Converts NameValueValuesEnum into the header to use in progress stages."""
  return '[{}]'.format(six.text_type(name_enum).replace('_', ' ').title())


def _GetOperation(client, request):
  """Get operation and return None if doesn't exist."""
  try:
    # We got response for a GET request, so an operation exists.
    return client.projects_locations_operations.Get(request)
  except apitools_exceptions.HttpError as error:
    if error.status_code == six.moves.http_client.NOT_FOUND:
      return None
    raise


def _GetOperationAndStages(client, request, messages):
  """Returns the stages in the operation."""
  operation = _GetOperation(client, request)
  if operation.error:
    raise exceptions.StatusToFunctionsError(operation.error)

  stages = []
  if operation.metadata:
    operation_metadata = _GetOperationMetadata(messages, operation)

    for stage in operation_metadata.stages:
      stages.append(
          progress_tracker.Stage(
              _GetStageHeader(stage.name), key=six.text_type(stage.name)
          )
      )

  return operation, stages


def _GetOperationAndLogProgress(client, request, tracker, messages):
  """Returns a Boolean indicating whether the request has completed."""
  operation = client.projects_locations_operations.Get(request)
  if operation.error:
    raise exceptions.StatusToFunctionsError(
        operation.error, error_message=OperationErrorToString(operation.error)
    )

  operation_metadata = _GetOperationMetadata(messages, operation)
  # cs/symbol:google.cloud.functions.v2main.OperationMetadata.Stage
  for stage in operation_metadata.stages:
    stage_in_progress = (
        stage.state is GetStage(messages).StateValueValuesEnum.IN_PROGRESS
    )
    stage_complete = (
        stage.state is GetStage(messages).StateValueValuesEnum.COMPLETE
    )

    if not stage_in_progress and not stage_complete:
      continue

    stage_key = str(stage.name)
    if tracker.IsComplete(stage_key):
      # Cannot update a completed stage in the tracker
      continue

    # Start running a stage
    if tracker.IsWaiting(stage_key):
      tracker.StartStage(stage_key)

    # Update stage message, including Build logs URL if applicable
    stage_message = stage.message or ''
    if stage_in_progress:
      stage_message = (stage_message or 'In progress') + '... '
    else:
      stage_message = ''

    if stage.resourceUri and stage_key == 'BUILD':
      stage_message += 'Logs are available at [{}]'.format(stage.resourceUri)

    tracker.UpdateStage(stage_key, stage_message)

    # Complete a finished stage
    if stage_complete:
      if stage.stateMessages:
        tracker.CompleteStageWithWarnings(
            stage_key, GetStateMessagesStrings(stage.stateMessages)
        )
      else:
        tracker.CompleteStage(stage_key)

  # We try to detect custom IAM roles in migrtion setup config.
  if operation.done and operation_metadata.customIamRoleDetected:
    # TODO(b/327636194): update the link once the doc is ready.
    log.warning(
        'A custom IAM role was detected. If this role is used to manage or'
        ' access your function, you must manually add the equivalent Cloud Run'
        ' permissions and add the binding to your Cloud Run function. Refer to'
        ' https://cloud.google.com/run/docs/reference/iam/permissions for'
        ' details.'
    )

  return operation


def WaitForOperation(
    client, messages, operation, description, extra_stages=None
):
  """Wait for a long-running operation (LRO) to complete.

  Args:
    client: The GCFv2 API client.
    messages: The GCFv2 message stubs.
    operation: The operation message response.
    description: str, the description of the waited operation.
    extra_stages: List[progress_tracker.Stage]|None, list of optional stages for
      the progress tracker to watch. The GCF 2nd api returns unexpected stages
      in the case of rollbacks.

  Returns:
    cloudfunctions_v2_messages.Operation, the finished operation.
  """

  def IsNotDoneAndIsMissingStages(res, _):
    op, stages = res
    return not stages and not op.done

  request = messages.CloudfunctionsProjectsLocationsOperationsGetRequest(
      name=operation.name
  )
  # Wait for stages to be loaded.
  with progress_tracker.ProgressTracker(
      'Preparing function', aborted_message=_DEFAULT_ABORTED_MESSAGE
  ) as tracker:
    retryer = retry.Retryer(max_wait_ms=MAX_WAIT_MS)
    try:
      # List[progress_tracker.Stage]
      operation, stages = retryer.RetryOnResult(
          _GetOperationAndStages,
          args=[client, request, messages],
          should_retry_if=IsNotDoneAndIsMissingStages,
          sleep_ms=SLEEP_MS,
      )
    except retry.WaitException:
      raise exceptions.FunctionsError(
          'Operation {0} is taking too long'.format(operation.name)
      )

  if extra_stages is not None:
    stages += extra_stages

  # Wait for LRO to complete.
  description += '...'

  with progress_tracker.StagedProgressTracker(
      description, stages, aborted_message=_DEFAULT_ABORTED_MESSAGE
  ) as tracker:
    if operation.done and not stages:
      # No stages to show in the progress tracker so just return the operation.
      return operation

    retryer = retry.Retryer(max_wait_ms=MAX_WAIT_MS)
    try:
      operation = retryer.RetryOnResult(
          _GetOperationAndLogProgress,
          args=[client, request, tracker, messages],
          should_retry_if=lambda op, _: not op.done,
          sleep_ms=SLEEP_MS,
      )
    except retry.WaitException:
      raise exceptions.FunctionsError(
          'Operation {0} is taking too long'.format(request.name)
      )

  return operation


def OperationErrorToString(error):
  """Returns a human readable string representation from the operation.

  Args:
    error: A string representing the raw json of the operation error.

  Returns:
    A human readable string representation of the error.
  """
  error_message = 'OperationError: code={0}, message={1}'.format(
      error.code, encoder.Decode(error.message)
  )
  messages = apis.GetMessagesModule('cloudfunctions', _V2_ALPHA)
  if error.details:
    for detail in error.details:
      sub_error = encoding.PyValueToMessage(
          messages.Status, encoding.MessageToPyValue(detail)
      )
      if sub_error.code is not None or sub_error.message is not None:
        error_message += '\n' + OperationErrorToString(sub_error)
  return error_message


def HasRoleBinding(iam_policy, sa_email, role):
  # type(Policy, str, str) -> bool
  """Returns whether the given SA has the given role bound in given policy.

  Args:
    iam_policy: The IAM policy to check.
    sa_email: The service account to check.
    role: The role to check for.
  """
  # iam_policy.bindings structure:
  # list[<Binding
  #       members=['serviceAccount:member@thing.iam.gserviceaccount.com', ...],
  #       role='roles/somerole'>...]
  return any(
      'serviceAccount:{}'.format(sa_email) in b.members and b.role == role
      for b in iam_policy.bindings
  )


def PromptToBindRoleIfMissing(sa_email, role, alt_roles=None, reason=''):
  # type: (str, str, tuple[str] | None, str) -> None
  """Prompts to bind the role to the service account in project level if missing.

  If the console cannot prompt, a warning is logged instead.

  Args:
    sa_email: The service account email to bind the role to.
    role: The role to bind if missing.
    alt_roles: Alternative roles to check that dismiss the need to bind the
      specified role.
    reason: Extra information to print explaining why the binding is necessary.
  """
  alt_roles = alt_roles or []
  project_ref = projects_util.ParseProject(GetProject())
  member = 'serviceAccount:{}'.format(sa_email)
  try:
    iam_policy = projects_api.GetIamPolicy(project_ref)
    if any(HasRoleBinding(iam_policy, sa_email, r) for r in [role, *alt_roles]):
      return

    log.status.Print(
        'Service account [{}] is missing the role [{}].\n{}'.format(
            sa_email, role, reason
        )
    )

    bind = console_io.CanPrompt() and console_io.PromptContinue(
        prompt_string='\nBind the role [{}] to service account [{}]?'.format(
            role, sa_email
        )
    )
    if not bind:
      log.warning('Manual binding of above role may be necessary.\n')
      return

    projects_api.AddIamPolicyBinding(project_ref, member, role)
    log.status.Print('Role successfully bound.\n')
  except apitools_exceptions.HttpForbiddenError:
    log.warning(
        (
            'Your account does not have permission to check or bind IAM'
            ' policies to project [%s]. If the deployment fails, ensure [%s]'
            ' has the role [%s] before retrying.'
        ),
        project_ref,
        sa_email,
        role,
    )


_rm_messages = projects_api_util.GetMessages()

_LOG_TYPES = frozenset([
    _rm_messages.AuditLogConfig.LogTypeValueValuesEnum.ADMIN_READ,
    _rm_messages.AuditLogConfig.LogTypeValueValuesEnum.DATA_READ,
    _rm_messages.AuditLogConfig.LogTypeValueValuesEnum.DATA_WRITE,
])


def _LookupAuditConfig(iam_policy, service):
  # type: (Policy, str) -> AuditConfig
  """Looks up the audit config for the given service.

  If no audit config is found, a new one is created and attached to the given
  policy.

  Args:
    iam_policy: The IAM policy to look through.
    service: The service to find the audit config for.

  Returns:
    The audit config for the given service or a blank new one if not found.
  """
  # iam_policy.auditConfigs structure:
  # list[<AuditConfig
  #       auditLogConfigs=[<AuditLogConfig<logType=...>, ...],
  #       service='foo.googleapis.com'>...]
  for ac in iam_policy.auditConfigs:
    if ac.service == service:
      return ac

  audit_config = _rm_messages.AuditConfig(service=service, auditLogConfigs=[])
  iam_policy.auditConfigs.append(audit_config)
  return audit_config


def PromptToEnableDataAccessAuditLogs(service):
  # type: (str) -> None
  """Prompts to enable Data Access audit logs for the given service.

  If the console cannot prompt, a warning is logged instead.

  Args:
    service: The service to enable Data Access audit logs for.
  """
  project = GetProject()
  project_ref = projects_util.ParseProject(project)
  warning_msg = (
      'If audit logs are not fully enabled for [{}], your function may'
      ' fail to receive some events.'.format(service)
  )

  try:
    policy = projects_api.GetIamPolicy(project_ref)
  except apitools_exceptions.HttpForbiddenError:
    log.warning(
        'You do not have permission to retrieve the IAM policy and check'
        ' whether Data Access audit logs are enabled for [{}]. {}'.format(
            service, warning_msg
        )
    )
    return

  audit_config = _LookupAuditConfig(policy, service)

  enabled_log_types = set(lc.logType for lc in audit_config.auditLogConfigs)
  if enabled_log_types == _LOG_TYPES:
    return

  log.status.Print(
      'Some Data Access audit logs are disabled for [{}]: '
      'https://console.cloud.google.com/iam-admin/audit?project={}'.format(
          service, project
      )
  )

  if not console_io.CanPrompt():
    log.warning(warning_msg)
    return

  log.status.Print(warning_msg)
  if not console_io.PromptContinue(
      prompt_string='\nEnable all Data Access audit logs for [{}]?'.format(
          service
      )
  ):
    return

  # Create log configs for any missing log types.
  log_types_to_enable = [lt for lt in _LOG_TYPES if lt not in enabled_log_types]
  audit_config.auditLogConfigs.extend(
      [_rm_messages.AuditLogConfig(logType=lt) for lt in log_types_to_enable]
  )

  try:
    projects_api.SetIamPolicy(project_ref, policy, update_mask='auditConfigs')
    log.status.Print('Data Access audit logs successfully enabled.')
  except apitools_exceptions.HttpForbiddenError:
    log.warning(
        'You do not have permission to update the IAM policy and ensure Data'
        ' Access audit logs are enabled for [{}].'.format(service)
    )


def GetCloudFunctionsApiEnv():
  """Determine the cloudfunctions API env the gcloud cmd is using."""
  api_string = GetApiEndpointOverride()
  if api_string is None:
    return ApiEnv.PROD
  if 'test-cloudfunctions' in api_string:
    return ApiEnv.TEST
  if 'autopush-cloudfunctions' in api_string:
    return ApiEnv.AUTOPUSH
  if 'staging-cloudfunctions' in api_string:
    return ApiEnv.STAGING

  return ApiEnv.PROD