HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/run/jobs/execute.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for running jobs."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.run import connection_context
from googlecloudsdk.command_lib.run import container_parser
from googlecloudsdk.command_lib.run import exceptions
from googlecloudsdk.command_lib.run import flags
from googlecloudsdk.command_lib.run import messages_util
from googlecloudsdk.command_lib.run import pretty_print
from googlecloudsdk.command_lib.run import resource_args
from googlecloudsdk.command_lib.run import serverless_operations
from googlecloudsdk.command_lib.run import stages
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.command_lib.util.concepts import presentation_specs
from googlecloudsdk.core import log
from googlecloudsdk.core.console import progress_tracker


def ContainerOverridesGroup():
  """Returns an argument group with all per-container args for overrides."""

  help_text = """
Container Flags

  If the --container is specified the following arguments may only be specified after a --container flag.
"""
  group = base.ArgumentGroup(help=help_text)
  group.AddArgument(flags.ArgsFlag(for_execution_overrides=True))
  group.AddArgument(flags.OverrideEnvVarsFlag())
  return group


@base.UniverseCompatible
@base.ReleaseTracks(base.ReleaseTrack.GA)
class Execute(base.Command):
  """Execute a job."""

  detailed_help = {
      'DESCRIPTION': """
          {description}
          """,
      'EXAMPLES': """
          To execute a job:

              $ {command} my-job
          """,
  }

  container_flags_text = '`--update-env-vars`, `--args`'

  @classmethod
  def CommonArgs(cls, parser):
    job_presentation = presentation_specs.ResourcePresentationSpec(
        'JOB',
        resource_args.GetJobResourceSpec(prompt=True),
        'Job to execute.',
        required=True,
        prefixes=False,
    )
    concept_parsers.ConceptParser([job_presentation]).AddToParser(parser)
    polling_group = parser.add_mutually_exclusive_group()
    flags.AddAsyncFlag(polling_group)
    flags.AddWaitForCompletionFlag(polling_group)
    # No output by default, can be overridden by --format
    parser.display_info.AddFormat('none')
    flags.AddTaskTimeoutFlags(parser, for_execution_overrides=True)
    flags.AddTasksFlag(parser, for_execution_overrides=True)

  @staticmethod
  def Args(parser):
    Execute.CommonArgs(parser)
    container_args = ContainerOverridesGroup()
    container_parser.AddContainerFlags(parser, container_args)

  def _MakeContainerOverrde(self, operations, args, container_name=None):
    # If args list has been explicitly set as an empty list,
    # this is to clear out the existing args list.
    clear_args = flags.FlagIsExplicitlySet(args, 'args') and not args.args
    return operations.MakeContainerOverride(
        name=container_name,
        update_env_vars=args.update_env_vars,
        args=args.args,
        clear_args=clear_args,
    )

  def _AssertContainerOverrides(self, args):
    if flags.FlagIsExplicitlySet(args, 'containers'):
      for container_name, container_args in args.containers.items():
        if not flags.FlagIsExplicitlySet(
            container_args, 'args'
        ) and not flags.FlagIsExplicitlySet(container_args, 'update_env_vars'):
          raise exceptions.NoConfigurationChangeError(
              'No container overrides requested to container `{}`. '
              'Did you mean to include the flags {} after `--container` flag?'
              .format(container_name, self.container_flags_text)
          )

  def Run(self, args):
    """Execute a Job on Cloud Run."""
    job_ref = args.CONCEPTS.job.Parse()
    flags.ValidateResource(job_ref)
    self._AssertContainerOverrides(args)
    conn_context = connection_context.GetConnectionContext(
        args, flags.Product.RUN, self.ReleaseTrack()
    )
    with serverless_operations.Connect(conn_context) as operations:
      with progress_tracker.StagedProgressTracker(
          'Creating execution...',
          stages.ExecutionStages(include_completion=args.wait),
          failure_message='Executing job failed',
          suppress_output=args.async_,
      ) as tracker:
        overrides = None
        if flags.HasExecutionOverrides(args):
          operations.ValidateConfigOverrides(
              job_ref, flags.GetExecutionOverridesChangesForValidation(args)
          )
          container_overrides = []
          if flags.HasContainerOverrides(args):
            if flags.HasTopLevelContainerOverride(args):
              container_overrides.append(
                  self._MakeContainerOverrde(operations, args)
              )
            if flags.FlagIsExplicitlySet(args, 'containers'):
              for container_name, container_args in args.containers.items():
                container_overrides.append(
                    self._MakeContainerOverrde(
                        operations, container_args, container_name
                    )
                )
          overrides = operations.GetExecutionOverrides(
              args.tasks, args.task_timeout, container_overrides
          )
        e = operations.RunJob(
            job_ref,
            tracker,
            args.wait,
            args.async_,
            self.ReleaseTrack(),
            overrides
        )

      if args.async_:
        pretty_print.Success(
            'Execution [{{bold}}{execution}{{reset}}] is being'
            ' started asynchronously.'.format(execution=e.name)
        )
      else:
        operation = 'completed' if args.wait else 'started running'

        pretty_print.Success(
            'Execution [{{bold}}{execution}{{reset}}] has '
            'successfully {operation}.'.format(
                execution=e.name, operation=operation
            )
        )

      log.status.Print(
          messages_util.GetExecutionCreatedMessage(self.ReleaseTrack(), e)
      )
      return e


@base.ReleaseTracks(base.ReleaseTrack.BETA)
class BetaExecute(Execute):
  """Execute a job."""

  @classmethod
  def Args(cls, parser):
    cls.CommonArgs(parser)
    container_args = ContainerOverridesGroup()
    container_parser.AddContainerFlags(
        parser, container_args, cls.ReleaseTrack()
    )


@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
class AlphaExecute(BetaExecute):
  """Execute a job."""

  @classmethod
  def Args(cls, parser):
    cls.CommonArgs(parser)
    container_args = ContainerOverridesGroup()
    container_parser.AddContainerFlags(
        parser, container_args, cls.ReleaseTrack()
    )