# File: //snap/google-cloud-cli/394/lib/surface/dataflow/jobs/update_options.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gcloud dataflow jobs update-options command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.dataflow import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.dataflow import job_utils


@base.ReleaseTracks(base.ReleaseTrack.GA, base.ReleaseTrack.BETA)
class UpdateOptions(base.Command):
  """Update pipeline options on-the-fly for running Dataflow jobs.

  This command can modify properties of running Dataflow jobs. Currently, only
  updating autoscaling settings for Streaming Engine jobs is supported.

  Adjust the autoscaling settings for a Streaming Engine Dataflow job by
  providing at least one of --min-num-workers, --max-num-workers, or
  --worker-utilization-hint (in any combination), or
  --unset-worker-utilization-hint (which cannot be combined with
  --worker-utilization-hint but may be combined with the others).
  Allow a few minutes for the changes to take effect.

  Note that autoscaling settings can only be modified on-the-fly for Streaming
  Engine jobs. Attempts to modify batch jobs or Streaming Appliance jobs will
  fail.

  ## EXAMPLES

  Modify autoscaling settings to scale between 5-10 workers:

    $ {command} --min-num-workers=5 --max-num-workers=10

  Require a job to use at least 2 workers:

    $ {command} --min-num-workers=2

  Require a job to use at most 20 workers:

    $ {command} --max-num-workers=20

  Adjust the target worker utilization hint to 70% for horizontal
  autoscaling:

    $ {command} --worker-utilization-hint=0.7

  "Unset" worker utilization hint so that horizontal scaling will rely on its
  default CPU utilization target:

    $ {command} --unset-worker-utilization-hint
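
  Unset the worker utilization hint while also updating the worker range
  (as noted above, --unset-worker-utilization-hint combines with the other
  flags):

    $ {command} --min-num-workers=5 --max-num-workers=10 \
        --unset-worker-utilization-hint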
  """

  @staticmethod
  def Args(parser):
    """Register flags for this command."""
    job_utils.ArgsForJobRef(parser)
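    # argparse alone cannot express "at least one of" across these four
    # optional flags, so that check (and the hint/unset conflict) is
    # enforced in Run() below.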
    parser.add_argument(
        '--min-num-workers',
        type=int,
        help=(
            'Lower bound for autoscaling, between 1 and 1000. Only supported'
            ' for Streaming Engine jobs.'
        ),
    )
    parser.add_argument(
        '--max-num-workers',
        type=int,
        help=(
            'Upper bound for autoscaling, between 1 and 1000. Only supported'
            ' for Streaming Engine jobs.'
        ),
    )
    parser.add_argument(
        '--worker-utilization-hint',
        type=float,
        help=(
            'Target CPU utilization for autoscaling, ranging from 0.1 to 0.9.'
            ' Only supported for Streaming Engine jobs with autoscaling'
            ' enabled.'
        ),
    )
    parser.add_argument(
        '--unset-worker-utilization-hint',
        action='store_true',
        help=(
            'Unset --worker-utilization-hint. Job autoscaling then falls'
            ' back to internal tunings, if any exist, or otherwise to the'
            ' default hint value.'
        ),
    )

  def Run(self, args):
    """Called when the user runs gcloud dataflow jobs update-options ...

    Args:
      args: all the arguments that were provided to this command invocation.

    Returns:
      The updated Job.
    """
    if (
        args.min_num_workers is None
        and args.max_num_workers is None
        and args.worker_utilization_hint is None
        and not args.unset_worker_utilization_hint
    ):
      raise exceptions.OneOfArgumentsRequiredException(
          [
              '--min-num-workers',
              '--max-num-workers',
              '--worker-utilization-hint',
              '--unset-worker-utilization-hint',
          ],
          'You must provide at least one field to update.',
      )
    elif (
        args.worker_utilization_hint is not None
        and args.unset_worker_utilization_hint
    ):
      raise exceptions.ConflictingArgumentsException(
          'The arguments --worker-utilization-hint and'
          ' --unset-worker-utilization-hint are mutually exclusive, since'
          ' unsetting the hint would discard the value being set. Pass them'
          ' in separate invocations.',
      )

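    # All flag combinations are valid: resolve the job reference from the
    # command-line arguments and delegate to the Dataflow API layer. Flags
    # that were not provided are forwarded as None.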
    job_ref = job_utils.ExtractJobRef(args)
    return apis.Jobs.UpdateOptions(
        job_ref.jobId,
        project_id=job_ref.projectId,
        region_id=job_ref.location,
        min_num_workers=args.min_num_workers,
        max_num_workers=args.max_num_workers,
        worker_utilization_hint=args.worker_utilization_hint,
        unset_worker_utilization_hint=args.unset_worker_utilization_hint,
    )
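

# The sketch below is illustrative only and not part of the gcloud SDK: it
# restates the flag-combination rule enforced in Run() above as a standalone
# function, with spot checks mirroring the docstring examples.
def _valid_flag_combination(
    min_num_workers=None,
    max_num_workers=None,
    worker_utilization_hint=None,
    unset_worker_utilization_hint=False,
):
  """Returns True iff this combination would pass Run()'s validation."""
  any_field = (
      min_num_workers is not None
      or max_num_workers is not None
      or worker_utilization_hint is not None
      or unset_worker_utilization_hint
  )
  conflict = (
      worker_utilization_hint is not None and unset_worker_utilization_hint
  )
  return any_field and not conflict


if __name__ == '__main__':
  assert not _valid_flag_combination()  # nothing to update
  assert _valid_flag_combination(min_num_workers=5, max_num_workers=10)
  assert _valid_flag_combination(
      min_num_workers=2, unset_worker_utilization_hint=True)
  assert not _valid_flag_combination(
      worker_utilization_hint=0.7, unset_worker_utilization_hint=True)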