HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/pubsub/lite_subscriptions.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for Pub/Sub Lite subscriptions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from concurrent import futures
import time
from typing import Optional

from google.cloud.pubsublite import cloudpubsub
from google.cloud.pubsublite import types
from google.pubsub_v1 import PubsubMessage
from googlecloudsdk.command_lib.pubsub import lite_util
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import gapic_util
from googlecloudsdk.core import log
from six.moves import queue

_MAX_INT64 = 0x7FFFFFFFFFFFFFFF


class SubscribeOperationException(exceptions.Error):
  """Error when something went wrong while subscribing."""


def GetDefaultSubscriberClient():
  return cloudpubsub.SubscriberClient(
      credentials=gapic_util.GetGapicCredentials())


class SubscriberClient(object):
  """GCloud wrapper client for a Pub/Sub Lite subscriber."""

  def __init__(self,
               subscription_resource,
               partitions,
               max_messages,
               auto_ack,
               client=None):
    self._client = client or GetDefaultSubscriberClient()
    self._messages = queue.Queue()
    self._subscription = self._SubscriptionResourceToPath(subscription_resource)
    self._partitions = {types.Partition(partition) for partition in partitions}
    self._flow_control_settings = types.FlowControlSettings(
        messages_outstanding=max_messages,
        bytes_outstanding=_MAX_INT64,
    )
    self._auto_ack = auto_ack
    self._pull_future = None

  def __enter__(self):
    self._client.__enter__()
    self._pull_future = self._client.subscribe(
        self._subscription,
        callback=self._messages.put,
        per_partition_flow_control_settings=self._flow_control_settings,
        fixed_partitions=self._partitions)
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    time.sleep(1)  # Wait 1 second to ensure all acks have been processed
    if not self._pull_future.done():
      try:
        # Cancel the streaming pull future and get the result to prevent
        # logging an abandoned future.
        self._pull_future.cancel()
        self._pull_future.result()
      except futures.CancelledError:
        pass
    self._client.__exit__(exc_type, exc_value, traceback)

  def _SubscriptionResourceToPath(self, resource):
    return types.SubscriptionPath(
        project=lite_util.ProjectIdToProjectNumber(resource.projectsId),
        location=lite_util.LocationToZoneOrRegion(resource.locationsId),
        name=resource.subscriptionsId)

  def _RaiseIfFailed(self):
    if self._pull_future.done():
      e = self._pull_future.exception()
      if e:
        raise SubscribeOperationException(
            'Subscribe operation failed with error: {error}'.format(error=e))
      log.debug('The streaming pull future completed unexpectedly without '
                'raising an exception.')
      raise exceptions.InternalError(
          'The subscribe stream terminated unexpectedly.')

  def Pull(self) -> Optional[PubsubMessage]:
    """Pulls and optionally acks a message from the provided subscription.

    Returns:
      A PubsubMessage pulled from the subscription.
    """
    self._RaiseIfFailed()
    try:
      message = self._messages.get(timeout=1)
      if self._auto_ack:
        message.ack()
      return message
    except queue.Empty:
      return None