HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/compute/waiters.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for waiting on Compute Engine operations."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions

from googlecloudsdk.api_lib.compute import batch_helper
from googlecloudsdk.api_lib.compute import path_simplifier
from googlecloudsdk.api_lib.compute import single_request_helper
from googlecloudsdk.api_lib.util import exceptions as http_exceptions
from googlecloudsdk.command_lib.util import time_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties

# Default upper bound, in seconds, on how long an operation is polled (30 min).
_POLLING_TIMEOUT_SEC = 60 * 30
# Cap, in seconds, on the sleep between polling rounds in WaitForOperations;
# the sleep grows by one second per round until it reaches this value.
_MAX_TIME_BETWEEN_POLLS_SEC = 5
# Retry budget assigned to each operation in WaitForOperations for requests
# that fail with HTTP 503.
_SERVICE_UNAVAILABLE_RETRY_COUNT = 3

# The set of possible operation types is {insert, delete, update,
# *.insert, *.delete, *.update} + all verbs. For example,
#
#        verb                    op type
#   Instances.setMetadata      setMetadata
#   Instances.insert           insert
#   InstanceTemplates.delete   compute.instanceTemplates.delete
# In our status reporting, we use the following
# mapping. Anything not in the map is reported as "Updated".
# Each key is matched as a suffix of the operation type by
# _HumanFriendlyNamesForOp; each value holds the past- and present-tense verb
# used when reporting the operation to the user.
_HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES = {
    'createSnapshot': {
        'past': 'created',
        'present': 'create'
    },
    'recreateInstancesInstanceGroupManager': {
        'past': 'recreated',
        'present': 'recreate'
    },
    'createFirewallSecurityPolicy': {
        'past': 'created',
        'present': 'create'
    },
    'deleteFirewallSecurityPolicy': {
        'past': 'deleted',
        'present': 'delete'
    },
    'createNetworkEdgeSecurityService': {
        'past': 'created',
        'present': 'create'
    },
    'deleteNetworkEdgeSecurityService': {
        'past': 'deleted',
        'present': 'delete'
    },
    'createPSCServiceEndpoint': {
        'past': 'created',
        'present': 'create'
    },
    'deletePscForwardingRule': {
        'past': 'deleted',
        'present': 'delete'
    },
    'createRegionPscForwardingRule': {
        'past': 'created',
        'present': 'create'
    },
    'deleteRegionPscForwardingRule': {
        'past': 'deleted',
        'present': 'delete'
    },
    'releaseExternalIpv6Address': {
        'past': 'deleted',
        'present': 'delete'
    },
    'reserveExternalIpv6Address': {
        'past': 'created',
        'present': 'create'
    },
    'insert': {
        'past': 'created',
        'present': 'create'
    },
    'delete': {
        'past': 'deleted',
        'present': 'delete'
    },
    'update': {
        'past': 'updated',
        'present': 'update'
    },
    'invalidateCache': {
        'past': 'completed invalidation for',
        'present': 'complete invalidation for'
    }
}

# Operation types that act on multi-nested resources. Entries are compared
# against the start of an operation's operationType (prefix match) in
# OperationData.ResourceGetRequest.
# For example, ReservationSubBlocks.performMaintenance is a multi-nested
# resource because ReservationSubBlocks has the following parent resource name
# format:
# reservations/<reservation-name>/<reservation-block-name>/<reservation-subblock-name>.
_MULTI_NESTED_RESOURCE_OPERATION_TYPES = [
    'ReservationSubBlocks.performMaintenance',
    'ReservationSubBlocks.reportFaulty',
]


def _HumanFriendlyNamesForOp(op_type):
  """Returns the verb forms used to report the given operation type.

  Args:
    op_type: str, The operation type, e.g. 'insert' or
      'compute.instanceTemplates.delete'.

  Returns:
    A dict with 'past' and 'present' keys. The entry of the first key in
    _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES that op_type ends with is
    returned; when nothing matches, the generic "update" wording is used.
  """
  matching_suffix = next(
      (
          suffix
          for suffix in _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES
          if op_type.endswith(suffix)
      ),
      None,
  )
  if matching_suffix is None:
    return {'past': 'updated', 'present': 'update'}
  return _HUMAN_FRIENDLY_OPERATION_TYPE_SUFFIXES[matching_suffix]


def _HumanFriendlyNameForOpPastTense(op_type):
  """Returns the past-tense verb (e.g. 'created') for the operation type."""
  verb_forms = _HumanFriendlyNamesForOp(op_type)
  return verb_forms['past']


def _HumanFriendlyNameForOpPresentTense(op_type):
  """Returns the present-tense verb (e.g. 'create') for the operation type."""
  verb_forms = _HumanFriendlyNamesForOp(op_type)
  return verb_forms['present']


def _IsDeleteOp(op_type):
  """Returns True if the operation type is reported to users as 'deleted'."""
  past_tense = _HumanFriendlyNameForOpPastTense(op_type)
  return past_tense == 'deleted'


def _RecordProblems(operation, warnings, errors):
  """Records any warnings and errors into the given lists."""
  for warning in operation.warnings or []:
    warnings.append(warning.message)
  if operation.error:
    for error in operation.error.errors or []:
      errors.append((operation.httpErrorStatusCode, error))


def _RecordUnfinishedOperations(operations, errors):
  """Appends one error stating that the given operations timed out.

  The verb in the message is derived from the first operation's type, and the
  timeout reported is always the module default, _POLLING_TIMEOUT_SEC.

  Args:
    operations: A non-empty list of (operation, retry_count) tuples for the
      operations that did not finish.
    errors: list, Output parameter; receives a single (None, message) tuple.
  """
  links = ', '.join(operation.targetLink for operation, _ in operations)
  action = _HumanFriendlyNameForOpPresentTense(operations[0][0].operationType)
  message = (
      'Did not {action} the following resources within '
      '{timeout}s: {links}. These operations may still be '
      'underway remotely and may still succeed; use gcloud list '
      'and describe commands or '
      'https://console.developers.google.com/ to '
      'check resource state'
  ).format(action=action, timeout=_POLLING_TIMEOUT_SEC, links=links)
  errors.append((None, message))


class OperationData(object):
  """Holds all information necessary to poll given operation.

  Attributes:
    operation: An Operation object to poll.
    operation_service: The service that can be used to get operation object.
    resource_service: The service of the collection being mutated by the
      operation. If the operation type is not delete, this service is used to
      fetch the mutated object after the operation is done.
    project: str, The project to which the resource belong.
    resize_request_name: str, Name of the resize request being created.
    no_followup: bool, If True, do not send followup GET request.
    followup_override: str, Overrides the target resource name when it is
      different from the resource name which is used to poll.
    always_return_operation: bool, If true, always return operation object even
      if the operation fails.
    errors: An output parameter for capturing errors.
    warnings: An output parameter for capturing warnings.
  """

  # TODO: b/313849714 - Remove resize_request_name from this class once the bug
  # is fixed.
  def __init__(
      self,
      operation,
      operation_service,
      resource_service,
      project=None,
      resize_request_name=None,
      no_followup=False,
      followup_override=None,
      always_return_operation=False,
  ):
    self.operation = operation
    self.operation_service = operation_service
    self.resource_service = resource_service
    self.project = project
    self.resize_request_name = resize_request_name
    self.no_followup = no_followup
    self.followup_override = followup_override
    self.always_return_operation = always_return_operation

    self.errors = []
    self.warnings = []

  def __eq__(self, o):
    """Compares operation, project, services and follow-up settings."""
    if not isinstance(o, OperationData):
      return False
    return (self.operation == o.operation and self.project == o.project and
            self.operation_service == o.operation_service and
            self.resource_service == o.resource_service and
            self.no_followup == o.no_followup and
            self.followup_override == o.followup_override)

  def __hash__(self):
    # Hashes the operation's selfLink rather than the operation object itself,
    # combined with the other fields that __eq__ compares.
    return (hash(self.operation.selfLink) ^ hash(self.project)
            ^ hash(self.operation_service) ^ hash(self.resource_service)
            ^ hash(self.no_followup) ^ hash(self.followup_override))

  def __ne__(self, o):
    return not self == o

  def SetOperation(self, operation):
    """Updates the operation.

    Args:
      operation: Operation to be assigned.
    """
    self.operation = operation

  def IsGlobalOrganizationOperation(self):
    """Returns True if operation_service is globalOrganizationOperations."""
    if not hasattr(self.operation_service.client,
                   'globalOrganizationOperations'):
      return False
    return (self.operation_service ==
            self.operation_service.client.globalOrganizationOperations)

  def IsDone(self):
    """Returns true if the operation is done."""
    operation_type = self.operation_service.GetResponseType('Get')
    done = operation_type.StatusValueValuesEnum.DONE
    return self.operation.status == done

  def _SupportOperationWait(self):
    """Returns True if the operation service exposes a 'Wait' method."""
    return 'Wait' in self.operation_service.GetMethodsList()

  def ResourceGetRequest(self):
    """Generates apitools request message to get the resource.

    Returns:
      The 'Get' request message for the resource the operation targets, or
      None when no project is set and the operation has no targetLink (in that
      case a completion message is written to log.status instead).
    """

    target_link = self.operation.targetLink

    if self.project:
      request = self.resource_service.GetRequestType('Get')(
          project=self.project)
    else:
      # Gets the flexible resource ID.
      if target_link is None:
        log.status.write('{0}.\n'.format(
            _HumanFriendlyNameForOpPastTense(
                self.operation.operationType).capitalize()))
        return
      token_list = target_link.split('/')
      flexible_resource_id = token_list[-1]
      request = self.resource_service.GetRequestType('Get')(
          securityPolicy=flexible_resource_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    resource_params = self.resource_service.GetMethodConfig(
        'Get'
    ).ordered_params
    # b/394563040 - Temporary fix for issue where InvalidUserInputError is
    # printed after update operation completes for wireGroups
    # NOTE(review): this inserts into the list object returned by the method
    # config; the 'global' membership check keeps it from being added twice.
    if (
        'crossSiteNetwork' in resource_params
        and 'global' not in resource_params
    ):
      resource_params.insert(1, 'global')
    name_field = resource_params[-1]
    if len(resource_params) == 4:
      # This is a nested resource, which means it has four params
      # used to locate it: project, scope (zone/region/global),
      # parent_resource_name, and resource_name. Top level resources only have
      # three such params: project, scope (zone/region/global), resource_name.

      # TODO: b/313849714 - Remove the if block once the bug is fixed.
      if self.resize_request_name:
        target_link = (
            target_link + '/resizeRequests/' + self.resize_request_name
        )
      parent_resource_field = resource_params[2]
      # The parent name is the third path segment from the end of the link.
      parent_resource_name = target_link.split('/')[-3]

      # Create the parent resource name for multi-nested resources.
      for op_type in _MULTI_NESTED_RESOURCE_OPERATION_TYPES:
        if self.operation.operationType.startswith(op_type):
          parent_resource_name = '{}/{}/{}/{}'.format(
              target_link.split('/')[-6],
              target_link.split('/')[-5],
              target_link.split('/')[-4],
              target_link.split('/')[-3],
          )
          break
      setattr(request, parent_resource_field, parent_resource_name)
    resource_name = self.followup_override or path_simplifier.Name(target_link)
    setattr(request, name_field, resource_name)
    return request

  def _OperationRequest(self, verb):
    """Generates apitools request message to poll the operation.

    Args:
      verb: str, The operation service method to build the request for
        ('Get' or 'Wait').

    Returns:
      The request message, scoped by project when one is set and otherwise by
      an organization parent ID derived from the operation name.
    """

    if self.project:
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, project=self.project)
    else:
      # Fetches the parent ID from the operation name: the second
      # '-'-separated token is used as the organization ID.
      token_list = self.operation.name.split('-')
      parent_id = 'organizations/' + token_list[1]
      request = self.operation_service.GetRequestType(verb)(
          operation=self.operation.name, parentId=parent_id)
    if self.operation.zone:
      request.zone = path_simplifier.Name(self.operation.zone)
    elif self.operation.region:
      request.region = path_simplifier.Name(self.operation.region)
    return request

  def OperationGetRequest(self):
    """Generates apitools request message for operations.get method."""
    return self._OperationRequest('Get')

  def OperationWaitRequest(self):
    """Generates apitools request message for operations.wait method."""
    return self._OperationRequest('Wait')

  def _CallService(self, method, request):
    """Calls method(request), recording any HttpError before re-raising it.

    Args:
      method: The service method to invoke.
      request: The request message to pass to the method.

    Returns:
      Whatever the method returns.

    Raises:
      apitools_exceptions.HttpError: re-raised after the error and the current
        operation's problems have been appended to self.errors/self.warnings.
    """
    try:
      return method(request)
    except apitools_exceptions.HttpError as e:
      http_err = http_exceptions.HttpException(e)
      self.errors.append((http_err.error.status_code, http_err.message))
      _RecordProblems(self.operation, self.warnings, self.errors)
      raise

  def _PollUntilDoneUsingOperationGet(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with operation Get method.

    Sleeps between polls, starting at one second and growing by one second per
    poll up to _MAX_TIME_BETWEEN_POLLS_SEC. Returns when the operation is done,
    when the timeout elapses (a timeout error is recorded) or when a Get call
    raises HttpError (already recorded by _CallService).

    Args:
      timeout_sec: Maximum time to keep polling, in seconds.
    """
    get_request = self.OperationGetRequest()
    start = time_util.CurrentTimeSec()
    poll_time_interval = 0
    # Use the shared module cap instead of a duplicated literal.
    max_poll_interval = _MAX_TIME_BETWEEN_POLLS_SEC

    while True:
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return

      try:
        self.operation = self._CallService(self.operation_service.Get,
                                           get_request)
      except apitools_exceptions.HttpError:
        # The error was recorded by _CallService; stop polling.
        return

      if self.IsDone():
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      poll_time_interval = min(poll_time_interval + 1, max_poll_interval)
      time_util.Sleep(poll_time_interval)

  def _PollUntilDoneUsingOperationWait(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation with operation Wait method.

    Unlike the Get-based poller there is no client-side sleep between calls.
    Returns when the operation is done, when the timeout elapses (a timeout
    error is recorded) or when a Wait call raises HttpError (already recorded
    by _CallService).

    Args:
      timeout_sec: Maximum time to keep polling, in seconds.
    """
    wait_request = self.OperationWaitRequest()
    start = time_util.CurrentTimeSec()

    while not self.IsDone():
      if time_util.CurrentTimeSec() - start > timeout_sec:
        self.errors.append(
            (None, 'operation {} timed out'.format(self.operation.name)))
        _RecordProblems(self.operation, self.warnings, self.errors)
        return
      try:
        self.operation = self._CallService(self.operation_service.Wait,
                                           wait_request)
      except apitools_exceptions.HttpError:
        # The error was recorded by _CallService; stop polling.
        return

    _RecordProblems(self.operation, self.warnings, self.errors)

  def PollUntilDone(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Polls the operation until it is done.

    Uses the service's Wait method when it is available and falls back to
    repeated Get calls otherwise.

    Args:
      timeout_sec: Maximum time to keep polling, in seconds.
    """
    if self.IsDone():
      return

    if self._SupportOperationWait():
      self._PollUntilDoneUsingOperationWait(timeout_sec)
    else:
      self._PollUntilDoneUsingOperationGet(timeout_sec)

  def GetResult(self, timeout_sec=_POLLING_TIMEOUT_SEC):
    """Get the resource which is touched by the operation.

    Args:
      timeout_sec: Maximum time to poll the operation, in seconds.

    Returns:
      The resource fetched with the resource service's Get method, or None
      when no follow-up is wanted (no_followup), the operation has an error,
      the operation is a delete, there is no resource to fetch, or the Get
      call fails (the failure is recorded in self.errors).
    """
    self.PollUntilDone(timeout_sec)
    if not self.no_followup and not self.operation.error and not _IsDeleteOp(
        self.operation.operationType):
      resource_get_request = self.ResourceGetRequest()
      # ResourceGetRequest returns None for operations without a target;
      # sending a Get with no request message would fail, so skip it.
      if resource_get_request is None:
        return None
      try:
        return self._CallService(self.resource_service.Get,
                                 resource_get_request)
      except apitools_exceptions.HttpError:
        # Deliberately best-effort: _CallService already recorded the error.
        pass
    return None


def WaitForOperations(
    operations_data,
    http,
    batch_url,
    warnings,
    errors,
    progress_tracker=None,
    timeout=None,
    log_result=True,
):
  """Blocks until the given operations are done or until a timeout is reached.

  Args:
    operations_data: A list of OperationData objects holding Operations to poll.
    http: An HTTP object.
    batch_url: The URL to which batch requests should be sent.
    warnings: An output parameter for capturing warnings.
    errors: An output parameter for capturing errors.
    progress_tracker: progress tracker to tick while waiting for operations to
                      finish.
    timeout: The maximum amount of time, in seconds, to wait for the
      operations to reach the DONE state. Falsy values select the module
      default, _POLLING_TIMEOUT_SEC.
    log_result: Whether the Operation Waiter should print the result in past
      tense of each request.

  Yields:
    The resources pointed to by the operations' targetLink fields if
    the operation type is not delete. Only resources whose
    corresponding operations reach done are yielded. The operation itself is
    yielded instead when its OperationData has no_followup set, or when it
    failed and always_return_operation is set. Any other non-operation
    response is yielded as received, which can be None for a request that
    failed with an error other than 503.
  """
  if not operations_data:
    return
  timeout = timeout or _POLLING_TIMEOUT_SEC

  # Operation -> OperationData mapping will be used to reify operation_service
  # and resource_service from operation_service.Get(operation) response.
  # It is necessary because poll operation is returning only response, but we
  # also need to get operation details to know the service to poll for all
  # unprocessed_operations.
  operation_details = {}
  unprocessed_operations = []
  for operation in operations_data:
    operation_details[operation.operation.selfLink] = operation
    unprocessed_operations.append(
        (operation.operation, _SERVICE_UNAVAILABLE_RETRY_COUNT)
    )

  start = time_util.CurrentTimeSec()
  sleep_sec = 0
  # There is only one type of operation in compute API.
  # We pick the type of the first operation in the list.
  operation_type = operations_data[0].operation_service.GetResponseType('Get')

  while unprocessed_operations:
    if progress_tracker:
      progress_tracker.Tick()
    resource_requests = []
    operation_requests = []
    # Parallel to the two request lists above: the (operation, retry_count)
    # entry each request was issued for. A response's position matches its
    # request, not the operation list, because a done operation may issue no
    # request at all and resource requests are sent ahead of poll requests.
    resource_request_operations = []
    operation_request_operations = []

    log.debug('Operations to inspect: %s', unprocessed_operations)
    for operation, retry_count in unprocessed_operations:
      # Reify operation
      data = operation_details[operation.selfLink]
      # Need to update the operation since old operation may not have all the
      # required information.
      data.SetOperation(operation)

      operation_service = data.operation_service
      resource_service = data.resource_service

      if operation.status == operation_type.StatusValueValuesEnum.DONE:
        # The operation has reached the DONE state, so we record any
        # problems it contains (if any) and proceed to get the target
        # resource if there were no problems and the operation is not
        # a deletion.
        _RecordProblems(operation, warnings, errors)

        # We shouldn't attempt to get the target resource if there was
        # anything wrong with the operation. Note that
        # httpErrorStatusCode is set only when the operation is not
        # successful.
        if (operation.httpErrorStatusCode and
            operation.httpErrorStatusCode != 200):  # httplib.OK
          if data.always_return_operation:
            yield operation
          # The operation failed: never fall through to the follow-up
          # handling below, whether or not the operation was yielded.
          continue

        # Just in case the server did not set httpErrorStatusCode but
        # the operation did fail, we check the "error" field.
        if operation.error:
          continue

        # If the operation is done and we do not want to get the target resource
        # but do want to print results from the operation, we return the
        # operation.
        if data.no_followup:
          yield operation
          continue

        # We shouldn't get the target resource if the operation type
        # is delete because there will be no resource left.
        if not _IsDeleteOp(operation.operationType):
          request = data.ResourceGetRequest()
          # Some operations do not have target and should not send get request.
          if request:
            resource_requests.append((resource_service, 'Get', request))
            resource_request_operations.append((operation, retry_count))

        # Only log when there is target link in the operation.
        if operation.targetLink and log_result:
          log.status.write('{0} [{1}].\n'.format(
              _HumanFriendlyNameForOpPastTense(
                  operation.operationType).capitalize(), operation.targetLink))

      else:
        # The operation has not reached the DONE state, so we add a request
        # to poll the operation.
        # TODO(b/129413862): Global org operation service supports wait API.
        if data.IsGlobalOrganizationOperation():
          request = data.OperationGetRequest()
          operation_requests.append((operation_service, 'Get', request))
        else:
          request = data.OperationWaitRequest()
          operation_requests.append((operation_service, 'Wait', request))
        operation_request_operations.append((operation, retry_count))

    requests = resource_requests + operation_requests
    request_operations = (
        resource_request_operations + operation_request_operations
    )
    if not requests:
      break
    if (
        not properties.VALUES.compute.force_batch_request.GetBool()
        and len(requests) == 1
    ):
      service, method, request_body = requests[0]
      responses, request_errors = single_request_helper.MakeSingleRequest(
          service=service, method=method, request_body=request_body)
    else:
      responses, request_errors = batch_helper.MakeRequests(
          requests=requests, http=http, batch_url=batch_url)

    all_done = True
    # save the current errors in case timeout
    current_errors = list(request_errors)
    unprocessed_operations = []
    for seq, response in enumerate(responses):
      if isinstance(response, operation_type):
        # A fresh operation state came back; reset its 503 retry budget.
        unprocessed_operations.append(
            (response, _SERVICE_UNAVAILABLE_RETRY_COUNT)
        )
        if response.status != operation_type.StatusValueValuesEnum.DONE:
          all_done = False
      elif response is None and request_errors and request_errors[0][0] == 503:
        # if the error code is 503, we do not want to record the error but
        # ignore then retry
        error = request_errors.pop(0)
        # If a request return error, the response will be none. In this case,
        # put the operation that this request was issued for back into
        # unprocessed_operations.
        operation, retry_count = request_operations[seq]
        retry_count -= 1
        # each request will be retried for three times for 503 error
        if retry_count > 0:
          unprocessed_operations.append((operation, retry_count))
          all_done = False
        else:
          # if three retries all fail, we return the error
          errors.append(error)
      else:
        yield response

    errors.extend(request_errors)

    # If there are no more operations, we are done.
    if not unprocessed_operations:
      break

    # If all of the operations are done, we should ignore the timeout and ignore
    # the sleep.
    if all_done:
      continue

    # Did we time out? If so, record the operations that timed out so
    # they can be reported to the user.
    if time_util.CurrentTimeSec() - start > timeout:
      if not current_errors:
        log.debug('Timeout of %ss reached.', timeout)
        _RecordUnfinishedOperations(unprocessed_operations, errors)
      else:
        # we keep retry until timeout and there is still error, we still want
        # to report 503 error to the client
        # NOTE(review): entries still left in request_errors were already
        # appended above, so they may be reported twice here - confirm intent.
        errors.extend(current_errors)
      break

    # Sleeps before trying to poll the operations again.
    sleep_sec = min(sleep_sec + 1, _MAX_TIME_BETWEEN_POLLS_SEC)
    log.debug('Sleeping for %ss.', sleep_sec)
    time_util.Sleep(sleep_sec)