File: //snap/google-cloud-cli/current/lib/surface/storage/buckets/notifications/create.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of create command for notifications."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import functools
import time

from apitools.base.py import exceptions as apitools_exceptions

from googlecloudsdk.api_lib.storage import api_factory
from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.api_lib.storage import errors as api_errors
from googlecloudsdk.api_lib.storage.gcs_json import error_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.storage import notification_configuration_iterator
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.core import log
from googlecloudsdk.core import properties


@error_util.catch_http_error_raise_gcs_api_error()
def _maybe_create_or_modify_topic(topic_name, service_account_email):
  """Ensures that topic with SA permissions exists, creating it if needed.

  Args:
    topic_name (str): Name of the Cloud Pub/Sub topic to use or create.
    service_account_email (str): The project service account for Google Cloud
      Storage. This SA needs publish permission on the PubSub topic.

  Returns:
    True if topic was created or had its IAM permissions modified.
    Otherwise, False.
  """
  pubsub_client = apis.GetClientInstance('pubsub', 'v1')
  pubsub_messages = apis.GetMessagesModule('pubsub', 'v1')

  try:
    pubsub_client.projects_topics.Get(
        pubsub_messages.PubsubProjectsTopicsGetRequest(topic=topic_name))
    log.warning('Topic already exists: ' + topic_name)
    created_new_topic = False
  except apitools_exceptions.HttpError as e:
    if e.status_code != 404:
      # Expect an Apitools NotFound error. Raise error otherwise.
      raise
    new_topic = pubsub_client.projects_topics.Create(
        pubsub_messages.Topic(name=topic_name))
    log.info('Created topic:\n{}'.format(new_topic))
    created_new_topic = True

  # Verify that the service account is in the IAM policy.
  topic_iam_policy = pubsub_client.projects_topics.GetIamPolicy(
      pubsub_messages.PubsubProjectsTopicsGetIamPolicyRequest(
          resource=topic_name))
  expected_binding = pubsub_messages.Binding(
      role='roles/pubsub.publisher',
      members=['serviceAccount:' + service_account_email])

  # Can be improved by checking for roles stronger than "pubsub.publisher".
  # We could also recurse up the hierarchy, checking project-level permissions.
  # However, the caller may not have permission to perform this recursion.
  # The trade-off of complexity for the benefit of not granting a redundant
  # permission is not worth it, so we grant "publisher" if a simple check fails.
  if expected_binding not in topic_iam_policy.bindings:
    topic_iam_policy.bindings.append(expected_binding)
    updated_topic_iam_policy = pubsub_client.projects_topics.SetIamPolicy(
        pubsub_messages.PubsubProjectsTopicsSetIamPolicyRequest(
            resource=topic_name,
            setIamPolicyRequest=pubsub_messages.SetIamPolicyRequest(
                policy=topic_iam_policy)))
    log.info('Updated topic IAM policy:\n{}'.format(updated_topic_iam_policy))
    return True
  else:
    log.warning(
        'Project service account {} already has publish permission for topic {}'
        .format(service_account_email, topic_name))
  return created_new_topic


@base.UniverseCompatible
class Create(base.Command):
  """Create a notification configuration on a bucket."""

  detailed_help = {
      'DESCRIPTION':
          """
      *{command}* creates a notification configuration on a bucket,
      establishing a flow of event notifications from Cloud Storage to a
      Cloud Pub/Sub topic. As part of creating this flow, it also verifies
      that the destination Cloud Pub/Sub topic exists, creating it if necessary,
      and verifies that the Cloud Storage bucket has permission to publish
      events to that topic, granting the permission if necessary.

      If a destination Cloud Pub/Sub topic is not specified with the `-t` flag,
      Cloud Storage chooses a topic name in the default project whose ID is
      the same as the bucket name. For example, if the default project ID
      specified is `default-project` and the bucket being configured is
      `gs://example-bucket`, the create command uses the Cloud Pub/Sub topic
      `projects/default-project/topics/example-bucket`.

      In order to enable notifications, your project's
      [Cloud Storage service agent](https://cloud.google.com/storage/docs/projects#service-accounts)
      must have the IAM permission "pubsub.topics.publish".
      This command checks to see if the destination Cloud Pub/Sub topic grants
      the service agent this permission. If not, the create command attempts to
      grant it.

      A bucket can have up to 100 total notification configurations and up to
      10 notification configurations set to trigger for a specific event.
      """,
      'EXAMPLES':
          """
      Send notifications of all changes to the bucket
      `example-bucket` to the Cloud Pub/Sub topic
      `projects/default-project/topics/example-bucket`:

        $ {command} gs://example-bucket

      The same as above but sends no notification payload:

        $ {command} --payload-format=none gs://example-bucket

      Include custom metadata in notification payloads:

        $ {command} --custom-attributes=key1:value1,key2:value2 gs://example-bucket

      Create a notification configuration that only sends an event when a new
      object has been created or an object is deleted:

        $ {command} --event-types=OBJECT_FINALIZE,OBJECT_DELETE gs://example-bucket

      Create a topic and notification configuration that sends events only when
      they affect objects with the prefix `photos/`:

        $ {command} --object-prefix=photos/ gs://example-bucket

      Specify the destination topic ID `files-to-process` in the default
      project:

        $ {command} --topic=files-to-process gs://example-bucket

      The same as above but specifies a Cloud Pub/Sub topic belonging
      to the specific cloud project `example-project`:

        $ {command} --topic=projects/example-project/topics/files-to-process gs://example-bucket

      Skip creating a topic when creating the notification configuration:

        $ {command} --skip-topic-setup gs://example-bucket
      """,
  }

  @staticmethod
  def Args(parser):
    parser.add_argument(
        'url',
        help='URL of the bucket to create the notification configuration'
        ' on.')
    parser.add_argument(
        '-m',
        '--custom-attributes',
        metavar='KEY=VALUE',
        type=arg_parsers.ArgDict(),
        help='Specifies key:value attributes that are appended to the set of'
        ' attributes sent to Cloud Pub/Sub for all events associated with'
        ' this notification configuration.')
    parser.add_argument(
        '-e',
        '--event-types',
        metavar='NOTIFICATION_EVENT_TYPE',
        type=arg_parsers.ArgList(
            choices=sorted(
                [status.value for status in cloud_api.NotificationEventType])),
        help=(
            'Specify event type filters for this notification configuration.'
            ' Cloud Storage will send notifications of only these types. By'
            ' default, Cloud Storage sends notifications for all event types.'
            ' * OBJECT_FINALIZE: An object has been created.'
            ' * OBJECT_METADATA_UPDATE: The metadata of an object has changed.'
            ' * OBJECT_DELETE: An object has been permanently deleted.'
            ' * OBJECT_ARCHIVE: A live version of an object has become a'
            ' noncurrent version.'))
    parser.add_argument(
        '-p',
        '--object-prefix',
        help='Specifies a prefix path for this notification configuration.'
        ' Cloud Storage will send notifications for only objects in the'
        ' bucket whose names begin with the prefix.')
    parser.add_argument(
        '-f',
        '--payload-format',
        choices=sorted(
            [status.value for status in cloud_api.NotificationPayloadFormat]),
        default=cloud_api.NotificationPayloadFormat.JSON.value,
        help='Specifies the payload format of notification messages.'
        ' Notification details are available in the message attributes.'
        " 'none' sends no payload.")
    parser.add_argument(
        '-s',
        '--skip-topic-setup',
        action='store_true',
        help='Skips creation and permission assignment of the Cloud Pub/Sub'
        ' topic. This is useful if the caller does not have permission to'
        ' access the topic in question, or if the topic already exists and has'
        ' the appropriate publish permission assigned.')
    parser.add_argument(
        '-t',
        '--topic',
        help='Specifies the Cloud Pub/Sub topic to send notifications to.'
        ' If not specified, this command chooses a topic whose project is'
        ' your default project and whose ID is the same as the'
        ' Cloud Storage bucket name.')

  def Run(self, args):
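    """Creates the notification configuration described by args."""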
    project_id = properties.VALUES.core.project.GetOrFail()
    url = storage_url.storage_url_from_string(args.url)
    notification_configuration_iterator.raise_error_if_not_gcs_bucket_matching_url(
        url)
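    # Resolve the destination topic: default to a topic named after the bucket
    # in the active project, expand a bare topic ID against the active project,
    # or use a fully qualified 'projects/.../topics/...' path as-is.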
    if not args.topic:
      topic_name = 'projects/{}/topics/{}'.format(project_id, url.bucket_name)
    elif not args.topic.startswith('projects/'):
      # A topic ID may be present but not a whole path. Use the default project.
      topic_name = 'projects/{}/topics/{}'.format(
          project_id,
          args.topic.rpartition('/')[-1])
    else:
      topic_name = args.topic

    # Notifications are supported only for GCS.
    gcs_client = api_factory.get_api(storage_url.ProviderPrefix.GCS)
    if not args.skip_topic_setup:
      # The bucket's project number (not the same as the project ID) is needed
      # for the service agent lookup below.
      bucket_project_number = gcs_client.get_bucket(
          url.bucket_name).metadata.projectNumber
      # Fetch the email of the service account that will need access to
      # the new pubsub topic.
      service_account_email = gcs_client.get_service_agent(
          project_number=bucket_project_number)
      log.info(
          'Checking for topic {} with access for project {} service account {}.'
          .format(topic_name, project_id, service_account_email))
      created_new_topic_or_set_new_permissions = _maybe_create_or_modify_topic(
          topic_name, service_account_email)
    else:
      created_new_topic_or_set_new_permissions = False

    if args.event_types:
      event_types = [
          cloud_api.NotificationEventType(event_type)
          for event_type in args.event_types
      ]
    else:
      event_types = None
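    # Bind all request arguments now so the identical call can be reissued
    # below if the first attempt races topic or IAM permission propagation.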
    create_notification_configuration = functools.partial(
        gcs_client.create_notification_configuration,
        url,
        topic_name,
        custom_attributes=args.custom_attributes,
        event_types=event_types,
        object_name_prefix=args.object_prefix,
        payload_format=cloud_api.NotificationPayloadFormat(args.payload_format))
    try:
      return create_notification_configuration()
    except api_errors.CloudApiError:
      if not created_new_topic_or_set_new_permissions:
        raise
      log.warning(
          'Retrying create notification request because topic changes may'
          ' take up to 10 seconds to process.')
      time.sleep(10)
      return create_notification_configuration()