File: //proc/thread-self/root/snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/pubsub/util.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library that is used to support Cloud Pub/Sub commands."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from googlecloudsdk.api_lib.pubsub import subscriptions
from googlecloudsdk.api_lib.pubsub import topics
from googlecloudsdk.api_lib.util import exceptions as exc
from googlecloudsdk.command_lib.projects import util as projects_util
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core.resource import resource_projector
from googlecloudsdk.core.util import times
import six
# Format for the seek time argument.
SEEK_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
# Collection for various subcommands.
TOPICS_COLLECTION = 'pubsub.projects.topics'
TOPICS_PUBLISH_COLLECTION = 'pubsub.topics.publish'
SNAPSHOTS_COLLECTION = 'pubsub.projects.snapshots'
SNAPSHOTS_LIST_COLLECTION = 'pubsub.snapshots.list'
SUBSCRIPTIONS_COLLECTION = 'pubsub.projects.subscriptions'
SUBSCRIPTIONS_ACK_COLLECTION = 'pubsub.subscriptions.ack'
SUBSCRIPTIONS_LIST_COLLECTION = 'pubsub.subscriptions.list'
SUBSCRIPTIONS_MOD_ACK_COLLECTION = 'pubsub.subscriptions.mod_ack'
SUBSCRIPTIONS_MOD_CONFIG_COLLECTION = 'pubsub.subscriptions.mod_config'
SUBSCRIPTIONS_PULL_COLLECTION = 'pubsub.subscriptions.pull'
SUBSCRIPTIONS_SEEK_COLLECTION = 'pubsub.subscriptions.seek'
SCHEMAS_COLLECTION = 'pubsub.projects.schemas'
PUSH_AUTH_SERVICE_ACCOUNT_MISSING_ENDPOINT_WARNING = """\
Using --push-auth-service-account requires specifying --push-endpoint. This
command will continue to run while ignoring --push-auth-service-account, but
will fail in a future version. To correct a subscription configuration, run:
  $ gcloud pubsub subscriptions update SUBSCRIPTION \\
      --push-endpoint=PUSH_ENDPOINT \\
      --push-auth-service-account={SERVICE_ACCOUNT_EMAIL} [...]
"""
PUSH_AUTH_TOKEN_AUDIENCE_MISSING_REQUIRED_FLAGS_WARNING = """\
Using --push-auth-token-audience requires specifying both --push-endpoint and
--push-auth-service-account. This command will continue to run while ignoring
--push-auth-token-audience, but will fail in a future version. To correct a
subscription configuration, run:
  $ gcloud pubsub subscriptions update SUBSCRIPTION \\
      --push-endpoint={PUSH_ENDPOINT} \\
      --push-auth-service-account={SERVICE_ACCOUNT_EMAIL} \\
      --push-auth-token-audience={OPTIONAL_AUDIENCE_OVERRIDE} [...]
"""
class InvalidArgumentError(exceptions.Error):
  """The user provides invalid arguments."""
class RequestsFailedError(exceptions.Error):
  """Indicates that some requests to the API have failed."""
  def __init__(self, requests, action):
    super(RequestsFailedError, self).__init__(
        'Failed to {action} the following: [{requests}].'.format(
            action=action, requests=','.join(requests)))
def CreateFailureErrorMessage(
    original_message, default_message='Internal Error'
):
  return original_message if original_message else default_message
def ParseSnapshot(snapshot_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(snapshot_name,
                                  params={'projectsId': project_id},
                                  collection=SNAPSHOTS_COLLECTION)
def ParseSubscription(subscription_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(subscription_name,
                                  params={'projectsId': project_id},
                                  collection=SUBSCRIPTIONS_COLLECTION)
def ParseTopic(topic_name, project_id=''):
  project_id = _GetProject(project_id)
  return resources.REGISTRY.Parse(topic_name,
                                  params={'projectsId': project_id},
                                  collection=TOPICS_COLLECTION)
def ParseProject(project_id=None):
  project_id = _GetProject(project_id)
  return projects_util.ParseProject(project_id)
def _GetProject(project_id):
  return project_id or properties.VALUES.core.project.Get(required=True)
def SnapshotUriFunc(snapshot):
  if isinstance(snapshot, dict):
    name = snapshot['name']
  else:
    name = snapshot
  return ParseSnapshot(name).SelfLink()
def SubscriptionUriFunc(subscription):
  project = None
  if isinstance(subscription, dict):
    name = subscription['subscriptionId']
    project = subscription['projectId']
  elif isinstance(subscription, str):
    name = subscription
  else:
    name = subscription.name
  return ParseSubscription(name, project).SelfLink()
def TopicUriFunc(topic):
  if isinstance(topic, dict):
    name = topic['topicId']
  else:
    name = topic.name
  return ParseTopic(name).SelfLink()
def ParsePushConfig(args, client=None):
  """Parses configs of push subscription from args."""
  push_endpoint = args.push_endpoint
  service_account_email = getattr(args, 'SERVICE_ACCOUNT_EMAIL', None)
  audience = getattr(args, 'OPTIONAL_AUDIENCE_OVERRIDE', None)
  # TODO(b/284985002): Remove warnings when argument groups are created for
  # authenticated push flags.
  if audience is not None and (
      push_endpoint is None or service_account_email is None
  ):
    log.warning(
        PUSH_AUTH_TOKEN_AUDIENCE_MISSING_REQUIRED_FLAGS_WARNING.format(
            PUSH_ENDPOINT=push_endpoint or 'PUSH_ENDPOINT',
            SERVICE_ACCOUNT_EMAIL=service_account_email
            or 'SERVICE_ACCOUNT_EMAIL',
            OPTIONAL_AUDIENCE_OVERRIDE=audience,
        )
    )
  elif service_account_email is not None and push_endpoint is None:
    log.warning(
        PUSH_AUTH_SERVICE_ACCOUNT_MISSING_ENDPOINT_WARNING.format(
            SERVICE_ACCOUNT_EMAIL=service_account_email
        )
    )
  if push_endpoint is None:
    if HasNoWrapper(args):
      raise InvalidArgumentError(
          'argument --push-no-wrapper: --push-endpoint must be specified.'
      )
    return None
  client = client or subscriptions.SubscriptionsClient()
  oidc_token = None
  # Only set oidc_token when service_account_email is set.
  if service_account_email is not None:
    oidc_token = client.messages.OidcToken(
        serviceAccountEmail=service_account_email, audience=audience)
  no_wrapper = None
  if HasNoWrapper(args):
    write_metadata = getattr(args, 'push_no_wrapper_write_metadata', False)
    no_wrapper = client.messages.NoWrapper(writeMetadata=write_metadata)
  return client.messages.PushConfig(
      pushEndpoint=push_endpoint, oidcToken=oidc_token, noWrapper=no_wrapper)
def HasNoWrapper(args):
  return getattr(args, 'push_no_wrapper', False)
def FormatSeekTime(time):
  return times.FormatDateTime(time, fmt=SEEK_TIME_FORMAT, tzinfo=times.UTC)
def FormatDuration(duration):
  """Formats a duration argument to be a string with units.
  Args:
    duration (int): The duration in seconds.
  Returns:
    unicode: The formatted duration.
  """
  return six.text_type(duration) + 's'
def ParseAttributes(attribute_dict, messages=None):
  """Parses attribute_dict into a list of AdditionalProperty messages.
  Args:
    attribute_dict (Optional[dict]): Dict containing key=value pairs
      to parse.
    messages (Optional[module]): Module containing pubsub proto messages.
  Returns:
    list: List of AdditionalProperty messages.
  """
  messages = messages or topics.GetMessagesModule()
  attributes = []
  if attribute_dict:
    for key, value in sorted(six.iteritems(attribute_dict)):
      attributes.append(
          messages.PubsubMessage.AttributesValue.AdditionalProperty(
              key=key,
              value=value))
  return attributes
# TODO(b/32276674): Remove the use of custom *DisplayDict's.
def TopicDisplayDict(topic):
  """Creates a serializable from a Cloud Pub/Sub Topic operation for display.
  Args:
    topic: (Cloud Pub/Sub Topic) Topic to be serialized.
  Returns:
    A serialized object representing a Cloud Pub/Sub Topic
    operation (create, delete).
  """
  topic_display_dict = resource_projector.MakeSerializable(topic)
  topic_display_dict['topicId'] = topic.name
  del topic_display_dict['name']
  return topic_display_dict
def SubscriptionDisplayDict(subscription):
  """Creates a serializable from a Cloud Pub/Sub Subscription op for display.
  Args:
    subscription: (Cloud Pub/Sub Subscription) Subscription to be serialized.
  Returns:
    A serialized object representing a Cloud Pub/Sub Subscription
    operation (create, delete, update).
  """
  push_endpoint = ''
  subscription_type = 'pull'
  if subscription.pushConfig:
    if subscription.pushConfig.pushEndpoint:
      push_endpoint = subscription.pushConfig.pushEndpoint
      subscription_type = 'push'
  return {
      'subscriptionId': subscription.name,
      'topic': subscription.topic,
      'type': subscription_type,
      'pushEndpoint': push_endpoint,
      'ackDeadlineSeconds': subscription.ackDeadlineSeconds,
      'retainAckedMessages': bool(subscription.retainAckedMessages),
      'messageRetentionDuration': subscription.messageRetentionDuration,
      'enableExactlyOnceDelivery': subscription.enableExactlyOnceDelivery,
  }
def SnapshotDisplayDict(snapshot):
  """Creates a serializable from a Cloud Pub/Sub Snapshot operation for display.
  Args:
    snapshot: (Cloud Pub/Sub Snapshot) Snapshot to be serialized.
  Returns:
    A serialized object representing a Cloud Pub/Sub Snapshot operation (create,
    delete).
  """
  return {
      'snapshotId': snapshot.name,
      'topic': snapshot.topic,
      'expireTime': snapshot.expireTime,
  }
def ListSubscriptionDisplayDict(subscription):
  """Returns a subscription dict with additional fields."""
  result = resource_projector.MakeSerializable(subscription)
  result['type'] = 'PUSH' if subscription.pushConfig.pushEndpoint else 'PULL'
  subscription_ref = ParseSubscription(subscription.name)
  result['projectId'] = subscription_ref.projectsId
  result['subscriptionId'] = subscription_ref.subscriptionsId
  topic_info = ParseTopic(subscription.topic)
  result['topicId'] = topic_info.topicsId
  return result
def ListTopicDisplayDict(topic):
  topic_dict = resource_projector.MakeSerializable(topic)
  topic_ref = ParseTopic(topic.name)
  topic_dict['topic'] = topic.name
  topic_dict['topicId'] = topic_ref.topicsId
  del topic_dict['name']
  return topic_dict
def ListTopicSubscriptionDisplayDict(topic_subscription):
  """Returns a topic_subscription dict with additional fields."""
  result = resource_projector.MakeSerializable(
      {'subscription': topic_subscription})
  subscription_ref = ParseSubscription(topic_subscription)
  result['projectId'] = subscription_ref.projectsId
  result['subscriptionId'] = subscription_ref.subscriptionsId
  return result
def ListSnapshotDisplayDict(snapshot):
  """Returns a snapshot dict with additional fields."""
  result = resource_projector.MakeSerializable(snapshot)
  snapshot_ref = ParseSnapshot(snapshot.name)
  result['projectId'] = snapshot_ref.projectsId
  result['snapshotId'] = snapshot_ref.snapshotsId
  topic_ref = ParseTopic(snapshot.topic)
  result['topicId'] = topic_ref.topicsId
  result['expireTime'] = snapshot.expireTime
  return result
def GetProject():
  """Returns the value of the core/project config property.
  Config properties can be overridden with command line flags. If the --project
  flag was provided, this will return the value provided with the flag.
  """
  return properties.VALUES.core.project.Get(required=True)
def ParseSchemaName(schema):
  """Parses a schema name using configuration properties for fallback.
  Args:
    schema: str, the schema's ID, fully-qualified URL, or relative name
  Returns:
    str: the relative name of the schema resource
  """
  return resources.REGISTRY.Parse(
      schema, params={
          'projectsId': GetProject
      }, collection='pubsub.projects.schemas').RelativeName()
def OutputSchemaValidated(unused_response, unused_args):
  """Logs a message indicating that a schema is valid."""
  log.status.Print('Schema is valid.')
def OutputMessageValidated(unused_response, unused_args):
  """Logs a message indicating that a message is valid."""
  log.status.Print('Message is valid.')
def ParseExactlyOnceAckIdsAndFailureReasons(ack_ids_and_failure_reasons,
                                            ack_ids):
  failed_ack_ids = [ack['AckId'] for ack in ack_ids_and_failure_reasons]
  successfully_processed_ack_ids = [
      ack_id for ack_id in ack_ids if ack_id not in failed_ack_ids
  ]
  return failed_ack_ids, successfully_processed_ack_ids
def HandleExactlyOnceDeliveryError(error):
  e = exc.HttpException(error)
  ack_ids_and_failure_reasons = ParseExactlyOnceErrorInfo(e.payload.details)
  # If the failure doesn't have more information (specifically for exactly
  # once related failures), re-raise the exception.
  if not ack_ids_and_failure_reasons:
    raise error
  return ack_ids_and_failure_reasons
def ParseExactlyOnceErrorInfo(error_metadata):
  """Parses error metadata for exactly once ack/modAck failures.
  Args:
    error_metadata: error metadata as dict of format ack_id -> failure_reason.
  Returns:
    list: error metadata with only exactly once failures.
  """
  ack_ids_and_failure_reasons = []
  for error_md in error_metadata:
    if 'reason' not in error_md or 'EXACTLY_ONCE' not in error_md['reason']:
      continue
    if 'metadata' not in error_md or not isinstance(error_md['metadata'], dict):
      continue
    for ack_id, failure_reason in error_md['metadata'].items():
      if 'PERMANENT_FAILURE' in failure_reason or ('TEMPORARY_FAILURE'
                                                   in failure_reason):
        result = resource_projector.MakeSerializable({})
        result['AckId'] = ack_id
        result['FailureReason'] = failure_reason
        ack_ids_and_failure_reasons.append(result)
  return ack_ids_and_failure_reasons