HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/functions/v1/operations.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library used to interact with Operations objects."""
# TODO(b/73491568) Refactor to use api_lib.util.waiter

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.functions.v1 import exceptions
from googlecloudsdk.core.console import progress_tracker as console_progress_tracker
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import retry

MAX_WAIT_MS = 1820000
WAIT_CEILING_MS = 2000
SLEEP_MS = 1000


def OperationErrorToString(error):
  """Returns a human readable string representation from the operation.

  Args:
    error: A string representing the raw json of the operation error.

  Returns:
    A human readable string representation of the error.
  """
  return 'OperationError: code={0}, message={1}'.format(
      error.code, encoding.Decode(error.message)
  )


# TODO(b/130604453): Remove try_set_invoker option.
def _GetOperationStatus(
    client,
    get_request,
    progress_tracker=None,
    try_set_invoker=None,
    on_every_poll=None,
):
  """Helper function for getting the status of an operation.

  Args:
    client: The client used to make requests.
    get_request: A GetOperationRequest message.
    progress_tracker: progress_tracker.ProgressTracker, A reference for the
      progress tracker to tick, in case this function is used in a Retryer.
    try_set_invoker: function to try setting invoker, see above TODO.
    on_every_poll: list of functions to execute every time we poll. Functions
      should take in Operation as an argument.

  Returns:
    True if the operation succeeded without error.
    False if the operation is not yet done.

  Raises:
    FunctionsError: If the operation is finished with error.
  """
  if try_set_invoker:
    try_set_invoker()
  if progress_tracker:
    progress_tracker.Tick()
  op = client.operations.Get(get_request)
  if op.error:
    raise exceptions.FunctionsError(OperationErrorToString(op.error))
  if on_every_poll:
    for function in on_every_poll:
      function(op)
  return op.done


# TODO(b/139026575): Remove try_set_invoker option.
def _WaitForOperation(
    client, get_request, message, try_set_invoker=None, on_every_poll=None
):
  """Wait for an operation to complete.

  No operation is done instantly. Wait for it to finish following this logic:
  * we wait 1s (jitter is also 1s)
  * we query service
  * if the operation is not finished we loop to first point
  * wait limit is 1820s - if we get to that point it means something is wrong
        and we can throw an exception

  Args:
    client:  The client used to make requests.
    get_request: A GetOperationRequest message.
    message: str, The string to print while polling.
    try_set_invoker: function to try setting invoker, see above TODO.
    on_every_poll: list of functions to execute every time we poll. Functions
      should take in Operation as an argument.

  Returns:
    True if the operation succeeded without error.

  Raises:
    FunctionsError: If the operation takes more than 1820s.
  """

  with console_progress_tracker.ProgressTracker(message, autotick=False) as pt:
    # This is actually linear retryer.
    retryer = retry.Retryer(
        exponential_sleep_multiplier=1,
        max_wait_ms=MAX_WAIT_MS,
        wait_ceiling_ms=WAIT_CEILING_MS,
    )
    try:
      retryer.RetryOnResult(
          _GetOperationStatus,
          [client, get_request],
          {
              'progress_tracker': pt,
              'try_set_invoker': try_set_invoker,
              'on_every_poll': on_every_poll,
          },
          should_retry_if=lambda done, _: not done,
          sleep_ms=SLEEP_MS,
      )
    except retry.WaitException:
      raise exceptions.FunctionsError(
          'Operation {0} is taking too long'.format(get_request.name)
      )


def Wait(
    operation,
    messages,
    client,
    notice=None,
    try_set_invoker=None,
    on_every_poll=None,
):
  """Initialize waiting for operation to finish.

  Generate get request based on the operation and wait for an operation
  to complete.

  Args:
    operation: The operation which we are waiting for.
    messages: GCF messages module.
    client: GCF client module.
    notice: str, displayed when waiting for the operation to finish.
    try_set_invoker: function to try setting invoker, see above TODO.
    on_every_poll: list of functions to execute every time we poll. Functions
      should take in Operation as an argument.

  Raises:
    FunctionsError: If the operation takes more than 620s.
  """
  if notice is None:
    notice = 'Waiting for operation to finish'
  request = messages.CloudfunctionsOperationsGetRequest()
  request.name = operation.name
  _WaitForOperation(client, request, notice, try_set_invoker, on_every_poll)