HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/app/operations_util.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for working with long running operations go/long-running-operation.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import json

from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions

import enum

from googlecloudsdk.api_lib.app import exceptions as app_exceptions
from googlecloudsdk.api_lib.util import exceptions as api_exceptions
from googlecloudsdk.api_lib.util import requests
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import resources
import six

# Default is to retry every 5 seconds for 1 hour.
DEFAULT_OPERATION_RETRY_INTERVAL = 5
DEFAULT_OPERATION_MAX_TRIES = (60 // DEFAULT_OPERATION_RETRY_INTERVAL) * 60


def CallAndCollectOpErrors(method, *args, **kwargs):
  """Wrapper for method(...) which re-raises operation-style errors.

  Args:
    method: Original method to call.
    *args: Positional arguments to method.
    **kwargs: Keyword arguments to method.

  Raises:
    MiscOperationError: If the method call itself raises one of the exceptions
      listed below. Otherwise, the original exception is raised. Preserves
      stack trace. Re-uses the error string from original error or in the case
      of HttpError, we synthesize human-friendly string from HttpException.
      However, HttpException is neither raised nor part of the stack trace.

  Returns:
    Result of calling method(*args, **kwargs).
  """
  try:
    return method(*args, **kwargs)
  except apitools_exceptions.HttpError as http_err:
    # Create HttpException locally only to get its human friendly string
    _ReraiseMiscOperationError(api_exceptions.HttpException(http_err))
  except (OperationError, OperationTimeoutError, app_exceptions.Error) as err:
    _ReraiseMiscOperationError(err)


def _ReraiseMiscOperationError(err):
  """Transform and re-raise error helper."""
  exceptions.reraise(MiscOperationError(six.text_type(err)))


class MiscOperationError(exceptions.Error):
  """Wrapper exception for errors treated as operation failures."""


class OperationError(exceptions.Error):
  pass


class OperationTimeoutError(exceptions.Error):
  pass


class Status(enum.Enum):
  PENDING = 1
  COMPLETED = 2
  ERROR = 3


class Operation(object):
  """Wrapper around Operation response objects for console output.

  Attributes:
    project: String, name of the project.
    id: String, ID of operation.
    start_time: String, time the operation started.
    status: Status enum, either PENDING, COMPLETED, or Error.
    op_resource: messages.Operation, the original Operation resource.
  """

  def __init__(self, op_response):
    """Creates the operation wrapper object."""
    res = resources.REGISTRY.ParseRelativeName(op_response.name,
                                               'appengine.apps.operations')
    self.project = res.appsId
    self.id = res.Name()
    self.start_time = _GetInsertTime(op_response)
    self.status = GetStatus(op_response)
    self.op_resource = op_response

  def __eq__(self, other):
    return (isinstance(other, Operation) and
            self.project == other.project and
            self.id == other.id and
            self.start_time == other.start_time and
            self.status == other.status and
            self.op_resource == other.op_resource)


def GetStatus(operation):
  """Returns string status for given operation.

  Args:
    operation: A messages.Operation instance.

  Returns:
    The status of the operation in string form.
  """
  if not operation.done:
    return Status.PENDING.name
  elif operation.error:
    return Status.ERROR.name
  else:
    return Status.COMPLETED.name


def _GetInsertTime(operation):
  """Finds the insertTime property and return its string form.

  Args:
    operation: A messages.Operation instance.

  Returns:
    The time the operation started in string form or None if N/A.
  """
  if not operation.metadata:
    return None
  properties = operation.metadata.additionalProperties
  for prop in properties:
    if prop.key == 'insertTime':
      return prop.value.string_value


class AppEngineOperationPoller(waiter.OperationPoller):
  """A poller for appengine operations."""

  def __init__(self, operation_service, operation_metadata_type=None):
    """Sets up poller for appengine operations.

    Args:
      operation_service: apitools.base.py.base_api.BaseApiService, api service
        for retrieving information about ongoing operation.
      operation_metadata_type: Message class for the Operation metadata (for
        instance, OperationMetadataV1, or OperationMetadataV1Beta).
    """
    self.operation_service = operation_service
    self.operation_metadata_type = operation_metadata_type
    self.warnings_seen = set()

  def IsDone(self, operation):
    """Overrides."""
    self._LogNewWarnings(operation)
    if operation.done:
      log.debug('Operation [{0}] complete. Result: {1}'.format(
          operation.name,
          json.dumps(encoding.MessageToDict(operation), indent=4)))
      if operation.error:
        raise OperationError(requests.ExtractErrorMessage(
            encoding.MessageToPyValue(operation.error)))
      return True
    log.debug('Operation [{0}] not complete. Waiting to retry.'.format(
        operation.name))
    return False

  def Poll(self, operation_ref):
    """Overrides.

    Args:
      operation_ref: googlecloudsdk.core.resources.Resource.

    Returns:
      fetched operation message.
    """
    request_type = self.operation_service.GetRequestType('Get')
    request = request_type(name=operation_ref.RelativeName())
    operation = self.operation_service.Get(request)
    self._LogNewWarnings(operation)
    return operation

  def _LogNewWarnings(self, operation):
    if self.operation_metadata_type:
      # Log any new warnings to the end user.
      new_warnings = GetWarningsFromOperation(
          operation, self.operation_metadata_type) - self.warnings_seen
      for warning in new_warnings:
        log.warning(warning + '\n')
        self.warnings_seen.add(warning)

  def GetResult(self, operation):
    """Simply returns the operation.

    Args:
      operation: api_name_messages.Operation.

    Returns:
      the 'response' field of the Operation.
    """
    return operation


class AppEngineOperationBuildPoller(AppEngineOperationPoller):
  """Waits for a build to be present, or for the operation to finish."""

  def __init__(self, operation_service, operation_metadata_type):
    """Sets up poller for appengine operations.

    Args:
      operation_service: apitools.base.py.base_api.BaseApiService, api service
        for retrieving information about ongoing operation.
      operation_metadata_type: Message class for the Operation metadata (for
        instance, OperationMetadataV1, or OperationMetadataV1Beta).
    """
    super(AppEngineOperationBuildPoller, self).__init__(operation_service,
                                                        operation_metadata_type)

  def IsDone(self, operation):
    if GetBuildFromOperation(operation, self.operation_metadata_type):
      return True
    return super(AppEngineOperationBuildPoller, self).IsDone(operation)


def GetMetadataFromOperation(operation, operation_metadata_type):
  if not operation.metadata:
    return None
  return encoding.JsonToMessage(
      operation_metadata_type,
      encoding.MessageToJson(operation.metadata))


def GetBuildFromOperation(operation, operation_metadata_type):
  metadata = GetMetadataFromOperation(operation, operation_metadata_type)
  if not metadata or not metadata.createVersionMetadata:
    return None
  return metadata.createVersionMetadata.cloudBuildId


def GetWarningsFromOperation(operation, operation_metadata_type):
  metadata = GetMetadataFromOperation(operation, operation_metadata_type)
  if not metadata:
    return set()
  return set(warning for warning in metadata.warning)


def WaitForOperation(operation_service, operation,
                     max_retries=None,
                     retry_interval=None,
                     operation_collection='appengine.apps.operations',
                     message=None,
                     poller=None):
  """Wait until the operation is complete or times out.

  Args:
    operation_service: The apitools service type for operations
    operation: The operation resource to wait on
    max_retries: Maximum number of times to poll the operation
    retry_interval: Frequency of polling in seconds
    operation_collection: The resource collection of the operation.
    message: str, the message to display while progress tracker displays.
    poller: AppEngineOperationPoller to poll with, defaulting to done.
  Returns:
    The operation resource when it has completed
  Raises:
    OperationError: if the operation contains an error.
    OperationTimeoutError: when the operation polling times out

  """
  poller = poller or AppEngineOperationPoller(operation_service)
  if poller.IsDone(operation):
    return poller.GetResult(operation)
  operation_ref = resources.REGISTRY.ParseRelativeName(
      operation.name,
      operation_collection)
  if max_retries is None:
    max_retries = DEFAULT_OPERATION_MAX_TRIES - 1
  if retry_interval is None:
    retry_interval = DEFAULT_OPERATION_RETRY_INTERVAL
  if message is None:
    message = 'Waiting for operation [{}] to complete'.format(
        operation_ref.RelativeName())
  # Convert to milliseconds
  retry_interval *= 1000
  try:
    completed_operation = waiter.WaitFor(
        poller,
        operation_ref,
        message,
        pre_start_sleep_ms=1000,
        max_retrials=max_retries,
        exponential_sleep_multiplier=1.0,
        sleep_ms=retry_interval)
  except waiter.TimeoutError:
    raise OperationTimeoutError(('Operation [{0}] timed out. This operation '
                                 'may still be underway.').format(
                                     operation.name))
  return completed_operation