HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/functions/v1/triggers.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library that is used to support Functions commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import itertools

import enum

UNADVERTISED_PROVIDER_LABEL = 'unadvertised'


class Resource(object):
  def __init__(self, name, collection_id):
    self.name = name
    self.collection_id = collection_id


@enum.unique
class Resources(enum.Enum):
  TOPIC = Resource('topic', 'pubsub.projects.topics')
  BUCKET = Resource('bucket', 'cloudfunctions.projects.buckets')
  FIREBASE_DB = Resource('firebase database', 'google.firebase.database.ref')
  FIRESTORE_DOC = Resource('firestore document', 'google.firestore.document')
  FIREBASE_ANALYTICS_EVENT = Resource(
      'firebase analytics', 'google.firebase.analytics.event'
  )
  PROJECT = Resource('project', 'cloudresourcemanager.projects')


class TriggerProvider(object):
  """Represents --trigger-provider flag value options."""

  def __init__(self, label, events):
    self.label = label
    self.events = events
    for event in self.events:
      # Used to access provider properties when listing event types
      event.provider = self

  @property
  def default_event(self):
    return self.events[0]


class TriggerEvent(object):
  """Represents --trigger-event flag value options."""

  # Currently only project resource is optional
  OPTIONAL_RESOURCE_TYPES = [Resources.PROJECT]

  def __init__(self, label, resource_type):
    self.label = label
    self.resource_type = resource_type

  @property
  def event_is_optional(self):
    return self.provider.default_event == self

  # TODO(b/33097692) Let TriggerEvent know how to handle optional resources.
  @property
  def resource_is_optional(self):
    return self.resource_type in TriggerEvent.OPTIONAL_RESOURCE_TYPES


# TODO (b/73062780): Event types should not be hard-coded.
# Don't use those structures directly. Use registry object instead.
# By convention, first event type is default.
_PROVIDERS = [
    TriggerProvider(
        'cloud.pubsub',
        [
            TriggerEvent('google.pubsub.topic.publish', Resources.TOPIC),
            TriggerEvent(
                'providers/cloud.pubsub/eventTypes/topic.publish',
                Resources.TOPIC,
            ),
        ],
    ),
    TriggerProvider(
        'cloud.storage',
        [
            TriggerEvent('google.storage.object.finalize', Resources.BUCKET),
            TriggerEvent(
                'providers/cloud.storage/eventTypes/object.change',
                Resources.BUCKET,
            ),
            TriggerEvent('google.storage.object.archive', Resources.BUCKET),
            TriggerEvent('google.storage.object.delete', Resources.BUCKET),
            TriggerEvent(
                'google.storage.object.metadataUpdate', Resources.BUCKET
            ),
        ],
    ),
    TriggerProvider(
        'google.firebase.database.ref',
        [
            TriggerEvent(
                'providers/google.firebase.database/eventTypes/ref.create',
                Resources.FIREBASE_DB,
            ),
            TriggerEvent(
                'providers/google.firebase.database/eventTypes/ref.update',
                Resources.FIREBASE_DB,
            ),
            TriggerEvent(
                'providers/google.firebase.database/eventTypes/ref.delete',
                Resources.FIREBASE_DB,
            ),
            TriggerEvent(
                'providers/google.firebase.database/eventTypes/ref.write',
                Resources.FIREBASE_DB,
            ),
        ],
    ),
    TriggerProvider(
        'google.firestore.document',
        [
            TriggerEvent(
                'providers/cloud.firestore/eventTypes/document.create',
                Resources.FIRESTORE_DOC,
            ),
            TriggerEvent(
                'providers/cloud.firestore/eventTypes/document.update',
                Resources.FIRESTORE_DOC,
            ),
            TriggerEvent(
                'providers/cloud.firestore/eventTypes/document.delete',
                Resources.FIRESTORE_DOC,
            ),
            TriggerEvent(
                'providers/cloud.firestore/eventTypes/document.write',
                Resources.FIRESTORE_DOC,
            ),
        ],
    ),
    TriggerProvider(
        'google.firebase.analytics.event',
        [
            TriggerEvent(
                'providers/google.firebase.analytics/eventTypes/event.log',
                Resources.FIREBASE_ANALYTICS_EVENT,
            ),
        ],
    ),
    TriggerProvider(
        'google.firebase.remoteConfig',
        [
            TriggerEvent(
                'google.firebase.remoteconfig.update', Resources.PROJECT
            ),
        ],
    ),
    TriggerProvider(
        'firebase.auth',
        [
            TriggerEvent(
                'providers/firebase.auth/eventTypes/user.create',
                Resources.PROJECT,
            ),
            TriggerEvent(
                'providers/firebase.auth/eventTypes/user.delete',
                Resources.PROJECT,
            ),
        ],
    ),
]


class _TriggerProviderRegistry(object):
  """This class encapsulates all Event Trigger related functionality."""

  def __init__(self, all_providers):
    self.providers = all_providers
    self._unadvertised_provider = TriggerProvider(
        UNADVERTISED_PROVIDER_LABEL, []
    )

  def ProvidersLabels(self):
    return (p.label for p in self.providers)

  def Provider(self, provider):
    return next((p for p in self.providers if p.label == provider))

  def EventsLabels(self, provider):
    return (e.label for e in self.Provider(provider).events)

  def AllEventLabels(self):
    all_events = (self.EventsLabels(p.label) for p in self.providers)
    return itertools.chain.from_iterable(all_events)

  def Event(self, provider, event):
    return next((e for e in self.Provider(provider).events if e.label == event))

  def ProviderForEvent(self, event_label):
    for p in self.providers:
      if event_label in self.EventsLabels(p.label):
        return p
    return self._unadvertised_provider


TRIGGER_PROVIDER_REGISTRY = _TriggerProviderRegistry(_PROVIDERS)