HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/pubsub/subscriptions/create.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud Pub/Sub subscriptions create command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as api_ex
from googlecloudsdk.api_lib.pubsub import subscriptions
from googlecloudsdk.api_lib.util import exceptions
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.pubsub import flags
from googlecloudsdk.command_lib.pubsub import resource_args
from googlecloudsdk.command_lib.pubsub import util
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties


def _Run(
    args,
    enable_labels=False,
    legacy_output=False,
    enable_push_to_cps=False,
):
  """Creates each requested subscription, yielding results one at a time.

  This is a generator function, so none of the work below (including the
  dead-letter-policy validation) runs until the caller starts iterating.

  Args:
    args: the parsed command-line namespace for the create command.
    enable_labels: if True, labels are parsed from args with
      labels_util.ParseCreateArgs; otherwise no labels are sent.
    legacy_output: if True, each result is converted with
      util.SubscriptionDisplayDict before it is yielded.
    enable_push_to_cps: if True, the pubsub_export_topic argument is read from
      args; otherwise it is treated as unset.

  Yields:
    The value returned by client.Create for every subscription that was
    created (or its display dict when legacy_output is set).

  Raises:
    util.RequestsFailedError: after every subscription has been attempted, if
      at least one creation failed with an HttpError.
  """

  def _Arg(dest):
    """Returns args.<dest>, or None when the namespace has no such attribute."""
    return getattr(args, dest, None)

  def _DurationArg(dest):
    """Returns args.<dest>, run through util.FormatDuration if specified."""
    value = _Arg(dest)
    if args.IsSpecified(dest):
      return util.FormatDuration(value)
    return value

  flags.ValidateDeadLetterPolicy(args)

  client = subscriptions.SubscriptionsClient()
  topic_ref = args.CONCEPTS.topic.Parse()
  push_config = util.ParsePushConfig(args)
  retain_acked_messages = _Arg('retain_acked_messages')

  # Durations are only formatted when the user explicitly passed the flag.
  retention_duration = _DurationArg('message_retention_duration')
  min_retry_delay = _DurationArg('min_retry_delay')
  max_retry_delay = _DurationArg('max_retry_delay')
  cloud_storage_max_duration = _DurationArg('cloud_storage_max_duration')

  # The output format arrives as a list; only its first element is used.
  output_formats = _Arg('cloud_storage_output_format')
  cloud_storage_output_format = output_formats[0] if output_formats else None

  # The "never" sentinel is translated into no_expiration=True with no period.
  expiration_period = _Arg('expiration_period')
  no_expiration = False
  if (
      args.IsSpecified('expiration_period')
      and expiration_period == subscriptions.NEVER_EXPIRATION_PERIOD_VALUE
  ):
    no_expiration = True
    expiration_period = None

  # Topic-valued arguments are replaced by their parsed relative names.
  dead_letter_topic = _Arg('dead_letter_topic')
  if dead_letter_topic:
    dead_letter_topic = args.CONCEPTS.dead_letter_topic.Parse().RelativeName()

  pubsub_export_topic = None
  if enable_push_to_cps:
    pubsub_export_topic = _Arg('pubsub_export_topic')
  if pubsub_export_topic:
    pubsub_export_topic = (
        args.CONCEPTS.pubsub_export_topic.Parse().RelativeName()
    )

  labels = (
      labels_util.ParseCreateArgs(
          args, client.messages.Subscription.LabelsValue
      )
      if enable_labels
      else None
  )
  tags = flags.GetTagsMessage(args, client.messages.Subscription.TagsValue)

  # Arguments forwarded to client.Create under the same name they have on args.
  passthrough_dests = (
      'enable_message_ordering',
      'max_delivery_attempts',
      'enable_exactly_once_delivery',
      'bigquery_table',
      'use_topic_schema',
      'use_table_schema',
      'write_metadata',
      'drop_unknown_fields',
      'bigquery_service_account_email',
      'cloud_storage_bucket',
      'cloud_storage_file_prefix',
      'cloud_storage_file_suffix',
      'cloud_storage_file_datetime_format',
      'cloud_storage_max_bytes',
      'cloud_storage_max_messages',
      'cloud_storage_use_topic_schema',
      'cloud_storage_write_metadata',
      'cloud_storage_service_account_email',
      'pubsub_export_topic_region',
      'message_transforms_file',
  )
  create_options = {dest: _Arg(dest) for dest in passthrough_dests}
  # Arguments that were renamed or post-processed above.
  create_options.update(
      labels=labels,
      no_expiration=no_expiration,
      expiration_period=expiration_period,
      filter_string=_Arg('message_filter'),
      dead_letter_topic=dead_letter_topic,
      min_retry_delay=min_retry_delay,
      max_retry_delay=max_retry_delay,
      cloud_storage_max_duration=cloud_storage_max_duration,
      cloud_storage_output_format=cloud_storage_output_format,
      pubsub_export_topic=pubsub_export_topic,
      tags=tags,
  )

  failed_ids = []
  for subscription_ref in args.CONCEPTS.subscription.Parse():
    try:
      created = client.Create(
          subscription_ref,
          topic_ref,
          args.ack_deadline,
          push_config,
          retain_acked_messages,
          retention_duration,
          **create_options
      )
    except api_ex.HttpError as http_error:
      # Record the failure and keep going so the remaining subscriptions are
      # still attempted; the aggregate error is raised after the loop.
      http_exc = exceptions.HttpException(http_error)
      log.CreatedResource(
          subscription_ref.RelativeName(),
          kind='subscription',
          failed=util.CreateFailureErrorMessage(
              http_exc.payload.status_message
          ),
      )
      failed_ids.append(subscription_ref.subscriptionsId)
    else:
      if legacy_output:
        created = util.SubscriptionDisplayDict(created)
      log.CreatedResource(subscription_ref.RelativeName(), kind='subscription')
      yield created

  if failed_ids:
    raise util.RequestsFailedError(failed_ids, 'create')


@base.UniverseCompatible
@base.ReleaseTracks(base.ReleaseTrack.GA)
class Create(base.CreateCommand):
  """Creates one or more Cloud Pub/Sub subscriptions."""

  detailed_help = {
      'DESCRIPTION': """\
          Creates one or more Cloud Pub/Sub subscriptions for a given topic.
          The new subscription defaults to a PULL subscription unless a push
          endpoint is specified.""",
  }

  @classmethod
  def Args(cls, parser):
    """Registers the resource arguments and flags of this command on parser.

    Args:
      parser: the argument parser to which the topic and subscription resource
        arguments, the subscription settings flags, the label-creation flags
        and the message-transforms flags are added.
    """
    resource_args.AddResourceArgs(
        parser,
        [
            # The topic is a flag (positional=False), not a positional.
            resource_args.CreateTopicResourceArg(
                'from which this subscription is receiving messages. '
                'Each subscription is attached to a single topic.',
                positional=False,
            ),
            # plural=True: several subscriptions can be named in one call.
            resource_args.CreateSubscriptionResourceArg(
                'to create.', plural=True
            ),
        ],
    )
    flags.AddSubscriptionSettingsFlags(parser)
    labels_util.AddCreateLabelsFlags(parser)
    flags.AddMessageTransformsFlags(parser)

  def Run(self, args):
    """Validates the filter string, then delegates to _Run with labels enabled.

    Args:
      args: the parsed command-line namespace.

    Returns:
      The generator produced by _Run.
    """
    flags.ValidateFilterString(args)
    return _Run(args, enable_labels=True)


@base.ReleaseTracks(base.ReleaseTrack.BETA)
class CreateBeta(Create):
  """Creates one or more Cloud Pub/Sub subscriptions."""

  @classmethod
  def Args(cls, parser):
    """Registers the resource arguments and flags of this command on parser.

    The subscription settings flags are added with enable_push_to_cps=True.

    Args:
      parser: the argument parser to which the topic and subscription resource
        arguments, the subscription settings flags, the label-creation flags
        and the message-transforms flags are added.
    """
    resource_args.AddResourceArgs(
        parser,
        [
            # The topic is a flag (positional=False), not a positional.
            resource_args.CreateTopicResourceArg(
                'from which this subscription is receiving messages. '
                'Each subscription is attached to a single topic.',
                positional=False,
            ),
            # plural=True: several subscriptions can be named in one call.
            resource_args.CreateSubscriptionResourceArg(
                'to create.', plural=True
            ),
        ],
    )
    flags.AddSubscriptionSettingsFlags(parser, enable_push_to_cps=True)
    labels_util.AddCreateLabelsFlags(parser)
    flags.AddMessageTransformsFlags(parser)

  def Run(self, args):
    """Validates the filter string, then delegates to _Run.

    Labels and the push-to-Cloud-Pub/Sub export topic are enabled, and the
    pubsub/legacy_output property decides whether results are converted to
    the legacy display form.

    Args:
      args: the parsed command-line namespace.

    Returns:
      The generator produced by _Run.
    """
    flags.ValidateFilterString(args)
    return _Run(
        args,
        enable_labels=True,
        legacy_output=properties.VALUES.pubsub.legacy_output.GetBool(),
        enable_push_to_cps=True,
    )


@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
class CreateAlpha(CreateBeta):
  """Creates one or more Cloud Pub/Sub subscriptions."""

  @classmethod
  def Args(cls, parser):
    """Registers everything CreateBeta.Args registers, plus the tags flag.

    Args:
      parser: the argument parser to add the arguments and flags to.
    """
    super(CreateAlpha, cls).Args(parser)
    # Run is inherited from CreateBeta unchanged; this class only adds the
    # tags flag on top of the inherited arguments.
    flags.AddTagsFlag(parser)