HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/run/execution.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wraps a Cloud Run Execution message with convenience methods."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
from googlecloudsdk.api_lib.run import container_resource
from googlecloudsdk.api_lib.run import k8s_object

AUTHOR_ANNOTATION = k8s_object.RUN_GROUP + '/creator'

STARTED_CONDITION = 'Started'
COMPLETED_CONDITION = 'Completed'

JOB_LABEL = 'run.googleapis.com/job'


class RestartPolicy(enum.Enum):
  NEVER = 'Never'
  ON_FAILURE = 'OnFailure'


class Execution(k8s_object.KubernetesObject):
  """Wraps a Cloud Run Execution message, making fields more convenient."""

  API_CATEGORY = 'run.googleapis.com'
  KIND = 'Execution'
  READY_CONDITION = COMPLETED_CONDITION
  TERMINAL_CONDITIONS = frozenset({STARTED_CONDITION, READY_CONDITION})

  class TaskTemplateSpec(container_resource.ContainerResource):
    """Wrapper class for Execution subfield TaskTemplateSpec."""

    KIND = 'TaskTemplateSpec'

    @classmethod
    def SpecAndParitialMetadataOnly(cls, execution):
      """Special wrapper for spec only that also covers partial metadata.

      For a message type without its own metadata, like TaskTemplateSpec,
      metadata fields should either raise AttributeErrors or refer to the
      metadata of a different message depending on use case. This method handles
      the annotations and labels of metadata by referencing the parent
      execution's annotations and labels.
      All other metadata fields will fall through to k8s_object which will
      lead to AttributeErrors.

      Args:
        execution: The parent execution for this TaskTemplateSpec

      Returns:
        A new k8s_object to wrap the TaskTemplateSpec with only the spec
        fields and the metadata annotations and labels.
      """
      spec_wrapper = super(Execution.TaskTemplateSpec,
                           cls).SpecOnly(execution.spec.template.spec,
                                         execution.MessagesModule())
      # pylint: disable=protected-access
      spec_wrapper._annotations = execution.annotations
      spec_wrapper._labels = execution.labels
      return spec_wrapper

    @property
    def annotations(self):
      """Override to return the parent execution's annotations."""
      try:
        return self._annotations
      except AttributeError:
        raise ValueError(
            'Execution templates do not have their own annotations. Initialize '
            'the wrapper with SpecAndAnnotationsOnly to be able to use '
            'annotations.')

    @property
    def labels(self):
      """Override to return the parent execution's labels."""
      try:
        return self._labels
      except AttributeError:
        raise ValueError(
            'Execution templates do not have their own labels. Initialize '
            'the wrapper with SpecAndAnnotationsOnly to be able to use '
            'labels.'
        )

    @property
    def service_account(self):
      """The service account to use as the container identity."""
      return self.spec.serviceAccountName

    @service_account.setter
    def service_account(self, value):
      self.spec.serviceAccountName = value

    def _EnsureNodeSelector(self):
      if self.spec.nodeSelector is None:
        self.spec.nodeSelector = k8s_object.InitializedInstance(
            self._messages.TaskSpec.NodeSelectorValue
        )

    @property
    def node_selector(self):
      """The node selector as a dictionary { accelerator_type: value}."""
      self._EnsureNodeSelector()
      return k8s_object.KeyValueListAsDictionaryWrapper(
          self.spec.nodeSelector.additionalProperties,
          self._messages.TaskSpec.NodeSelectorValue.AdditionalProperty,
          key_field='key',
          value_field='value',
      )

  @property
  def template(self):
    return Execution.TaskTemplateSpec.SpecAndParitialMetadataOnly(self)

  @property
  def author(self):
    return self.annotations.get(AUTHOR_ANNOTATION)

  @property
  def image(self):
    return self.template.image

  @image.setter
  def image(self, value):
    self.template.image = value

  @property
  def parallelism(self):
    return self.spec.parallelism

  @parallelism.setter
  def parallelism(self, value):
    self.spec.parallelism = value

  @property
  def task_count(self):
    return self.spec.taskCount

  @task_count.setter
  def task_count(self, value):
    self.spec.taskCount = value

  @property
  def started_condition(self):
    if self.conditions and STARTED_CONDITION in self.conditions:
      return self.conditions[STARTED_CONDITION]

  @property
  def job_name(self):
    return self.labels[JOB_LABEL]