HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/eventarc/triggers.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for Eventarc Triggers API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.eventarc import common
from googlecloudsdk.api_lib.eventarc.base import EventarcClientBase
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.eventarc import types
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import resources
from googlecloudsdk.core.util import iso_duration
from googlecloudsdk.core.util import times

# Number of minutes added to a trigger's last update time by TriggerActiveTime
# to compute the time by which an Audit Log trigger becomes active.
MAX_ACTIVE_DELAY_MINUTES = 2


class NoFieldsSpecifiedError(exceptions.Error):
  """Raised when a Patch operation is requested without any field to update."""


def GetTriggerURI(resource):
  """Returns the self link of the trigger named by resource.name."""
  collection = 'eventarc.projects.locations.triggers'
  return resources.REGISTRY.ParseRelativeName(
      resource.name, collection=collection).SelfLink()


def TriggerActiveTime(event_type, update_time):
  """Returns when a recently modified trigger is expected to become active.

  Args:
    event_type: str, the trigger's event type.
    update_time: str, the time of the trigger's most recent modification.

  Returns:
    The active time formatted as HH:MM:SS in the local timezone, or None when
    the event type is not an Audit Log type or the active time has already
    been reached.
  """
  # Only Audit Log triggers are subject to the activation delay.
  if types.IsAuditLogType(event_type):
    last_modified = times.ParseDateTime(update_time)
    activation_delay = iso_duration.Duration(minutes=MAX_ACTIVE_DELAY_MINUTES)
    active_at = times.GetDateTimePlusDuration(last_modified, activation_delay)
    if times.Now() < active_at:
      return times.FormatDateTime(
          active_at, fmt='%H:%M:%S', tzinfo=times.LOCAL)
  return None


class _BaseTriggersClient(EventarcClientBase):
  """Common request plumbing for the Eventarc v1 Triggers service."""

  def __init__(self):
    super(_BaseTriggersClient, self).__init__(common.API_NAME, 'v1', 'trigger')
    api_client = apis.GetClientInstance(common.API_NAME, 'v1')
    self._messages = api_client.MESSAGES_MODULE
    self._service = api_client.projects_locations_triggers
    self._operation_service = api_client.projects_locations_operations

  def Create(self, trigger_ref, trigger_message):
    """Sends a create request for a Trigger.

    Args:
      trigger_ref: Resource, the Trigger to create; supplies the parent
        location and the trigger ID.
      trigger_message: Trigger, the trigger message that holds trigger's
        event_filters, service account, destination, transport, etc.

    Returns:
      A long-running operation for create.
    """
    request_cls = self._messages.EventarcProjectsLocationsTriggersCreateRequest
    request = request_cls(
        parent=trigger_ref.Parent().RelativeName(),
        trigger=trigger_message,
        triggerId=trigger_ref.Name())
    return self._service.Create(request)

  def Delete(self, trigger_ref):
    """Sends a delete request for a Trigger.

    Args:
      trigger_ref: Resource, the Trigger to delete.

    Returns:
      A long-running operation for delete.
    """
    request_cls = self._messages.EventarcProjectsLocationsTriggersDeleteRequest
    request = request_cls(name=trigger_ref.RelativeName())
    return self._service.Delete(request)

  def Get(self, trigger_ref):
    """Fetches a Trigger.

    Args:
      trigger_ref: Resource, the Trigger to get.

    Returns:
      The Trigger message.
    """
    request_cls = self._messages.EventarcProjectsLocationsTriggersGetRequest
    request = request_cls(name=trigger_ref.RelativeName())
    return self._service.Get(request)

  def List(self, location_ref, limit, page_size):
    """Lists the Triggers of a location.

    Args:
      location_ref: Resource, the location to list Triggers in.
      limit: int or None, the total number of results to return.
      page_size: int, the number of entries requested per batch (affects the
        requests made, but not the yielded results).

    Returns:
      A generator of Triggers in the location.
    """
    request_cls = self._messages.EventarcProjectsLocationsTriggersListRequest
    request = request_cls(
        parent=location_ref.RelativeName(), pageSize=page_size)
    return list_pager.YieldFromList(
        self._service, request,
        field='triggers', batch_size=page_size,
        limit=limit, batch_size_attribute='pageSize')

  def Patch(self, trigger_ref, trigger_message, update_mask):
    """Sends a patch request for a Trigger.

    Args:
      trigger_ref: Resource, the Trigger to update.
      trigger_message: Trigger, the trigger message that holds trigger's
        event_filters, service account, destination, transport, etc.
      update_mask: str, a comma-separated list of Trigger fields to update.

    Returns:
      A long-running operation for update.
    """
    request_cls = self._messages.EventarcProjectsLocationsTriggersPatchRequest
    request = request_cls(
        name=trigger_ref.RelativeName(),
        trigger=trigger_message,
        updateMask=update_mask)
    return self._service.Patch(request)


class TriggersClientV1(_BaseTriggersClient):
  """Triggers client for the Eventarc GA (v1) API."""

  def BuildTriggerMessage(
      self,
      trigger_ref,
      event_filters,
      event_filters_path_pattern,
      event_data_content_type,
      service_account,
      destination_message,
      transport_topic_ref,
      channel_ref,
      labels,
  ):
    """Assembles a Trigger message from its individual parts.

    Args:
      trigger_ref: Resource, the Trigger to create.
      event_filters: dict or None, the Trigger's event filters.
      event_filters_path_pattern: dict or None, the Trigger's event filters in
        path pattern format.
      event_data_content_type: str or None, the data content type of the
        Trigger's event.
      service_account: str or None, the Trigger's service account.
      destination_message: Destination message or None, the Trigger's
        destination.
      transport_topic_ref: Resource or None, the user-provided transport topic.
      channel_ref: Resource or None, the channel for 3p events.
      labels: dict or None, the Trigger's labels.

    Returns:
      A Trigger message populated with the given data.
    """
    # Exact-match filters come first, followed by path-pattern filters.
    filters = []
    if event_filters is not None:
      for attribute, value in event_filters.items():
        filters.append(
            self._messages.EventFilter(attribute=attribute, value=value))
    if event_filters_path_pattern is not None:
      filters.extend(
          self._messages.EventFilter(
              attribute=attribute, value=pattern, operator='match-path-pattern')
          for attribute, pattern in event_filters_path_pattern.items())

    if transport_topic_ref:
      topic_name = transport_topic_ref.RelativeName()
    else:
      topic_name = None
    # The transport is always attached; its topic stays None when the caller
    # did not provide one.
    pubsub_message = self._messages.Pubsub(topic=topic_name)
    transport_message = self._messages.Transport(pubsub=pubsub_message)

    if channel_ref:
      channel_name = channel_ref.RelativeName()
    else:
      channel_name = None

    return self._messages.Trigger(
        name=trigger_ref.RelativeName(),
        eventFilters=filters,
        eventDataContentType=event_data_content_type,
        serviceAccount=service_account,
        destination=destination_message,
        transport=transport_message,
        channel=channel_name,
        labels=labels,
    )

  def BuildCloudRunDestinationMessage(
      self,
      destination_run_service,
      destination_run_job,
      destination_run_path,
      destination_run_region,
  ):
    """Creates a Destination message that targets Cloud Run.

    Args:
      destination_run_service: str or None, the destination Cloud Run service.
      destination_run_job: str or None, the destination Cloud Run job.
      destination_run_path: str or None, the path on the destination Cloud Run
        service.
      destination_run_region: str or None, the destination Cloud Run service's
        region.

    Returns:
      A Destination message whose cloudRun field is set.
    """
    # The service and job flags belong to a mutually exclusive group, so both
    # are passed through on the assumption that one of them is None.
    cloud_run = self._messages.CloudRun(
        service=destination_run_service,
        job=destination_run_job,
        path=destination_run_path,
        region=destination_run_region,
    )
    return self._messages.Destination(cloudRun=cloud_run)

  def BuildGKEDestinationMessage(
      self,
      destination_gke_cluster,
      destination_gke_location,
      destination_gke_namespace,
      destination_gke_service,
      destination_gke_path,
  ):
    """Creates a Destination message that targets a GKE service.

    Args:
      destination_gke_cluster: str or None, the destination GKE service's
        cluster.
      destination_gke_location: str or None, the location of the destination GKE
        service's cluster.
      destination_gke_namespace: str or None, the destination GKE service's
        namespace.
      destination_gke_service: str or None, the destination GKE service.
      destination_gke_path: str or None, the path on the destination GKE
        service.

    Returns:
      A Destination message whose gke field is set.
    """
    gke = self._messages.GKE(
        cluster=destination_gke_cluster,
        location=destination_gke_location,
        namespace=destination_gke_namespace,
        service=destination_gke_service,
        path=destination_gke_path,
    )
    return self._messages.Destination(gke=gke)

  def BuildWorkflowDestinationMessage(
      self, project_id, destination_workflow, destination_workflow_location
  ):
    """Creates a Destination message that targets a Workflow.

    Args:
      project_id: the ID of the project.
      destination_workflow: str or None, the Trigger's destination Workflow ID.
      destination_workflow_location: str or None, the location of the Trigger's
        destination Workflow. It defaults to the Trigger's location.

    Returns:
      A Destination message whose workflow field holds the Workflow's
      relative resource name.
    """
    name_template = 'projects/{}/locations/{}/workflows/{}'
    workflow_name = name_template.format(
        project_id, destination_workflow_location, destination_workflow)
    return self._messages.Destination(workflow=workflow_name)

  def BuildFunctionDestinationMessage(
      self, project_id, destination_function, destination_function_location
  ):
    """Creates a Destination message that targets a Function.

    Args:
      project_id: the ID of the project.
      destination_function: str or None, the Trigger's destination Function ID.
      destination_function_location: str or None, the location of the Trigger's
        destination Function. It defaults to the Trigger's location.

    Returns:
      A Destination message whose cloudFunction field holds the Function's
      relative resource name.
    """
    name_template = 'projects/{}/locations/{}/functions/{}'
    function_name = name_template.format(
        project_id, destination_function_location, destination_function)
    return self._messages.Destination(cloudFunction=function_name)

  def BuildHTTPEndpointDestinationMessage(
      self, destination_http_endpoint_uri, network_attachment
  ):
    """Creates a Destination message that targets an HTTP endpoint.

    Args:
      destination_http_endpoint_uri: str or None, the Trigger's destination uri.
      network_attachment: str or None, the Trigger's destination
        network attachment.

    Returns:
      A Destination message with both its httpEndpoint and networkConfig
      fields set.
    """
    endpoint = self._messages.HttpEndpoint(uri=destination_http_endpoint_uri)
    network_config = self._messages.NetworkConfig(
        networkAttachment=network_attachment)
    return self._messages.Destination(
        httpEndpoint=endpoint, networkConfig=network_config)

  def BuildUpdateMask(
      self,
      event_filters,
      event_filters_path_pattern,
      event_data_content_type,
      service_account,
      destination_run_service,
      destination_run_job,
      destination_run_path,
      destination_run_region,
      destination_gke_namespace,
      destination_gke_service,
      destination_gke_path,
      destination_workflow,
      destination_workflow_location,
      destination_function,
      destination_function_location,
      labels,
  ):
    """Builds the update mask listing the Trigger fields to patch.

    Args:
      event_filters: bool, whether to update the event filters.
      event_filters_path_pattern: bool, whether to update the event filters with
        path pattern syntax.
      event_data_content_type: bool, whether to update the event data content
        type.
      service_account: bool, whether to update the service account.
      destination_run_service: bool, whether to update the destination Cloud Run
        service.
      destination_run_job: bool, whether to update the destination Cloud Run
        job.
      destination_run_path: bool, whether to update the destination Cloud Run
        path.
      destination_run_region: bool, whether to update the destination Cloud Run
        region.
      destination_gke_namespace: bool, whether to update the destination GKE
        service namespace.
      destination_gke_service: bool, whether to update the destination GKE
        service name.
      destination_gke_path: bool, whether to update the destination GKE path.
      destination_workflow: bool, whether to update the destination workflow.
      destination_workflow_location: bool, whether to update the destination
        workflow location.
      destination_function: bool, whether to update the destination function.
      destination_function_location: bool, whether to update the destination
        function location.
      labels: bool, whether to update the labels.

    Returns:
      The update mask as a comma-separated string of field paths.

    Raises:
      NoFieldsSpecifiedError: No fields are being updated.
    """
    # (field path, whether it was requested), in the order the paths appear in
    # the resulting mask.
    field_flags = (
        ('destination.cloudRun.path', destination_run_path),
        ('destination.cloudRun.region', destination_run_region),
        ('destination.cloudRun.service', destination_run_service),
        ('destination.cloudRun.job', destination_run_job),
        ('destination.gke.namespace', destination_gke_namespace),
        ('destination.gke.service', destination_gke_service),
        ('destination.gke.path', destination_gke_path),
        ('destination.workflow',
         destination_workflow or destination_workflow_location),
        ('destination.cloudFunction',
         destination_function or destination_function_location),
        ('eventFilters', event_filters or event_filters_path_pattern),
        ('serviceAccount', service_account),
        ('eventDataContentType', event_data_content_type),
        ('labels', labels),
    )
    selected_paths = [path for path, is_set in field_flags if is_set]
    if selected_paths:
      return ','.join(selected_paths)
    raise NoFieldsSpecifiedError('Must specify at least one field to update.')

  def GetEventType(self, trigger_message):
    """Returns the event type derived from the Trigger's event filters."""
    event_filters = trigger_message.eventFilters
    return types.EventFiltersMessageToType(event_filters)

  def LabelsValueClass(self):
    """Returns the Trigger message's LabelsValue class."""
    trigger_cls = self._messages.Trigger
    return trigger_cls.LabelsValue