HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/resource_manager/operations.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""CRM API Operations utilities."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import time

from apitools.base.py import encoding
from apitools.base.py import exceptions
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import progress_tracker as tracker
from googlecloudsdk.core.util import retry
import ruamel.yaml


OPERATIONS_API_V1 = 'v1'
OPERATIONS_API_V3 = 'v3'


class OperationError(exceptions.Error):
  pass


def OperationsClient(version=OPERATIONS_API_V1):
  return apis.GetClientInstance('cloudresourcemanager', version)


def OperationsRegistry(version=OPERATIONS_API_V1):
  registry = resources.REGISTRY.Clone()
  registry.RegisterApiByName('cloudresourcemanager', version)
  return registry


def OperationsService(version=OPERATIONS_API_V1):
  return OperationsClient(version).operations


def OperationsMessages(version=OPERATIONS_API_V1):
  return apis.GetMessagesModule('cloudresourcemanager', version)


def OperationNameToId(operation_name):
  return operation_name[len('operations/'):]


def OperationIdToName(operation_id):
  return 'operations/{0}'.format(operation_id)


def GetOperation(operation_id):
  return OperationsService().Get(
      OperationsMessages().CloudresourcemanagerOperationsGetRequest(
          operationsId=operation_id))


def GetOperationV3(operation_id):
  return OperationsService(OPERATIONS_API_V3).Get(
      OperationsMessages(
          OPERATIONS_API_V3).CloudresourcemanagerOperationsGetRequest(
              name=OperationIdToName(operation_id)))


def WaitForOperation(operation):
  wait_message = 'Waiting for [{0}] to finish'.format(operation.name)
  with tracker.ProgressTracker(wait_message, autotick=False) as pt:
    retryer = OperationRetryer()
    poller = OperationPoller(pt)
    return retryer.RetryPollOperation(poller, operation)


def ExtractOperationResponse(operation, response_message_type):
  raw_dict = encoding.MessageToDict(operation.response)
  return encoding.DictToMessage(raw_dict, response_message_type)


def ToOperationResponse(message):
  raw_dict = encoding.MessageToDict(message)
  return encoding.DictToMessage(raw_dict,
                                OperationsMessages().Operation.ResponseValue)


class OperationRetryer(object):
  """A wrapper around a Retryer that works with CRM operations.

  Uses predefined constants for retry timing, so all CRM operation commands can
  share their retry timing settings.
  """

  def __init__(self,
               pre_start_sleep=lambda: time.sleep(1),
               max_retry_ms=2000,
               max_wait_ms=300000,
               wait_ceiling_ms=20000,
               first_retry_sleep_ms=2000):
    self._pre_start_sleep = pre_start_sleep
    self._max_retry_ms = max_retry_ms
    self._max_wait_ms = max_wait_ms
    self._wait_ceiling_ms = wait_ceiling_ms
    self._first_retry_sleep_ms = first_retry_sleep_ms

  def RetryPollOperation(self, operation_poller, operation):
    self._pre_start_sleep()
    return self._Retryer().RetryOnResult(
        lambda: operation_poller.Poll(operation),
        should_retry_if=self._ShouldRetry,
        sleep_ms=self._first_retry_sleep_ms)

  def _Retryer(self):
    return retry.Retryer(
        exponential_sleep_multiplier=2,
        max_wait_ms=self._max_wait_ms,
        wait_ceiling_ms=self._wait_ceiling_ms)

  def _ShouldRetry(self, result, state):
    if isinstance(result, exceptions.HttpError):
      return self._CheckTimePassedBelowMax(result, state)
    return self._CheckResultNotException(result)

  def _CheckTimePassedBelowMax(self, result, state):
    if state.time_passed_ms > self._max_retry_ms:
      raise result
    return True

  def _CheckResultNotException(self, result):
    if isinstance(result, Exception):
      raise result
    return not result.done


class OperationPoller(object):

  def __init__(self, progress_tracker=None):
    self._progress_tracker = progress_tracker

  def Poll(self, operation):
    if self._progress_tracker:
      self._progress_tracker.Tick()
    latest = GetOperation(OperationNameToId(operation.name))
    if latest.done and latest.error:
      raise OperationFailedException(latest)
    return latest


class OperationFailedException(core_exceptions.Error):
  """Exception for failed operations."""

  def __init__(self, operation_with_error):
    op_id = OperationNameToId(operation_with_error.name)
    error_code = operation_with_error.error.code
    error_message = operation_with_error.error.message
    message = 'Operation [{0}] failed: {1}: {2}'.format(op_id, error_code,
                                                        error_message)
    try:
      # Convert the proto message to a Python dict to safely access details
      error_py_value = encoding.MessageToPyValue(operation_with_error.error)
      details = error_py_value.get('details', [])
      if details:
        message += '\n' + yaml.dump(details)
    except (TypeError, AttributeError, ruamel.yaml.YAMLError) as e:
      # If formatting fails for any reason, fall back to the base message.
      message += f'\n(Failed to parse or format error details: {e})'
    super(OperationFailedException, self).__init__(message)


def GetUri(resource):
  """Returns the uri for resource."""
  operation_id = OperationNameToId(resource.name)
  operation_ref = OperationsRegistry().Parse(
      None,
      params={'operationsId': operation_id},
      collection='cloudresourcemanager.operations')
  return operation_ref.SelfLink()


def GetFailedOperation(operation_name, messages, error_details, error_code,
                       error_message):
  """Returns a failed operation with error details."""
  details_messages = []
  for item in error_details:
    # Each item in the details list is an 'Any' proto,
    # represented by DetailsValueListEntry
    details_messages.append(
        encoding.DictToMessage(item, messages.Status.DetailsValueListEntry))

  return messages.Operation(
      name='operations/' + operation_name,
      done=True,
      error=messages.Status(
          code=error_code, message=error_message, details=details_messages))