HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/pubsub/lite_topics.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for Pub/Sub Lite topics."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from google.cloud.pubsublite import cloudpubsub
from google.cloud.pubsublite import types
from google.cloud.pubsublite.cloudpubsub import message_transforms
from googlecloudsdk.api_lib.pubsub import topics
from googlecloudsdk.command_lib.pubsub import lite_util
from googlecloudsdk.core import gapic_util
from googlecloudsdk.core.util import http_encoding


def GetDefaultPublisherClient():
  return cloudpubsub.PublisherClient(
      credentials=gapic_util.GetGapicCredentials())


class PublisherClient(object):
  """Wrapper client for a Pub/Sub Lite publisher."""

  def __init__(self, client=None):
    self._client = client or GetDefaultPublisherClient()

  def __enter__(self):
    self._client.__enter__()
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self._client.__exit__(exc_type, exc_value, traceback)

  def _TopicResourceToPath(self, resource):
    return types.TopicPath(
        project=lite_util.ProjectIdToProjectNumber(resource.projectsId),
        location=lite_util.LocationToZoneOrRegion(resource.locationsId),
        name=resource.topicsId)

  def Publish(self,
              topic_resource,
              message=None,
              ordering_key=None,
              attributes=None,
              event_time=None):
    """Publishes a message to the specified Pub/Sub Lite topic.

    Args:
      topic_resource: The pubsub.lite_topic resource to publish to.
      message: The string message to publish.
      ordering_key: The key for ordering delivery to subscribers.
      attributes: A dict of attributes to attach to the message.
      event_time: A user-specified event timestamp.

    Raises:
      EmptyMessageException: if the message is empty.
      PublishOperationException: if the publish operation is not successful.

    Returns:
      The messageId of the published message, containing the Partition and
      Offset.
    """
    if not message and not attributes:
      raise topics.EmptyMessageException(
          'You cannot send an empty message. You must specify either a '
          'MESSAGE, one or more ATTRIBUTE, or both.')
    topic_path = self._TopicResourceToPath(topic_resource)
    attributes = attributes or {}
    if event_time:
      attributes[message_transforms.PUBSUB_LITE_EVENT_TIME] = (
          message_transforms.encode_attribute_event_time(event_time))
    try:
      return types.MessageMetadata.decode(
          self._client.publish(topic_path, http_encoding.Encode(message),
                               ordering_key, **attributes).result())
    except Exception as e:
      raise topics.PublishOperationException(
          'Publish operation failed with error: {error}'.format(error=e))