HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/eventarc/base.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base client for Eventarc API interaction."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import exceptions
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.core import resources


class EventarcClientBase(object):
  """Base Client for interaction with of Eventarc API."""

  def __init__(self, api_name, api_version, resource_label):
    client = apis.GetClientInstance(api_name, api_version)
    self._operation_service = client.projects_locations_operations
    self._resource_label = resource_label

  def WaitFor(self, operation, operation_type, resource_ref, loading_msg=''):
    """Waits until the given long-running operation is complete.

    Args:
      operation: the long-running operation to wait for.
      operation_type: str, the type of operation (Creating, Updating or
        Deleting).
      resource_ref: Resource to reference.
      loading_msg: str, the message prompt to the user for a long-running
        operation.

    Returns:
      The long-running operation's response.

    Raises:
      HttpException: when failing to pull the long-running operation's status.
    """
    poller = waiter.CloudOperationPollerNoResources(self._operation_service)
    operation_ref = resources.REGISTRY.Parse(
        operation.name, collection='eventarc.projects.locations.operations')
    resource_name = resource_ref.Name()
    project_name = resource_ref.Parent().Parent().Name()
    location_name = resource_ref.Parent().Name()
    message = ('{} {} [{}] in project [{}], '
               'location [{}]').format(operation_type, self._resource_label,
                                       resource_name, project_name,
                                       location_name)
    if loading_msg:
      message = '{}, {}'.format(message, loading_msg)
    try:
      return waiter.WaitFor(
          poller, operation_ref, message, wait_ceiling_ms=20000
      )
    except apitools_exceptions.HttpForbiddenError as e:
      desc_cmd = 'gcloud eventarc {} describe {} --location={}'.format(
          self._resource_label_plural, resource_name, location_name
      )
      error_message = (
          'Failed to poll status of the operation, but the operation may have '
          'succeeded. {status_message} After fixing the permission issue, '
          'either check the %s by running `%s`, or rerun the original '
          'command.'
      ) % (self._resource_label, desc_cmd)
      raise exceptions.HttpException(e, error_format=error_message)

  @property
  def _resource_label_plural(self):
    return '{}s'.format(self._resource_label)