HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/surface/dataproc/clusters/update.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Update cluster command."""

from googlecloudsdk.api_lib.dataproc import constants as dataproc_constants
from googlecloudsdk.api_lib.dataproc import dataproc as dp
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import util
from googlecloudsdk.calliope import actions
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.dataproc import clusters
from googlecloudsdk.command_lib.dataproc import flags
from googlecloudsdk.command_lib.dataproc.utils import user_sa_mapping_util
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log
from googlecloudsdk.core.util import times
import six


@base.UniverseCompatible
class Update(base.UpdateCommand):
  """Update labels and/or the number of worker nodes in a cluster.

  Update the number of worker nodes and/or the labels in a cluster.

  ## EXAMPLES

  To resize a cluster, run:

    $ {command} my-cluster --region=us-central1 --num-workers=5

  To change the number of preemptible workers in a cluster, run:

    $ {command} my-cluster --region=us-central1 --num-preemptible-workers=5

  To add the label 'customer=acme' to a cluster, run:

    $ {command} my-cluster --region=us-central1 --update-labels=customer=acme

  To update the label 'customer=ackme' to 'customer=acme', run:

    $ {command} my-cluster --region=us-central1 --update-labels=customer=acme

  To remove the label whose key is 'customer', run:

    $ {command} my-cluster --region=us-central1 --remove-labels=customer

  """

  @classmethod
  def Args(cls, parser):
    """Registers command-line flags for the cluster update command.

    Args:
      parser: The argparse parser to register flags on.
    """
    dataproc = dp.Dataproc(cls.ReleaseTrack())
    base.ASYNC_FLAG.AddToParser(parser)
    # Allow the user to specify new labels as well as update/remove existing
    labels_util.AddUpdateLabelsFlags(parser)
    # Graceful decomissioning timeouts can be up to 24 hours + add 1 hour for
    # deleting VMs, etc.
    flags.AddTimeoutFlag(parser, default='25h')
    flags.AddClusterResourceArg(parser, 'update', dataproc.api_version)
    parser.add_argument(
        '--num-workers',
        type=int,
        help='The new number of worker nodes in the cluster.')
    # --num-preemptible-workers is a deprecated, hidden alias of
    # --num-secondary-workers; only one of the two may be given.
    num_secondary_workers = parser.add_argument_group(mutex=True)
    num_secondary_workers.add_argument(
        '--num-preemptible-workers',
        action=actions.DeprecationAction(
            '--num-preemptible-workers',
            warn=('The `--num-preemptible-workers` flag is deprecated. '
                  'Use the `--num-secondary-workers` flag instead.')),
        type=int,
        hidden=True,
        help='The new number of preemptible worker nodes in the cluster.')
    num_secondary_workers.add_argument(
        '--num-secondary-workers',
        type=int,
        help='The new number of secondary worker nodes in the cluster.')

    parser.add_argument(
        '--graceful-decommission-timeout',
        type=arg_parsers.Duration(lower_bound='0s', upper_bound='1d'),
        help="""
              The graceful decommission timeout for decommissioning Node Managers
              in the cluster, used when removing nodes. Graceful decommissioning
              allows removing nodes from the cluster without interrupting jobs in
              progress. Timeout specifies how long to wait for jobs in progress to
              finish before forcefully removing nodes (and potentially
              interrupting jobs). Timeout defaults to 0 if not set (for forceful
              decommission), and the maximum allowed timeout is 1 day.
              See $ gcloud topic datetimes for information on duration formats.
              """,
    )

    parser.add_argument(
        '--min-secondary-worker-fraction',
        help=(
            'Minimum fraction of new secondary worker nodes added in a scale up'
            ' update operation, required to update the cluster. If it is not'
            ' met, cluster updation will rollback the addition of secondary'
            ' workers. Must be a decimal value between 0 and 1. Defaults to'
            ' 0.0001.'
        ),
        type=float,
    )

    # Registers extra flags only when running on the ALPHA release track.
    _AddAlphaArguments(parser, cls.ReleaseTrack())

    # Idle-delete flags: --max-idle/--no-max-idle are the hidden legacy
    # spellings of --delete-max-idle/--no-delete-max-idle (see Run below).
    idle_delete_group = parser.add_mutually_exclusive_group()
    idle_delete_group.add_argument(
        '--max-idle',
        type=arg_parsers.Duration(),
        hidden=True,
        help="""\
        The duration after the last job completes to auto-delete the cluster,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    idle_delete_group.add_argument(
        '--no-max-idle',
        action='store_true',
        hidden=True,
        help="""\
        Cancels the cluster auto-deletion by cluster idle duration (configured
         by --max-idle flag)
        """,
    )
    idle_delete_group.add_argument(
        '--delete-max-idle',
        type=arg_parsers.Duration(),
        help="""\
        The duration after the last job completes to auto-delete the cluster,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    idle_delete_group.add_argument(
        '--no-delete-max-idle',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by cluster idle duration (configured
        by --delete-max-idle flag)
        """)

    # Auto-delete flags: --max-age/--expiration-time/--no-max-age are the
    # hidden legacy spellings of the delete-* equivalents below.
    auto_delete_group = parser.add_mutually_exclusive_group()
    auto_delete_group.add_argument(
        '--max-age',
        type=arg_parsers.Duration(),
        hidden=True,
        help="""\
        The lifespan of the cluster, with auto-deletion upon completion,
        "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    auto_delete_group.add_argument(
        '--expiration-time',
        type=arg_parsers.Datetime.Parse,
        hidden=True,
        help="""\
        The time when the cluster will be auto-deleted, such as
        "2017-08-29T18:52:51.142Z". See $ gcloud topic datetimes for
        information on time formats.
        """)
    auto_delete_group.add_argument(
        '--no-max-age',
        action='store_true',
        hidden=True,
        help="""\
        Cancels the cluster auto-deletion by maximum cluster age (configured by
         --max-age or --expiration-time flags)
        """)

    auto_delete_group.add_argument(
        '--delete-max-age',
        type=arg_parsers.Duration(),
        help="""\
        The lifespan of the cluster with auto-deletion upon completion,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    auto_delete_group.add_argument(
        '--delete-expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
        The time when the cluster will be auto-deleted, such as
        "2017-08-29T18:52:51.142Z". See $ gcloud topic datetimes for
        information on time formats.
        """)
    auto_delete_group.add_argument(
        '--no-delete-max-age',
        action='store_true',
        help="""\
        Cancels the cluster auto-deletion by maximum cluster age (configured
        by --delete-max-age or --delete-expiration-time flags)
        """)

    # Auto-stop flags (stop the cluster instead of deleting it).
    idle_stop_group = parser.add_mutually_exclusive_group()
    idle_stop_group.add_argument(
        '--stop-max-idle',
        type=arg_parsers.Duration(),
        help="""\
        The duration after the last job completes to auto-stop the cluster,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    idle_stop_group.add_argument(
        '--no-stop-max-idle',
        action='store_true',
        help="""\
        Cancels the cluster auto-stop by cluster idle duration (configured
        by --stop-max-idle flag)
        """)

    auto_stop_group = parser.add_mutually_exclusive_group()
    auto_stop_group.add_argument(
        '--stop-max-age',
        type=arg_parsers.Duration(),
        help="""\
        The lifespan of the cluster, with auto-stop upon completion,
        such as "2h" or "1d".
        See $ gcloud topic datetimes for information on duration formats.
        """)
    auto_stop_group.add_argument(
        '--stop-expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
        The time when the cluster will be auto-stopped, such as
        "2017-08-29T18:52:51.142Z". See $ gcloud topic datetimes for
        information on time formats.
        """)
    auto_stop_group.add_argument(
        '--no-stop-max-age',
        action='store_true',
        help="""\
        Cancels the cluster auto-stop by maximum cluster age (configured by
        --stop-max-age or --stop-expiration-time flags)
        """)

    # Can only specify one of --autoscaling-policy or --disable-autoscaling
    autoscaling_group = parser.add_mutually_exclusive_group()
    flags.AddAutoscalingPolicyResourceArgForCluster(
        autoscaling_group, api_version='v1')
    autoscaling_group.add_argument(
        '--disable-autoscaling',
        action='store_true',
        help="""\
        Disable autoscaling, if it is enabled. This is an alias for passing the
        empty string to --autoscaling-policy'.
        """)
    user_sa_mapping_util.AddUpdateUserSaMappingFlags(parser)

  def Run(self, args):
    """Executes the cluster update.

    Builds a Cluster message containing only the changed fields, issues a
    Patch request with a matching update mask, and (unless --async) waits
    for the resulting operation to complete.

    Args:
      args: The parsed command-line arguments.

    Returns:
      The updated cluster resource, or None when --async is specified.

    Raises:
      exceptions.ArgumentError: If no updatable parameter was specified, or
        if user service account mapping flags are used on a cluster that is
        not multi-tenant.
    """
    dataproc = dp.Dataproc(self.ReleaseTrack())
    cluster_ref = args.CONCEPTS.cluster.Parse()

    cluster_config = dataproc.messages.ClusterConfig()
    # Field paths appended here are joined into the patch request's
    # update mask, so only explicitly-listed fields are modified.
    changed_fields = []

    has_changes = False
    no_update_error_msg = (
        'Must specify at least one cluster parameter to update.'
    )

    if args.num_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=args.num_workers)
      cluster_config.workerConfig = worker_config
      changed_fields.append('config.worker_config.num_instances')
      has_changes = True

    # The deprecated --num-preemptible-workers takes precedence if set;
    # the two flags are mutually exclusive at the parser level.
    num_secondary_workers = _FirstNonNone(args.num_preemptible_workers,
                                          args.num_secondary_workers)
    if num_secondary_workers is not None:
      worker_config = dataproc.messages.InstanceGroupConfig(
          numInstances=num_secondary_workers)
      cluster_config.secondaryWorkerConfig = worker_config
      changed_fields.append(
          'config.secondary_worker_config.num_instances')
      has_changes = True

    if args.min_secondary_worker_fraction is not None:
      # secondaryWorkerConfig is only non-None here when a secondary worker
      # count flag was handled above; preserve its numInstances if so.
      if cluster_config.secondaryWorkerConfig is None:
        worker_config = dataproc.messages.InstanceGroupConfig(
            startupConfig=dataproc.messages.StartupConfig(
                requiredRegistrationFraction=(
                    args.min_secondary_worker_fraction
                )
            )
        )
      else:
        worker_config = dataproc.messages.InstanceGroupConfig(
            numInstances=num_secondary_workers,
            startupConfig=dataproc.messages.StartupConfig(
                requiredRegistrationFraction=(
                    args.min_secondary_worker_fraction
                )
            ),
        )
      cluster_config.secondaryWorkerConfig = worker_config
      changed_fields.append(
          'config.secondary_worker_config.startup_config.required_registration_fraction'
      )
      has_changes = True

    # --secondary-worker-standard-capacity-base only exists on ALPHA
    # (see _AddAlphaArguments).
    if self.ReleaseTrack() == base.ReleaseTrack.ALPHA:
      if args.secondary_worker_standard_capacity_base is not None:
        if cluster_config.secondaryWorkerConfig is None:
          worker_config = dataproc.messages.InstanceGroupConfig(
              instanceFlexibilityPolicy=dataproc.messages.InstanceFlexibilityPolicy(
                  provisioningModelMix=dataproc.messages.ProvisioningModelMix(
                      standardCapacityBase=args.secondary_worker_standard_capacity_base
                  )))
        else:
          # Rebuild the config, carrying over any numInstances and
          # startupConfig set by the earlier branches.
          worker_config = dataproc.messages.InstanceGroupConfig(
              numInstances=num_secondary_workers,
              startupConfig=cluster_config.secondaryWorkerConfig.startupConfig,
              instanceFlexibilityPolicy=dataproc.messages.InstanceFlexibilityPolicy(
                  provisioningModelMix=dataproc.messages.ProvisioningModelMix(
                      standardCapacityBase=args.secondary_worker_standard_capacity_base
                  )
              )
          )
        cluster_config.secondaryWorkerConfig = worker_config
        changed_fields.append(
            'config.secondary_worker_config.instance_flexibility_policy.provisioning_model_mix.standard_capacity_base'
        )
        has_changes = True

    if args.autoscaling_policy:
      cluster_config.autoscalingConfig = dataproc.messages.AutoscalingConfig(
          policyUri=args.CONCEPTS.autoscaling_policy.Parse().RelativeName())
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True
    elif args.autoscaling_policy == '' or args.disable_autoscaling:  # pylint: disable=g-explicit-bool-comparison
      # Disabling autoscaling. Don't need to explicitly set
      # cluster_config.autoscaling_config to None.
      changed_fields.append('config.autoscaling_config.policy_uri')
      has_changes = True

    lifecycle_config = dataproc.messages.LifecycleConfig()
    changed_config = False
    # Flags max_age, expiration_time, max_idle, no_max_age, no_max_idle are
    # hidden, but still supported. They are replaced with new flags
    # delete_max_age, delete_expiration_time, delete_max_idle,
    # no_delete_max_age and no_delete_max_idle.
    if args.max_age is not None:
      # Durations are serialized as seconds with an 's' suffix.
      lifecycle_config.autoDeleteTtl = six.text_type(args.max_age) + 's'
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.expiration_time)
      changed_fields.append('config.lifecycle_config.auto_delete_time')
      changed_config = True
    if args.max_idle is not None:
      lifecycle_config.idleDeleteTtl = six.text_type(args.max_idle) + 's'
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if args.no_max_age:
      # Clearing happens via the update mask entry; the value stays None.
      lifecycle_config.autoDeleteTtl = None
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.no_max_idle:
      lifecycle_config.idleDeleteTtl = None
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True

    if args.delete_max_age is not None:
      lifecycle_config.autoDeleteTtl = (
          six.text_type(args.delete_max_age) + 's'
      )
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.delete_expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.delete_expiration_time
      )
      changed_fields.append('config.lifecycle_config.auto_delete_time')
      changed_config = True
    if args.delete_max_idle is not None:
      lifecycle_config.idleDeleteTtl = (
          six.text_type(args.delete_max_idle) + 's'
      )
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True
    if args.no_delete_max_age:
      lifecycle_config.autoDeleteTtl = None
      changed_fields.append('config.lifecycle_config.auto_delete_ttl')
      changed_config = True
    if args.no_delete_max_idle:
      lifecycle_config.idleDeleteTtl = None
      changed_fields.append('config.lifecycle_config.idle_delete_ttl')
      changed_config = True

    if args.stop_max_age is not None:
      lifecycle_config.autoStopTtl = six.text_type(args.stop_max_age) + 's'
      changed_fields.append('config.lifecycle_config.auto_stop_ttl')
      changed_config = True
    if args.stop_expiration_time is not None:
      lifecycle_config.autoStopTime = times.FormatDateTime(
          args.stop_expiration_time)
      changed_fields.append('config.lifecycle_config.auto_stop_time')
      changed_config = True
    if args.stop_max_idle is not None:
      lifecycle_config.idleStopTtl = six.text_type(args.stop_max_idle) + 's'
      changed_fields.append('config.lifecycle_config.idle_stop_ttl')
      changed_config = True
    if args.no_stop_max_age:
      lifecycle_config.autoStopTtl = None
      changed_fields.append('config.lifecycle_config.auto_stop_ttl')
      changed_config = True
    if args.no_stop_max_idle:
      lifecycle_config.idleStopTtl = None
      changed_fields.append('config.lifecycle_config.idle_stop_ttl')
      changed_config = True

    if changed_config:
      cluster_config.lifecycleConfig = lifecycle_config
      has_changes = True

    # Put in a thunk so we only make this call if needed
    def _GetCurrentLabels():
      # We need to fetch cluster first so we know what the labels look like. The
      # labels_util will fill out the proto for us with all the updates and
      # removals, but first we need to provide the current state of the labels
      current_cluster = _GetCurrentCluster(dataproc, cluster_ref)
      return current_cluster.labels
    labels_update = labels_util.ProcessUpdateArgsLazy(
        args, dataproc.messages.Cluster.LabelsValue,
        orig_labels_thunk=_GetCurrentLabels)
    if labels_update.needs_update:
      has_changes = True
      changed_fields.append('labels')
    labels = labels_update.GetOrNone()

    # Thunk: fetches the cluster's existing user -> service account mapping,
    # or None when the cluster has no identity config.
    def _GetCurrentUserServiceAccountMapping():
      current_cluster = _GetCurrentCluster(dataproc, cluster_ref)
      if (
          current_cluster.config.securityConfig
          and current_cluster.config.securityConfig.identityConfig
      ):
        return (
            current_cluster.config.securityConfig.identityConfig.userServiceAccountMapping
        )
      return None

    # Writes user_sa_mapping into cluster_config, creating the intermediate
    # security/identity config messages if they are missing.
    def _UpdateSecurityConfig(cluster_config, user_sa_mapping):
      if cluster_config.securityConfig is None:
        cluster_config.securityConfig = dataproc.messages.SecurityConfig()
      if cluster_config.securityConfig.identityConfig is None:
        cluster_config.securityConfig.identityConfig = (
            dataproc.messages.IdentityConfig()
        )

      cluster_config.securityConfig.identityConfig.userServiceAccountMapping = (
          user_sa_mapping
      )

    if args.add_user_mappings or args.remove_user_mappings:
      if not _IsMultitenancyCluster(_GetCurrentCluster(dataproc, cluster_ref)):
        raise exceptions.ArgumentError(
            'User service account mapping can only be updated for multi-tenant'
            ' clusters.'
        )
      user_sa_mapping_update = user_sa_mapping_util.ProcessUpdateArgsLazy(
          args,
          dataproc.messages.IdentityConfig.UserServiceAccountMappingValue,
          orig_user_sa_mapping_thunk=_GetCurrentUserServiceAccountMapping,
      )
      if user_sa_mapping_update.needs_update:
        changed_fields.append(
            'config.security_config.identity_config.user_service_account_mapping'
        )
        has_changes = True
      else:
        # No effective change; extend the error message so the final
        # no-op error below explains why.
        if args.add_user_mappings:
          no_update_error_msg += (
              ' User to add is already present in service account mapping.'
          )
        if args.remove_user_mappings:
          no_update_error_msg += (
              ' User to remove is not present in service account mapping.'
          )
      user_sa_mapping = user_sa_mapping_update.GetOrNone()
      if user_sa_mapping:
        _UpdateSecurityConfig(cluster_config, user_sa_mapping)
    elif args.identity_config_file:
      if not _IsMultitenancyCluster(_GetCurrentCluster(dataproc, cluster_ref)):
        raise exceptions.ArgumentError(
            'User service account mapping can only be updated for multi-tenant'
            ' clusters.'
        )
      if cluster_config.securityConfig is None:
        cluster_config.securityConfig = dataproc.messages.SecurityConfig()
      cluster_config.securityConfig.identityConfig = (
          clusters.ParseIdentityConfigFile(dataproc, args.identity_config_file)
      )
      changed_fields.append(
          'config.security_config.identity_config.user_service_account_mapping'
      )
      has_changes = True

    if not has_changes:
      raise exceptions.ArgumentError(no_update_error_msg)

    cluster = dataproc.messages.Cluster(
        config=cluster_config,
        clusterName=cluster_ref.clusterName,
        labels=labels,
        projectId=cluster_ref.projectId)

    request = dataproc.messages.DataprocProjectsRegionsClustersPatchRequest(
        clusterName=cluster_ref.clusterName,
        region=cluster_ref.region,
        projectId=cluster_ref.projectId,
        cluster=cluster,
        updateMask=','.join(changed_fields),
        requestId=util.GetUniqueId())

    if args.graceful_decommission_timeout is not None:
      request.gracefulDecommissionTimeout = (
          six.text_type(args.graceful_decommission_timeout) + 's')

    operation = dataproc.client.projects_regions_clusters.Patch(request)

    # In async mode, return immediately after the operation is created.
    if args.async_:
      log.status.write(
          'Updating [{0}] with operation [{1}].'.format(
              cluster_ref, operation.name))
      return

    util.WaitForOperation(
        dataproc,
        operation,
        message='Waiting for cluster update operation',
        timeout_s=args.timeout)

    # Re-fetch so the returned resource reflects the applied update.
    request = dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
        projectId=cluster_ref.projectId,
        region=cluster_ref.region,
        clusterName=cluster_ref.clusterName)
    cluster = dataproc.client.projects_regions_clusters.Get(request)
    log.UpdatedResource(cluster_ref)
    return cluster


def _FirstNonNone(first, second):
  return first if first is not None else second


def _AddAlphaArguments(parser, release_track):
  """Registers flags that are available only on the ALPHA release track.

  Args:
    parser: The argparse parser to register flags on.
    release_track: The command's release track; flags are added only for
      base.ReleaseTrack.ALPHA.
  """
  if release_track != base.ReleaseTrack.ALPHA:
    return

  parser.add_argument(
      '--secondary-worker-standard-capacity-base',
      type=int,
      help="""
              The number of standard VMs in the Spot and Standard Mix
        feature.
              """,
  )


def _IsMultitenancyCluster(cluster) -> bool:
  """Checks if the cluster is a multi-tenant cluster.

  Args:
    cluster: The cluster configuration.

  Returns:
    True if the cluster is a multi-tenant cluster, False otherwise.
  """
  config = cluster.config
  if config and config.softwareConfig and config.softwareConfig.properties:
    props = config.softwareConfig.properties
    for prop in props.additionalProperties:
      if (
          prop.key == dataproc_constants.ENABLE_DYNAMIC_MULTI_TENANCY_PROPERTY
          and prop.value.lower() == 'true'
      ):
        return True
  return False


def _GetCurrentCluster(dataproc, cluster_ref):
  """Retrieves the current cluster configuration.

  Args:
    dataproc: The Dataproc API client.
    cluster_ref: The reference to the cluster.

  Returns:
    The current cluster configuration.
  """
  # This is used for labels and auxiliary_node_pool_configs
  get_cluster_request = (
      dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
          projectId=cluster_ref.projectId,
          region=cluster_ref.region,
          clusterName=cluster_ref.clusterName,
      )
  )
  current_cluster = dataproc.client.projects_regions_clusters.Get(
      get_cluster_request
  )
  return current_cluster