HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/gsutil/gslib/tests/test_notification_pubsub.py
# -*- coding: utf-8 -*-
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for notification command."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import logging
import unittest

from gslib.cs_api_map import ApiSelector
from gslib.project_id import PopulateProjectId
from gslib.pubsub_api import PubsubApi
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import ObjectToURI as suri


@SkipForS3('gsutil doesn\'t support S3 notifications')
class TestNotificationPubSub(testcase.GsUtilIntegrationTestCase):
  """Integration tests for notification command (the Cloud Pub/Sub parts)."""

  def setUp(self):
    super(TestNotificationPubSub, self).setUp()
    self.pubsub_api = PubsubApi(logger=logging.getLogger())
    self.created_topic = None

  def tearDown(self):
    super(TestNotificationPubSub, self).tearDown()
    # Cleanup any created topics.
    if self.created_topic:
      self.pubsub_api.DeleteTopic(self.created_topic)
      self.created_topic = None

  def _RegisterDefaultTopicCreation(self, bucket_name):
    """Records the name of a topic we expect to create, for cleanup."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    expected_topic_name = 'projects/%s/topics/%s' % (PopulateProjectId(None),
                                                     bucket_name)
    self.created_topic = expected_topic_name
    return expected_topic_name

  def test_list_new_bucket(self):
    """Tests listing notification configs on a new bucket."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    bucket_uri = self.CreateBucket()
    stdout = self.RunGsUtil(
        ['notification', 'list', suri(bucket_uri)], return_stdout=True)
    self.assertFalse(stdout)

  def test_delete_with_no_notifications(self):
    """Tests deleting all notification configs when there are none."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    bucket_uri = self.CreateBucket()
    stdout = self.RunGsUtil(
        ['notification', 'delete', suri(bucket_uri)], return_stdout=True)
    self.assertFalse(stdout)

  def test_create_basic(self):
    """Tests the create command succeeds in normal circumstances."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    bucket_uri = self.CreateBucket()
    topic_name = self._RegisterDefaultTopicCreation(bucket_uri.bucket_name)

    stdout, stderr = self.RunGsUtil(
        ['notification', 'create', '-f', 'json',
         suri(bucket_uri)],
        return_stdout=True,
        return_stderr=True)

    if self._use_gcloud_storage:
      self.assertIn('kind: storage#notification', stdout)
      self.assertIn(topic_name, stdout)
    else:
      self.assertIn('Created notification', stderr)
      self.assertIn(topic_name, stderr)

  def test_list_one_entry(self):
    """Tests notification config listing with one entry."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    bucket_uri = self.CreateBucket()
    bucket_name = bucket_uri.bucket_name
    topic_name = self._RegisterDefaultTopicCreation(bucket_uri.bucket_name)

    self.RunGsUtil([
        'notification', 'create', '-f', 'json', '-e', 'OBJECT_FINALIZE', '-e',
        'OBJECT_DELETE', '-m', 'someKey:someValue', '-p', 'somePrefix',
        suri(bucket_uri)
    ],
                   return_stderr=True)
    stdout = self.RunGsUtil(
        ['notification', 'list', suri(bucket_uri)], return_stdout=True)

    if self._use_gcloud_storage:
      # gcloud and gsutil look the same in the terminal, but, for some reason,
      # the shim test script detects an extra newline.
      trailing_space = '\n'
    else:
      trailing_space = ''
    self.assertEqual(
        stdout,
        ('projects/_/buckets/{bucket_name}/notificationConfigs/1\n'
         '\tCloud Pub/Sub topic: {topic_name}\n'
         '\tCustom attributes:\n'
         '\t\tsomeKey: someValue\n'
         '\tFilters:\n'
         '\t\tEvent Types: OBJECT_FINALIZE, OBJECT_DELETE\n'
         '\t\tObject name prefix: \'somePrefix\'\n{trailing_space}'.format(
             bucket_name=bucket_name,
             topic_name=topic_name,
             trailing_space=trailing_space)))

  def test_delete(self):
    """Tests the create command succeeds in normal circumstances."""
    if self.test_api == ApiSelector.XML:
      return unittest.skip('Notifications only work with the JSON API.')

    bucket_uri = self.CreateBucket()
    self._RegisterDefaultTopicCreation(bucket_uri.bucket_name)
    self.RunGsUtil(['notification', 'create', '-f', 'json', suri(bucket_uri)])
    self.RunGsUtil(['notification', 'delete', suri(bucket_uri)])