HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/run/op_pollers.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=raise-missing-from
"""Pollers for Serverless operations."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.command_lib.run import exceptions as serverless_exceptions
from googlecloudsdk.core import exceptions


class DomainMappingResourceRecordPoller(waiter.OperationPoller):
  """Poller that waits for a DomainMapping to first report resourceRecords.

  Polling also stops when the mapping's 'Ready' condition status is exactly
  False, as opposed to None.
  """

  def __init__(self, ops):
    """Remember the operations object used to fetch the mapping.

    Args:
      ops: object providing GetDomainMapping(domain_mapping_ref).
    """
    self._ops = ops

  def Poll(self, domain_mapping_ref):
    """Fetch the current DomainMapping for the given reference."""
    return self._ops.GetDomainMapping(domain_mapping_ref)

  def IsDone(self, mapping):
    """Tell whether polling can stop for `mapping`.

    Args:
      mapping: the object returned by Poll.

    Returns:
      True when mapping.status has a truthy `resourceRecords`, or when the
      'Ready' condition status is the literal False; otherwise False.
    """
    records = getattr(mapping.status, 'resourceRecords', None)
    if records:
      return True

    ready_conditions = mapping.conditions
    if not ready_conditions:
      return False
    # Identity comparison on purpose: False signals failure, whereas None
    # means the outcome is not known yet.
    return (
        ready_conditions['Ready']['status'] is False
    )  # pylint: disable=g-bool-id-comparison

  def GetResult(self, mapping):
    """Return the polled mapping unchanged."""
    return mapping


class ConditionPoller(waiter.OperationPoller):
  """A poller for CloudRun resource creation or update.

  Takes in a reference to a StagedProgressTracker, and updates it with progress.

  Internally the poller keeps `_dependencies`, a dict mapping each managed
  condition name to the set of conditions that still have to complete before
  that condition's stage may be started. Entries are removed from those sets
  as stages complete (see `_RecordConditionComplete`).
  """

  def __init__(
      self, resource_getter, tracker, dependencies=None, ready_message='Done.'
  ):
    """Initialize the ConditionPoller.

    Start any unblocked stages in the tracker immediately.

    Args:
      resource_getter: function, returns a resource with conditions.
      tracker: a StagedProgressTracker to keep updated. It must contain a stage
        for each condition in the dependencies map, if the dependencies map is
        provided.  The stage represented by each key can only start when the set
        of conditions in the corresponding value have all completed. If a
        condition should be managed by this ConditionPoller but depends on
        nothing, it should map to an empty set. Conditions in the tracker but
        *not* managed by the ConditionPoller should not appear in the dict.
      dependencies: Dict[str, Set[str]], The dependencies between conditions
        that are managed by this ConditionPoller. The values are the set of
        conditions that must become true before the key begins being worked on
        by the server.  If the entire dependencies dict is None, the poller will
        assume that all keys in the tracker are relevant and none have
        dependencies.
      ready_message: str, message to display in header of tracker when
        conditions are ready.
    """
    # _dependencies is a map of condition -> {preceding conditions}
    # It is meant to be checked off as we finish things.
    # Every key the tracker iterates over starts out with no prerequisites.
    self._dependencies = {k: set() for k in tracker}
    if dependencies is not None:
      for k in dependencies:
        # Add dependencies, only if they're still not complete. If a stage isn't
        # in the tracker, consider it "already complete".
        self._dependencies[k] = {
            c
            for c in dependencies[k]
            if c in tracker and not tracker.IsComplete(c)
        }
    self._resource_getter = resource_getter
    self._tracker = tracker
    # Exception type used when reporting failures. The subclasses in this file
    # replace it after calling this initializer.
    self._resource_fail_type = exceptions.Error
    self._ready_message = ready_message
    self._StartUnblocked()

  def _IsBlocked(self, condition):
    """Report whether `condition` still has unfinished prerequisites.

    Args:
      condition: str, The name of the status condition.

    Returns:
      A truthy value when `condition` is a key of `_dependencies` with a
      non-empty prerequisite set, a falsy value otherwise. Note that the
      result is not a strict bool: it is False, or the prerequisite set itself.
    """
    return condition in self._dependencies and self._dependencies[condition]

  def IsDone(self, conditions):
    """Overrides.

    Args:
      conditions: A condition.Conditions object, or None.

    Returns:
      False when `conditions` is None; otherwise the result of
      `conditions.IsTerminal()`.
    """
    if conditions is None:
      return False
    return conditions.IsTerminal()

  def _PollTerminalSubconditions(self, conditions, conditions_message):
    """Update tracker stages from the terminal subconditions.

    For every name yielded by `conditions.TerminalSubconditions()` that is
    managed by this poller, the stage message is possibly refreshed and the
    stage is possibly completed (truthy status) or failed (falsy, non-None
    status). A None status leaves the stage untouched.

    Args:
      conditions: A condition.Conditions object.
      conditions_message: str, The descriptive message of `conditions`; stage
        messages equal to it are not shown on the stage.
    """
    for condition in conditions.TerminalSubconditions():
      # Conditions that are not managed by this poller are ignored.
      if condition not in self._dependencies:
        continue
      message = conditions[condition]['message']
      status = conditions[condition]['status']
      self._PossiblyUpdateMessage(condition, message, conditions_message)
      if status is None:
        continue
      elif status:
        if self._PossiblyCompleteStage(condition, message):
          # Check all terminal subconditions again to ensure any stages that
          # were unblocked by this stage completing are re-checked before we
          # check the ready condition
          self._PollTerminalSubconditions(conditions, conditions_message)
          # The recursive call has already walked every subcondition, so the
          # rest of this loop is abandoned.
          break
      else:
        self._PossiblyFailStage(condition, message)

  def Poll(self, unused_ref):
    """Overrides.

    Args:
      unused_ref: Not read by this implementation; the resource is obtained
        through GetConditions instead.

    Returns:
      A condition.Conditions object, or None when GetConditions returned None
      or the conditions report that they are not fresh.

    Raises:
      An instance of `self._resource_fail_type`, carrying the conditions'
      descriptive message, when `conditions.IsFailed()` is true.
    """
    conditions = self.GetConditions()

    if conditions is None or not conditions.IsFresh():
      return None

    conditions_message = conditions.DescriptiveMessage()
    self._tracker.UpdateHeaderMessage(conditions_message)

    self._PollTerminalSubconditions(conditions, conditions_message)

    terminal_condition = conditions.TerminalCondition()
    if conditions.IsReady():
      self._tracker.UpdateHeaderMessage(self._ready_message)
      # Only touch the terminal condition's stage if this poller manages it.
      if terminal_condition in self._dependencies:
        self._PossiblyCompleteStage(terminal_condition, None)
      self._tracker.Tick()
    elif conditions.IsFailed():
      if terminal_condition in self._dependencies:
        self._PossiblyFailStage(terminal_condition, None)
      raise self._resource_fail_type(conditions_message)

    return conditions

  def GetResource(self):
    """Return whatever the resource getter passed to the constructor returns."""
    return self._resource_getter()

  def _PossiblyUpdateMessage(self, condition, message, conditions_message):
    """Update the stage message.

    Nothing happens when the stage is absent from the tracker, is already
    complete, is still blocked, or when `message` equals `conditions_message`.

    Args:
      condition: str, The name of the status condition.
      message: str, The new message to display
      conditions_message: str, The message from the conditions object we're
        displaying.
    """
    if condition not in self._tracker or self._tracker.IsComplete(condition):
      return

    if self._IsBlocked(condition):
      return

    if message != conditions_message:
      self._tracker.UpdateStage(condition, message)

  def _RecordConditionComplete(self, condition):
    """Take care of the internal-to-this-class bookkeeping for a completed stage.

    Args:
      condition: str, The name of the condition that completed.
    """
    # Unblock anything that was blocked on this.

    # Strategy: "check off" each dependency as we complete it by removing from
    # the set in the value.
    for requirements in self._dependencies.values():
      requirements.discard(condition)

  def _PossiblyCompleteStage(self, condition, message):
    """Complete the stage if it's not already complete.

    Make sure the necessary internal bookkeeping is done.

    Args:
      condition: str, The name of the condition whose stage should be completed.
      message: str, The detailed message for the condition.

    Returns:
      bool: True if stage was completed, False if no action taken
    """
    if condition not in self._tracker or self._tracker.IsComplete(condition):
      return False
    # A blocked condition is likely to remain True (indicating the previous
    # operation concerning it was successful) until the blocking condition(s)
    # finish and it's time to switch to Unknown (the current operation
    # concerning it is in progress). Don't mark those done before they switch to
    # Unknown.
    if not self._tracker.IsRunning(condition):
      return False
    # Bookkeeping first, then start newly unblocked stages, and only then mark
    # this stage complete in the tracker.
    self._RecordConditionComplete(condition)
    self._StartUnblocked()
    self._tracker.CompleteStage(condition, message)
    return True

  def _StartUnblocked(self):
    """Call StartStage in the tracker for any not-started not-blocked tasks.

    Only conditions that are keys of `_dependencies` and present in the tracker
    are considered. The tracker is ticked once at the end.
    """
    # The set of stages that aren't marked started and don't have unsatisfied
    # dependencies are newly unblocked.
    for c in self._dependencies:
      if c not in self._tracker:
        continue
      if self._tracker.IsWaiting(c) and not self._IsBlocked(c):
        self._tracker.StartStage(c)
    # TODO(b/120679874): Should not have to manually call Tick()
    self._tracker.Tick()

  def _PossiblyFailStage(self, condition, message):
    """Possibly fail the stage.

    The stage is reported as failed through the tracker's FailStage, which is
    handed a new `self._resource_fail_type(message)` instance together with the
    message. This method contains no `raise` statement of its own; whether
    FailStage raises is not visible from this file.

    Args:
      condition: str, The name of the status whose stage failed.
      message: str, The detailed message for the condition.
    """
    # Skip stages that are absent from the tracker or that the tracker already
    # reports as complete.
    if condition not in self._tracker or self._tracker.IsComplete(condition):
      return

    self._tracker.FailStage(
        condition, self._resource_fail_type(message), message
    )

  def GetResult(self, conditions):
    """Overrides.

    Get terminal conditions as the polling result.

    Args:
      conditions: A condition.Conditions object.

    Returns:
      The same `conditions` object, unchanged.
    """
    return conditions

  def GetConditions(self):
    """Returns the resource conditions wrapped in condition.Conditions.

    Returns:
      The `conditions` attribute of the resource produced by the resource
      getter, or None when the getter returns None.
    """
    resource = self._resource_getter()

    if resource is None:
      return None
    return resource.conditions


class ServiceConditionPoller(ConditionPoller):
  """ConditionPoller variant used for services.

  Failures are reported as serverless_exceptions.DeploymentFailedError.
  """

  def __init__(self, getter, tracker, dependencies=None, serv=None):
    """Set up the poller and select the service failure exception type.

    Args:
      getter: function, returns the resource whose conditions are polled.
      tracker: the progress tracker to keep updated.
      dependencies: optional dict of condition name -> set of prerequisite
        condition names, forwarded to ConditionPoller.
      serv: accepted for interface compatibility; not used by this initializer.
    """
    super().__init__(getter, tracker, dependencies=dependencies)
    # Set after the base initializer, which assigns its own default type.
    self._resource_fail_type = serverless_exceptions.DeploymentFailedError


class WorkerPoolConditionPoller(ConditionPoller):
  """ConditionPoller variant used for worker pools.

  Failures are reported as serverless_exceptions.DeploymentFailedError.
  """

  def __init__(self, getter, tracker, dependencies=None, worker_pool=None):
    """Set up the poller and select the worker pool failure exception type.

    Args:
      getter: function, returns the resource whose conditions are polled.
      tracker: the progress tracker to keep updated.
      dependencies: optional dict of condition name -> set of prerequisite
        condition names, forwarded to ConditionPoller.
      worker_pool: accepted for interface compatibility; not used by this
        initializer.
    """
    super().__init__(getter, tracker, dependencies=dependencies)
    # Set after the base initializer, which assigns its own default type.
    self._resource_fail_type = serverless_exceptions.DeploymentFailedError


class RevisionNameBasedPoller(waiter.OperationPoller):
  """Poller that waits until a revision with a given name can be fetched."""

  def __init__(self, operations, revision_ref_getter):
    """Store the collaborators needed to look a revision up by name.

    Args:
      operations: object providing GetRevision(revision_ref).
      revision_ref_getter: function mapping a revision name to the reference
        that is passed to GetRevision.
    """
    self._revision_ref_getter = revision_ref_getter
    self._operations = operations

  def Poll(self, revision_name):
    """Resolve `revision_name` to a reference and fetch that revision."""
    return self._operations.GetRevision(
        self._revision_ref_getter(revision_name)
    )

  def IsDone(self, revision_obj):
    """Return True once the polled revision object is truthy."""
    return True if revision_obj else False

  def GetResult(self, revision_obj):
    """Return the polled revision object unchanged."""
    return revision_obj


class NonceBasedRevisionPoller(waiter.OperationPoller):
  """Poller that waits for exactly one revision with a given nonce."""

  def __init__(self, operations, namespace_ref):
    """Store the operations object and the namespace to search in.

    Args:
      operations: object providing GetRevisionsByNonce(namespace_ref, nonce).
      namespace_ref: the namespace reference passed to GetRevisionsByNonce.
    """
    self._namespace = namespace_ref
    self._operations = operations

  def Poll(self, nonce):
    """Fetch the revisions that carry `nonce` in the stored namespace."""
    return self._operations.GetRevisionsByNonce(self._namespace, nonce)

  def IsDone(self, revisions):
    """Return True once the polled revisions collection is truthy."""
    return True if revisions else False

  def GetResult(self, revisions):
    """Return the single matching revision, or None if there is not exactly one."""
    return revisions[0] if len(revisions) == 1 else None


class ExecutionConditionPoller(ConditionPoller):
  """ConditionPoller variant used for jobs.

  Failures are reported as serverless_exceptions.ExecutionFailedError, and the
  terminal condition's stage message shows a completion count.
  """

  def __init__(self, getter, tracker, terminal_condition, dependencies=None):
    """Set up the poller for a given terminal condition.

    Args:
      getter: function, returns the job object whose conditions are polled.
      tracker: the progress tracker to keep updated.
      terminal_condition: value passed to the job object's GetConditions.
      dependencies: optional dict of condition name -> set of prerequisite
        condition names, forwarded to ConditionPoller.
    """
    super().__init__(getter, tracker, dependencies=dependencies)
    self._terminal_condition = terminal_condition
    self._resource_fail_type = serverless_exceptions.ExecutionFailedError

  def _PotentiallyUpdateInstanceCompletions(self, job_obj, conditions):
    """Show 'succeeded / total' on the terminal condition's stage, if possible.

    Args:
      job_obj: the object returned by the resource getter.
      conditions: the conditions obtained from `job_obj`.
    """
    stage = conditions.TerminalCondition()
    # Only update a stage that the tracker knows about and that is unblocked.
    if stage in self._tracker and not self._IsBlocked(stage):
      succeeded = job_obj.status.succeededCount or 0
      total = job_obj.task_count
      progress = '{} / {} complete'.format(succeeded, total)
      self._tracker.UpdateStage(stage, progress)

  def GetConditions(self):
    """Fetch the job object and return its conditions.

    As a side effect the terminal stage message is refreshed. This hook lives
    here because this is the one place the resource is fetched, so reusing the
    fetched object avoids a second API call and avoids overriding Poll.

    Returns:
      A condition.Conditions object, or None when the getter returns None.
    """
    execution = self._resource_getter()
    if execution is not None:
      conditions = execution.GetConditions(self._terminal_condition)
      self._PotentiallyUpdateInstanceCompletions(execution, conditions)
      return conditions
    return None


class WaitOperationPoller(waiter.CloudOperationPoller):
  """Long running operation poller that calls Wait rather than Get."""

  def __init__(self, messages_module, *args, **kwargs):
    """Initialize the base poller and keep the messages module.

    Args:
      messages_module: module providing GoogleLongrunningWaitOperationRequest.
      *args: positional arguments forwarded to waiter.CloudOperationPoller.
      **kwargs: keyword arguments forwarded to waiter.CloudOperationPoller.
    """
    super().__init__(*args, **kwargs)
    self._messages_module = messages_module

  def Poll(self, operation_ref):
    """Overrides.

    Issues a Wait call with a '10s' timeout for the referenced operation.

    Args:
      operation_ref: googlecloudsdk.core.resources.Resource.

    Returns:
      fetched operation message.
    """
    wait_request_cls = self.operation_service.GetRequestType('Wait')
    wait_body = self._messages_module.GoogleLongrunningWaitOperationRequest(
        timeout='10s'
    )
    wait_request = wait_request_cls(
        name=operation_ref.RelativeName(),
        googleLongrunningWaitOperationRequest=wait_body,
    )
    return self.operation_service.Wait(wait_request)