HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/core/util/retry.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of retrying logic."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import functools
import itertools
import math
import random
import sys
import time

from googlecloudsdk.core import exceptions

try:
  # Python 3.3 and above.
  collections_abc = collections.abc
except AttributeError:
  collections_abc = collections


_DEFAULT_JITTER_MS = 1000


class RetryerState(object):
  """Object that holds the state of the retryer."""

  def __init__(self, retrial, time_passed_ms, time_to_wait_ms):
    """Initializer for RetryerState.

    Args:
      retrial: int, the retry attempt we are currently at.
      time_passed_ms: int, number of ms that passed since we started retryer.
      time_to_wait_ms: int, number of ms to wait for the until next trial.
          If this number is -1, it means the iterable item that specifies the
          next sleep value has raised StopIteration.
    """
    self.retrial = retrial
    self.time_passed_ms = time_passed_ms
    self.time_to_wait_ms = time_to_wait_ms


class RetryException(Exception):
  """Raised to stop retrials on failure."""

  def __init__(self, message, last_result, state):
    self.message = message
    self.last_result = last_result
    self.state = state
    super(RetryException, self).__init__(message)

  def __str__(self):
    return ('last_result={last_result}, last_retrial={last_retrial}, '
            'time_passed_ms={time_passed_ms},'
            'time_to_wait={time_to_wait_ms}'.format(
                last_result=self.last_result,
                last_retrial=self.state.retrial,
                time_passed_ms=self.state.time_passed_ms,
                time_to_wait_ms=self.state.time_to_wait_ms))


class WaitException(RetryException):
  """Raised when timeout was reached."""


class MaxRetrialsException(RetryException):
  """Raised when too many retrials reached."""


class Retryer(object):
  """Retries a function based on specified retry strategy."""

  def __init__(self, max_retrials=None, max_wait_ms=None,
               exponential_sleep_multiplier=None, jitter_ms=_DEFAULT_JITTER_MS,
               status_update_func=None, wait_ceiling_ms=None):
    """Initializer for Retryer.

    Args:
      max_retrials: int, max number of retrials before raising RetryException.
      max_wait_ms: int, number of ms to wait before raising
      exponential_sleep_multiplier: float, The exponential factor to use on
          subsequent retries.
      jitter_ms: int, random [0, jitter_ms] additional value to wait.
      status_update_func: func(result, state) called right after each trial.
      wait_ceiling_ms: int, maximum wait time between retries, regardless of
          modifiers added like exponential multiplier or jitter.
    """

    self._max_retrials = max_retrials
    self._max_wait_ms = max_wait_ms
    self._exponential_sleep_multiplier = exponential_sleep_multiplier
    self._jitter_ms = jitter_ms
    self._status_update_func = status_update_func
    self._wait_ceiling_ms = wait_ceiling_ms

  def _RaiseIfStop(self, result, state):
    if self._max_retrials is not None and self._max_retrials <= state.retrial:
      raise MaxRetrialsException('Reached', result, state)
    if self._max_wait_ms is not None:
      if state.time_passed_ms + state.time_to_wait_ms > self._max_wait_ms:
        raise WaitException('Timeout', result, state)

  def _GetTimeToWait(self, last_retrial, sleep_ms):
    """Get time to wait after applying modifyers.

    Apply the exponential sleep multiplyer, jitter and ceiling limiting to the
    base sleep time.

    Args:
      last_retrial: int, which retry attempt we just tried. First try this is 0.
      sleep_ms: int, how long to wait between the current trials.

    Returns:
      int, ms to wait before trying next attempt with all waiting logic applied.
    """
    wait_time_ms = sleep_ms
    if wait_time_ms:
      if self._exponential_sleep_multiplier:
        # For exponential backoff, the amount to sleep is given by:
        #   wait_time_ms * (self._exponential_sleep_multiplier ** last_retrial)
        # If a small wait ceiling is provided, the number of retrials will
        # increase pretty quickly, and thus this quantity will eventually get
        # way too large, to the point of overflowing when converting to a float.
        # Thus, we cap it at a "reasonable" value of 100 years just to avoid
        # computing it if the number would be greater than that.
        hundred_years_ms = 100*365*86400*1000
        if self._exponential_sleep_multiplier > 1.0 and last_retrial > math.log(
            hundred_years_ms / wait_time_ms,
            self._exponential_sleep_multiplier):
          wait_time_ms = hundred_years_ms
        else:
          wait_time_ms *= self._exponential_sleep_multiplier ** last_retrial
      if self._jitter_ms:
        wait_time_ms += random.random() * self._jitter_ms
      if self._wait_ceiling_ms:
        wait_time_ms = min(wait_time_ms, self._wait_ceiling_ms)
      return wait_time_ms
    return 0

  def RetryOnException(self, func, args=None, kwargs=None,
                       should_retry_if=None, sleep_ms=None):
    """Retries the function if an exception occurs.

    Args:
      func: The function to call and retry.
      args: a sequence of positional arguments to be passed to func.
      kwargs: a dictionary of positional arguments to be passed to func.
      should_retry_if: func(exc_type, exc_value, exc_traceback, state) that
          returns True or False.
      sleep_ms: int or iterable for how long to wait between trials.

    Returns:
      Whatever the function returns.

    Raises:
      RetryException, WaitException: if function is retries too many times,
        or time limit is reached.
    """

    args = args if args is not None else ()
    kwargs = kwargs if kwargs is not None else {}

    def TryFunc():
      try:
        return func(*args, **kwargs), None
      except:  # pylint: disable=bare-except
        return None, sys.exc_info()

    if should_retry_if is None:
      should_retry = lambda x, s: x[1] is not None
    else:
      def ShouldRetryFunc(try_func_result, state):
        exc_info = try_func_result[1]
        if exc_info is None:
          # No exception, no reason to retry.
          return False
        return should_retry_if(exc_info[0], exc_info[1], exc_info[2], state)
      should_retry = ShouldRetryFunc

    result, exc_info = self.RetryOnResult(
        TryFunc, should_retry_if=should_retry, sleep_ms=sleep_ms)
    if exc_info:
      # Exception that was not retried was raised. Re-raise.
      exceptions.reraise(exc_info[1], tb=exc_info[2])
    return result

  def RetryOnResult(self, func, args=None, kwargs=None,
                    should_retry_if=None, sleep_ms=None):
    """Retries the function if the given condition is satisfied.

    Args:
      func: The function to call and retry.
      args: a sequence of arguments to be passed to func.
      kwargs: a dictionary of positional arguments to be passed to func.
      should_retry_if: result to retry on or func(result, RetryerState) that
          returns True or False if we should retry or not.
      sleep_ms: int or iterable, for how long to wait between trials.

    Returns:
      Whatever the function returns.

    Raises:
      MaxRetrialsException: function retried too many times.
      WaitException: time limit is reached.
    """
    args = args if args is not None else ()
    kwargs = kwargs if kwargs is not None else {}

    start_time_ms = _GetCurrentTimeMs()
    retrial = 0
    if callable(should_retry_if):
      should_retry = should_retry_if
    else:
      should_retry = lambda x, s: x == should_retry_if

    if isinstance(sleep_ms, collections_abc.Iterable):
      sleep_gen = iter(sleep_ms)
    else:
      sleep_gen = itertools.repeat(sleep_ms)

    while True:
      result = func(*args, **kwargs)
      time_passed_ms = _GetCurrentTimeMs() - start_time_ms
      try:
        sleep_from_gen = next(sleep_gen)
      except StopIteration:
        time_to_wait_ms = -1
      else:
        time_to_wait_ms = self._GetTimeToWait(retrial, sleep_from_gen)
      state = RetryerState(retrial, time_passed_ms, time_to_wait_ms)

      if not should_retry(result, state):
        return result

      if time_to_wait_ms == -1:
        raise MaxRetrialsException('Sleep iteration stop', result, state)
      if self._status_update_func:
        self._status_update_func(result, state)
      self._RaiseIfStop(result, state)
      _SleepMs(time_to_wait_ms)
      retrial += 1


def RetryOnException(f=None, max_retrials=None, max_wait_ms=None,
                     sleep_ms=None, exponential_sleep_multiplier=None,
                     jitter_ms=_DEFAULT_JITTER_MS,
                     status_update_func=None,
                     should_retry_if=None):
  """A decorator to retry on exceptions.

  Args:
    f: a function to run possibly multiple times
    max_retrials: int, max number of retrials before raising RetryException.
    max_wait_ms: int, number of ms to wait before raising
    sleep_ms: int or iterable, for how long to wait between trials.
    exponential_sleep_multiplier: float, The exponential factor to use on
        subsequent retries.
    jitter_ms: int, random [0, jitter_ms] additional value to wait.
    status_update_func: func(result, state) called right after each trail.
    should_retry_if: func(exc_type, exc_value, exc_traceback, state) that
        returns True or False.

  Returns:
    A version of f that is executed potentially multiple times and that
    yields the first returned value or the last exception raised.
  """

  if f is None:
    # Returns a decorator---based on retry Retry with max_retrials,
    # max_wait_ms, sleep_ms, etc. fixed.
    return functools.partial(
        RetryOnException,
        exponential_sleep_multiplier=exponential_sleep_multiplier,
        jitter_ms=jitter_ms,
        max_retrials=max_retrials,
        max_wait_ms=max_wait_ms,
        should_retry_if=should_retry_if,
        sleep_ms=sleep_ms,
        status_update_func=status_update_func)

  @functools.wraps(f)
  def DecoratedFunction(*args, **kwargs):
    retryer = Retryer(
        max_retrials=max_retrials,
        max_wait_ms=max_wait_ms,
        exponential_sleep_multiplier=exponential_sleep_multiplier,
        jitter_ms=jitter_ms,
        status_update_func=status_update_func)
    try:
      return retryer.RetryOnException(f, args=args, kwargs=kwargs,
                                      should_retry_if=should_retry_if,
                                      sleep_ms=sleep_ms)
    except MaxRetrialsException as mre:
      to_reraise = mre.last_result[1]
      exceptions.reraise(to_reraise[1], tb=to_reraise[2])

  return DecoratedFunction


def _GetCurrentTimeMs():
  return int(time.time() * 1000)


def _SleepMs(time_to_wait_ms):
  time.sleep(time_to_wait_ms / 1000.0)