HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/pubsub/util.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A library that is used to support Cloud Pub/Sub commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.pubsub import subscriptions
from googlecloudsdk.api_lib.pubsub import topics
from googlecloudsdk.api_lib.util import exceptions as exc
from googlecloudsdk.command_lib.projects import util as projects_util
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core.resource import resource_projector
from googlecloudsdk.core.util import times

import six

# Format for the seek time argument.
SEEK_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


# Collection for various subcommands.
TOPICS_COLLECTION = 'pubsub.projects.topics'
TOPICS_PUBLISH_COLLECTION = 'pubsub.topics.publish'
SNAPSHOTS_COLLECTION = 'pubsub.projects.snapshots'
SNAPSHOTS_LIST_COLLECTION = 'pubsub.snapshots.list'
SUBSCRIPTIONS_COLLECTION = 'pubsub.projects.subscriptions'
SUBSCRIPTIONS_ACK_COLLECTION = 'pubsub.subscriptions.ack'
SUBSCRIPTIONS_LIST_COLLECTION = 'pubsub.subscriptions.list'
SUBSCRIPTIONS_MOD_ACK_COLLECTION = 'pubsub.subscriptions.mod_ack'
SUBSCRIPTIONS_MOD_CONFIG_COLLECTION = 'pubsub.subscriptions.mod_config'
SUBSCRIPTIONS_PULL_COLLECTION = 'pubsub.subscriptions.pull'
SUBSCRIPTIONS_SEEK_COLLECTION = 'pubsub.subscriptions.seek'
SCHEMAS_COLLECTION = 'pubsub.projects.schemas'

PUSH_AUTH_SERVICE_ACCOUNT_MISSING_ENDPOINT_WARNING = """\
Using --push-auth-service-account requires specifying --push-endpoint. This
command will continue to run while ignoring --push-auth-service-account, but
will fail in a future version. To correct a subscription configuration, run:
  $ gcloud pubsub subscriptions update SUBSCRIPTION \\
      --push-endpoint=PUSH_ENDPOINT \\
      --push-auth-service-account={SERVICE_ACCOUNT_EMAIL} [...]
"""

PUSH_AUTH_TOKEN_AUDIENCE_MISSING_REQUIRED_FLAGS_WARNING = """\
Using --push-auth-token-audience requires specifying both --push-endpoint and
--push-auth-service-account. This command will continue to run while ignoring
--push-auth-token-audience, but will fail in a future version. To correct a
subscription configuration, run:
  $ gcloud pubsub subscriptions update SUBSCRIPTION \\
      --push-endpoint={PUSH_ENDPOINT} \\
      --push-auth-service-account={SERVICE_ACCOUNT_EMAIL} \\
      --push-auth-token-audience={OPTIONAL_AUDIENCE_OVERRIDE} [...]
"""


class InvalidArgumentError(exceptions.Error):
  """The user provides invalid arguments."""


class RequestsFailedError(exceptions.Error):
  """Indicates that some requests to the API have failed."""

  def __init__(self, requests, action):
    super(RequestsFailedError, self).__init__(
        'Failed to {action} the following: [{requests}].'.format(
            action=action, requests=','.join(requests)))


def CreateFailureErrorMessage(
    original_message, default_message='Internal Error'
):
  return original_message if original_message else default_message


def ParseSnapshot(snapshot_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(snapshot_name,
                                  params={'projectsId': project_id},
                                  collection=SNAPSHOTS_COLLECTION)


def ParseSubscription(subscription_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(subscription_name,
                                  params={'projectsId': project_id},
                                  collection=SUBSCRIPTIONS_COLLECTION)


def ParseTopic(topic_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(topic_name,
                                  params={'projectsId': project_id},
                                  collection=TOPICS_COLLECTION)


def ParseProject(project_id=None):
  project_id = _GetProject(project_id)
  return projects_util.ParseProject(project_id)


def _GetProject(project_id):
  return project_id or properties.VALUES.core.project.Get(required=True)


def SnapshotUriFunc(snapshot):
  if isinstance(snapshot, dict):
    name = snapshot['name']
  else:
    name = snapshot
  return ParseSnapshot(name).SelfLink()


def SubscriptionUriFunc(subscription):
  project = None
  if isinstance(subscription, dict):
    name = subscription['subscriptionId']
    project = subscription['projectId']
  elif isinstance(subscription, str):
    name = subscription
  else:
    name = subscription.name
  return ParseSubscription(name, project).SelfLink()


def TopicUriFunc(topic):
  if isinstance(topic, dict):
    name = topic['topicId']
  else:
    name = topic.name
  return ParseTopic(name).SelfLink()


def ParsePushConfig(args, client=None):
  """Parses configs of push subscription from args."""
  push_endpoint = args.push_endpoint
  service_account_email = getattr(args, 'SERVICE_ACCOUNT_EMAIL', None)
  audience = getattr(args, 'OPTIONAL_AUDIENCE_OVERRIDE', None)

  # TODO(b/284985002): Remove warnings when argument groups are created for
  # authenticated push flags.
  if audience is not None and (
      push_endpoint is None or service_account_email is None
  ):
    log.warning(
        PUSH_AUTH_TOKEN_AUDIENCE_MISSING_REQUIRED_FLAGS_WARNING.format(
            PUSH_ENDPOINT=push_endpoint or 'PUSH_ENDPOINT',
            SERVICE_ACCOUNT_EMAIL=service_account_email
            or 'SERVICE_ACCOUNT_EMAIL',
            OPTIONAL_AUDIENCE_OVERRIDE=audience,
        )
    )
  elif service_account_email is not None and push_endpoint is None:
    log.warning(
        PUSH_AUTH_SERVICE_ACCOUNT_MISSING_ENDPOINT_WARNING.format(
            SERVICE_ACCOUNT_EMAIL=service_account_email
        )
    )

  if push_endpoint is None:
    if HasNoWrapper(args):
      raise InvalidArgumentError(
          'argument --push-no-wrapper: --push-endpoint must be specified.'
      )
    return None

  client = client or subscriptions.SubscriptionsClient()
  oidc_token = None

  # Only set oidc_token when service_account_email is set.
  if service_account_email is not None:
    oidc_token = client.messages.OidcToken(
        serviceAccountEmail=service_account_email, audience=audience)

  no_wrapper = None
  if HasNoWrapper(args):
    write_metadata = getattr(args, 'push_no_wrapper_write_metadata', False)
    no_wrapper = client.messages.NoWrapper(writeMetadata=write_metadata)

  return client.messages.PushConfig(
      pushEndpoint=push_endpoint, oidcToken=oidc_token, noWrapper=no_wrapper)


def HasNoWrapper(args):
  return getattr(args, 'push_no_wrapper', False)


def FormatSeekTime(time):
  return times.FormatDateTime(time, fmt=SEEK_TIME_FORMAT, tzinfo=times.UTC)


def FormatDuration(duration):
  """Formats a duration argument to be a string with units.

  Args:
    duration (int): The duration in seconds.
  Returns:
    unicode: The formatted duration.
  """
  return six.text_type(duration) + 's'


def ParseAttributes(attribute_dict, messages=None):
  """Parses attribute_dict into a list of AdditionalProperty messages.

  Args:
    attribute_dict (Optional[dict]): Dict containing key=value pairs
      to parse.
    messages (Optional[module]): Module containing pubsub proto messages.
  Returns:
    list: List of AdditionalProperty messages.
  """
  messages = messages or topics.GetMessagesModule()
  attributes = []
  if attribute_dict:
    for key, value in sorted(six.iteritems(attribute_dict)):
      attributes.append(
          messages.PubsubMessage.AttributesValue.AdditionalProperty(
              key=key,
              value=value))
  return attributes


# TODO(b/32276674): Remove the use of custom *DisplayDict's.
def TopicDisplayDict(topic):
  """Creates a serializable from a Cloud Pub/Sub Topic operation for display.

  Args:
    topic: (Cloud Pub/Sub Topic) Topic to be serialized.
  Returns:
    A serialized object representing a Cloud Pub/Sub Topic
    operation (create, delete).
  """
  topic_display_dict = resource_projector.MakeSerializable(topic)
  topic_display_dict['topicId'] = topic.name
  del topic_display_dict['name']

  return topic_display_dict


def SubscriptionDisplayDict(subscription):
  """Creates a serializable from a Cloud Pub/Sub Subscription op for display.

  Args:
    subscription: (Cloud Pub/Sub Subscription) Subscription to be serialized.
  Returns:
    A serialized object representing a Cloud Pub/Sub Subscription
    operation (create, delete, update).
  """
  push_endpoint = ''
  subscription_type = 'pull'
  if subscription.pushConfig:
    if subscription.pushConfig.pushEndpoint:
      push_endpoint = subscription.pushConfig.pushEndpoint
      subscription_type = 'push'

  return {
      'subscriptionId': subscription.name,
      'topic': subscription.topic,
      'type': subscription_type,
      'pushEndpoint': push_endpoint,
      'ackDeadlineSeconds': subscription.ackDeadlineSeconds,
      'retainAckedMessages': bool(subscription.retainAckedMessages),
      'messageRetentionDuration': subscription.messageRetentionDuration,
      'enableExactlyOnceDelivery': subscription.enableExactlyOnceDelivery,
  }


def SnapshotDisplayDict(snapshot):
  """Creates a serializable from a Cloud Pub/Sub Snapshot operation for display.

  Args:
    snapshot: (Cloud Pub/Sub Snapshot) Snapshot to be serialized.

  Returns:
    A serialized object representing a Cloud Pub/Sub Snapshot operation (create,
    delete).
  """
  return {
      'snapshotId': snapshot.name,
      'topic': snapshot.topic,
      'expireTime': snapshot.expireTime,
  }


def ListSubscriptionDisplayDict(subscription):
  """Returns a subscription dict with additional fields."""
  result = resource_projector.MakeSerializable(subscription)
  result['type'] = 'PUSH' if subscription.pushConfig.pushEndpoint else 'PULL'
  subscription_ref = ParseSubscription(subscription.name)
  result['projectId'] = subscription_ref.projectsId
  result['subscriptionId'] = subscription_ref.subscriptionsId
  topic_info = ParseTopic(subscription.topic)
  result['topicId'] = topic_info.topicsId
  return result


def ListTopicDisplayDict(topic):
  topic_dict = resource_projector.MakeSerializable(topic)
  topic_ref = ParseTopic(topic.name)
  topic_dict['topic'] = topic.name
  topic_dict['topicId'] = topic_ref.topicsId
  del topic_dict['name']
  return topic_dict


def ListTopicSubscriptionDisplayDict(topic_subscription):
  """Returns a topic_subscription dict with additional fields."""
  result = resource_projector.MakeSerializable(
      {'subscription': topic_subscription})

  subscription_ref = ParseSubscription(topic_subscription)
  result['projectId'] = subscription_ref.projectsId
  result['subscriptionId'] = subscription_ref.subscriptionsId
  return result


def ListSnapshotDisplayDict(snapshot):
  """Returns a snapshot dict with additional fields."""
  result = resource_projector.MakeSerializable(snapshot)
  snapshot_ref = ParseSnapshot(snapshot.name)
  result['projectId'] = snapshot_ref.projectsId
  result['snapshotId'] = snapshot_ref.snapshotsId
  topic_ref = ParseTopic(snapshot.topic)
  result['topicId'] = topic_ref.topicsId
  result['expireTime'] = snapshot.expireTime
  return result


def GetProject():
  """Returns the value of the core/project config property.

  Config properties can be overridden with command line flags. If the --project
  flag was provided, this will return the value provided with the flag.
  """
  return properties.VALUES.core.project.Get(required=True)


def ParseSchemaName(schema):
  """Parses a schema name using configuration properties for fallback.

  Args:
    schema: str, the schema's ID, fully-qualified URL, or relative name

  Returns:
    str: the relative name of the schema resource
  """
  return resources.REGISTRY.Parse(
      schema, params={
          'projectsId': GetProject
      }, collection='pubsub.projects.schemas').RelativeName()


def OutputSchemaValidated(unused_response, unused_args):
  """Logs a message indicating that a schema is valid."""
  log.status.Print('Schema is valid.')


def OutputMessageValidated(unused_response, unused_args):
  """Logs a message indicating that a message is valid."""
  log.status.Print('Message is valid.')


def ParseExactlyOnceAckIdsAndFailureReasons(ack_ids_and_failure_reasons,
                                            ack_ids):
  failed_ack_ids = [ack['AckId'] for ack in ack_ids_and_failure_reasons]
  successfully_processed_ack_ids = [
      ack_id for ack_id in ack_ids if ack_id not in failed_ack_ids
  ]
  return failed_ack_ids, successfully_processed_ack_ids


def HandleExactlyOnceDeliveryError(error):
  e = exc.HttpException(error)
  ack_ids_and_failure_reasons = ParseExactlyOnceErrorInfo(e.payload.details)
  # If the failure doesn't have more information (specifically for exactly
  # once related failures), re-raise the exception.
  if not ack_ids_and_failure_reasons:
    raise error

  return ack_ids_and_failure_reasons


def ParseExactlyOnceErrorInfo(error_metadata):
  """Parses error metadata for exactly once ack/modAck failures.

  Args:
    error_metadata: error metadata as dict of format ack_id -> failure_reason.

  Returns:
    list: error metadata with only exactly once failures.
  """
  ack_ids_and_failure_reasons = []

  for error_md in error_metadata:
    if 'reason' not in error_md or 'EXACTLY_ONCE' not in error_md['reason']:
      continue
    if 'metadata' not in error_md or not isinstance(error_md['metadata'], dict):
      continue

    for ack_id, failure_reason in error_md['metadata'].items():
      if 'PERMANENT_FAILURE' in failure_reason or ('TEMPORARY_FAILURE'
                                                   in failure_reason):
        result = resource_projector.MakeSerializable({})
        result['AckId'] = ack_id
        result['FailureReason'] = failure_reason
        ack_ids_and_failure_reasons.append(result)

  return ack_ids_and_failure_reasons