File: /snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/dataproc/clusters.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for building the dataproc clusters CLI."""

import collections
import re
import textwrap
from typing import Dict, List
from apitools.base.py import encoding
from googlecloudsdk.api_lib.compute import utils as api_utils
from googlecloudsdk.api_lib.dataproc import compute_helpers
from googlecloudsdk.api_lib.dataproc import constants
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import storage_helpers
from googlecloudsdk.api_lib.dataproc import util
from googlecloudsdk.calliope import actions
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.command_lib.compute.instances import flags as instances_flags
from googlecloudsdk.command_lib.dataproc import flags
from googlecloudsdk.command_lib.kms import resource_args as kms_resource_args
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import times
import six


GENERATED_LABEL_PREFIX = 'goog-dataproc-'

VALID_DISK_TYPES = (
    'hyperdisk-balanced',
    'hyperdisk-extreme',
    'hyperdisk-ml',
    'hyperdisk-throughput',
)

ALLOWED_DISK_CONFIG_KEYSET = frozenset({'type', 'size', 'iops', 'throughput'})
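# Illustrative note: based on the constants above and the attached-disk help
# text later in this file, a single disk configuration string would be expected
# to look like (hypothetical values, not a recommendation):
#   type=hyperdisk-balanced,size=500G,iops=3000,throughput=140
# where every key comes from ALLOWED_DISK_CONFIG_KEYSET and the type comes from
# VALID_DISK_TYPES.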


# beta is unused but still useful when we add new beta features
# pylint: disable=unused-argument
def ArgsForClusterRef(
    parser,
    dataproc,
    beta=False,
    alpha=False,
    include_deprecated=True,
    include_ttl_config=False,
    include_gke_platform_args=False,
    include_driver_pool_args=False,
):
  """Register flags for creating a dataproc cluster.

  Args:
    parser: The argparse.ArgParser to configure with dataproc cluster arguments.
    dataproc: Dataproc object that contains client, messages, and resources.
    beta: whether or not this is a beta command (may affect flag visibility)
    alpha: whether or not this is an alpha command (may affect flag visibility)
    include_deprecated: whether deprecated flags should be included
    include_ttl_config: whether to include Scheduled Delete and Stop (TTL) args
    include_gke_platform_args: whether to include GKE-based cluster args
    include_driver_pool_args: whether to include driver pool cluster args
  """
  labels_util.AddCreateLabelsFlags(parser)
  # 30m is backend timeout + 5m for safety buffer.
  flags.AddTimeoutFlag(parser, default='35m')
  flags.AddZoneAndExcludedZonesFlags(parser, short_flags=include_deprecated)
  flags.AddComponentFlag(parser)

  platform_group = parser.add_argument_group(mutex=True)
  gce_platform_group = platform_group.add_argument_group(help="""\
    Compute Engine options for Dataproc clusters.
    """)

  instances_flags.AddTagsArgs(gce_platform_group)
  gce_platform_group.add_argument(
      '--metadata',
      type=arg_parsers.ArgDict(min_length=1),
      action='append',
      default=None,
      help=(
          'Metadata to be made available to the guest operating system '
          'running on the instances'
      ),
      metavar='KEY=VALUE',
  )
  gce_platform_group.add_argument(
      '--resource-manager-tags',
      type=arg_parsers.ArgDict(min_length=1),
      action='append',
      default=None,
      help=(
          'Specifies a list of resource manager tags to apply to each cluster'
          ' node (master and worker nodes). '
      ),
      metavar='KEY=VALUE',
  )

  # Either allow creating a single node cluster (--single-node), or specifying
  # the number of workers in the multi-node cluster (--num-workers and
  # --num-secondary-workers)
  node_group = parser.add_argument_group(mutex=True)  # Mutually exclusive
  node_group.add_argument(
      '--single-node',
      action='store_true',
      help="""\
      Create a single node cluster.

      A single node cluster has all master and worker components.
      It cannot have any separate worker nodes. If this flag is not
      specified, a cluster with separate workers is created.
      """,
  )
  # Not mutually exclusive
  worker_group = node_group.add_argument_group(help='Multi-node cluster flags')
  worker_group.add_argument(
      '--num-workers',
      type=int,
      help=(
          'The number of worker nodes in the cluster. Defaults to '
          'server-specified.'
      ),
  )

  min_workers = worker_group.add_argument_group(mutex=True)
  min_workers.add_argument(
      '--min-num-workers',
      type=int,
      help=(
          'Minimum number of primary worker nodes to provision for cluster'
          ' creation to succeed.'
      ),
  )
  min_workers.add_argument(
      '--min-worker-fraction',
      type=float,
      hidden=True,
      metavar='[0-1]',
      help=(
          'Minimum fraction of worker nodes required to create the cluster.'
          ' If it is not met, cluster creation will fail. Must be a decimal'
          ' value between 0 and 1. The number of required workers will be'
          ' calculated by ceil(min-worker-fraction * num_workers).'
      ),
  )
  worker_group.add_argument(
      '--secondary-worker-type',
      metavar='TYPE',
      choices=['preemptible', 'non-preemptible', 'spot'],
      default='preemptible',
      help='The type of the secondary worker group.',
  )
  num_secondary_workers = worker_group.add_argument_group(mutex=True)
  num_secondary_workers.add_argument(
      '--num-preemptible-workers',
      action=actions.DeprecationAction(
          '--num-preemptible-workers',
          warn=(
              'The `--num-preemptible-workers` flag is deprecated. '
              'Use the `--num-secondary-workers` flag instead.'
          ),
      ),
      type=int,
      hidden=True,
      help='The number of preemptible worker nodes in the cluster.',
  )
  num_secondary_workers.add_argument(
      '--num-secondary-workers',
      type=int,
      help='The number of secondary worker nodes in the cluster.',
  )

  master_machine_type_group = parser.add_argument_group(mutex=True)
  master_machine_type_group.add_argument(
      '--master-machine-type',
      help=(
          'The type of machine to use for the master. Defaults to '
          'server-specified.'
      ),
  )
  master_machine_type_group.add_argument(
      '--master-machine-types',
      help=(
          'Types of machines with optional rank for master nodes to use. '
          'Defaults to server-specified. '
          'Example: --master-machine-types="type=e2-standard-8,type=t2d-standard-8,rank=0"'
      ),
      metavar='type=MACHINE_TYPE[,type=MACHINE_TYPE...][,rank=RANK]',
      type=ArgMultiValueDict(),
      hidden=True,
      action=arg_parsers.FlattenAction(),
  )

  worker_machine_type_group = parser.add_argument_group(mutex=True)
  worker_machine_type_group.add_argument(
      '--worker-machine-type',
      help=(
          'The type of machine to use for primary workers. Defaults to '
          'server-specified.'
      ),
  )
  worker_machine_type_group.add_argument(
      '--worker-machine-types',
      help=(
          '[Machine'
          ' types](https://cloud.google.com/dataproc/docs/concepts/compute/supported-machine-types)'
          ' for primary worker nodes to use with optional rank. A lower rank'
          ' number is given higher preference. Based on availability, Dataproc'
          ' tries to create primary worker VMs using the worker machine type'
          ' with the lowest rank, and then tries to use machine types with'
          ' higher ranks as necessary. Machine types with the same rank are'
          ' given the same preference. Example use:'
          ' --worker-machine-types="type=e2-standard-8,type=n2-standard-8,rank=0".'
          ' For more information, see [Dataproc Flexible'
          ' VMs](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flexible-vms)'
      ),
      metavar='type=MACHINE_TYPE[,type=MACHINE_TYPE...][,rank=RANK]',
      type=ArgMultiValueDict(),
      action=arg_parsers.FlattenAction(),
  )

  parser.add_argument(
      '--min-secondary-worker-fraction',
      help=(
          'Minimum fraction of secondary worker nodes required to create the'
          ' cluster. If it is not met, cluster creation will fail. Must be a'
          ' decimal value between 0 and 1. The number of required secondary'
          ' workers is calculated by ceil(min-secondary-worker-fraction *'
          ' num_secondary_workers). Defaults to 0.0001.'
      ),
      type=float,
  )
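  # Worked example (illustrative): with --num-secondary-workers=10 and
  # --min-secondary-worker-fraction=0.6, at least ceil(0.6 * 10) = 6 secondary
  # workers must be provisioned for cluster creation to succeed; with the
  # default of 0.0001, ceil(0.0001 * 10) = 1 secondary worker is enough.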
  kms_resource_args.AddKmsKeyResourceArg(parser, 'cluster', name='--kms-key')

  parser.add_argument(
      '--secondary-worker-standard-capacity-base',
      type=int,
      help=(
          'This flag sets the base number of Standard VMs to use for [secondary'
          ' workers](https://cloud.google.com/dataproc/docs/concepts/compute/secondary-vms#preemptible_and_non-preemptible_secondary_workers).'
          ' Dataproc will create only standard VMs until it reaches this'
          ' number, then it will mix Spot and Standard VMs according to'
          " ``SECONDARY_WORKER_STANDARD_CAPACITY_PERCENT_ABOVE_BASE''."
      ),
  )

  parser.add_argument(
      '--secondary-worker-standard-capacity-percent-above-base',
      type=int,
      help=(
          'When combining Standard and Spot VMs for'
          ' [secondary-workers](https://cloud.google.com/dataproc/docs/concepts/compute/secondary-vms#preemptible_and_non-preemptible_secondary_workers),'
          ' once the number of Standard VMs specified by'
          " ``SECONDARY_WORKER_STANDARD_CAPACITY_BASE'' has been used, this"
          ' flag specifies the percentage of additional secondary workers that'
          ' will be Standard VMs. Spot VMs will be used for the remaining'
          ' percentage.'
      ),
  )
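  # Worked example (illustrative): with
  # --secondary-worker-standard-capacity-base=10 and
  # --secondary-worker-standard-capacity-percent-above-base=50, the first 10
  # secondary workers are Standard VMs; beyond that, roughly half of any
  # additional secondary workers are Standard VMs and the rest are Spot VMs.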

  parser.add_argument(
      '--secondary-worker-machine-types',
      help=(
          'Types of machines with optional rank for secondary workers to use. '
          'Defaults to server-specified. '
          'Example: --secondary-worker-machine-types="type=e2-standard-8,type=t2d-standard-8,rank=0"'
      ),
      metavar='type=MACHINE_TYPE[,type=MACHINE_TYPE...][,rank=RANK]',
      type=ArgMultiValueDict(),
      action=arg_parsers.FlattenAction(),
  )

  parser.add_argument(
      '--cluster-type',
      metavar='TYPE',
      choices=['standard', 'single-node', 'zero-scale'],
      help='The type of cluster.',
  )

  parser.add_argument(
      '--tier',
      metavar='TIER',
      choices=['premium', 'standard'],
      help='Cluster tier.',
  )

  image_parser = parser.add_mutually_exclusive_group()
  # TODO(b/73291743): Add external doc link to --image
  image_parser.add_argument(
      '--image',
      metavar='IMAGE',
      help=(
          'The custom image used to create the cluster. It can '
          'be the image name, the image URI, or the image family URI, which '
          'selects the latest image from the family.'
      ),
  )
  image_parser.add_argument(
      '--image-version',
      metavar='VERSION',
      help=(
          'The image version to use for the cluster. Defaults to the '
          'latest version.'
      ),
  )
  parser.add_argument(
      '--bucket',
      help="""\
      The Google Cloud Storage bucket to use by default to stage job
      dependencies, miscellaneous config files, and job driver console output
      when using this cluster.
      """,
  )
  parser.add_argument(
      '--temp-bucket',
      help="""\
      The Google Cloud Storage bucket to use by default to store
      ephemeral cluster and jobs data, such as Spark and MapReduce history files.
      """,
  )

  netparser = gce_platform_group.add_argument_group(mutex=True)
  netparser.add_argument(
      '--network',
      help="""\
      The Compute Engine network that the VM instances of the cluster will be
      part of. This is mutually exclusive with --subnet. If neither is
      specified, this defaults to the "default" network.
      """,
  )
  netparser.add_argument(
      '--subnet',
      help="""\
      Specifies the subnet that the cluster will be part of. This is mutually
      exclusive with --network.
      """,
  )
  parser.add_argument(
      '--num-worker-local-ssds',
      type=int,
      help='The number of local SSDs to attach to each worker in a cluster.',
  )
  parser.add_argument(
      '--num-master-local-ssds',
      type=int,
      help='The number of local SSDs to attach to the master in a cluster.',
  )
  secondary_worker_local_ssds = parser.add_argument_group(mutex=True)
  secondary_worker_local_ssds.add_argument(
      '--num-preemptible-worker-local-ssds',
      type=int,
      hidden=True,
      action=actions.DeprecationAction(
          '--num-preemptible-worker-local-ssds',
          warn=(
              'The `--num-preemptible-worker-local-ssds` flag is deprecated. '
              'Use the `--num-secondary-worker-local-ssds` flag instead.'
          ),
      ),
      help="""\
      The number of local SSDs to attach to each secondary worker in
      a cluster.
      """,
  )
  secondary_worker_local_ssds.add_argument(
      '--num-secondary-worker-local-ssds',
      type=int,
      help="""\
      The number of local SSDs to attach to each secondary worker in
      a cluster.
      """,
  )
  parser.add_argument(
      '--master-local-ssd-interface',
      help="""\
      Interface to use to attach local SSDs to master node(s) in a cluster.
      """,
  )
  parser.add_argument(
      '--worker-local-ssd-interface',
      help="""\
      Interface to use to attach local SSDs to each worker in a cluster.
      """,
  )
  parser.add_argument(
      '--secondary-worker-local-ssd-interface',
      help="""\
      Interface to use to attach local SSDs to each secondary worker
      in a cluster.
      """,
  )
  parser.add_argument(
      '--initialization-actions',
      type=arg_parsers.ArgList(min_length=1),
      metavar='CLOUD_STORAGE_URI',
      help=(
          'A list of Google Cloud Storage URIs of '
          'executables to run on each node in the cluster.'
      ),
  )
  parser.add_argument(
      '--initialization-action-timeout',
      type=arg_parsers.Duration(),
      metavar='TIMEOUT',
      default='10m',
      help=(
          'The maximum duration of each initialization action. See '
          '$ gcloud topic datetimes for information on duration formats.'
      ),
  )
  parser.add_argument(
      '--num-masters',
      type=arg_parsers.CustomFunctionValidator(
          lambda n: int(n) in [1, 3],
          'Number of masters must be 1 (Standard) or 3 (High Availability)',
          parser=arg_parsers.BoundedInt(1, 3),
      ),
      help="""\
      The number of master nodes in the cluster.

      Number of Masters | Cluster Mode
      --- | ---
      1 | Standard
      3 | High Availability
      """,
  )
  parser.add_argument(
      '--properties',
      type=arg_parsers.ArgDict(),
      action=arg_parsers.UpdateAction,
      default={},
      metavar='PREFIX:PROPERTY=VALUE',
      help="""\
Specifies configuration properties for installed packages, such as Hadoop
and Spark.

Properties are mapped to configuration files by specifying a prefix, such as
"core:io.serializations". The following are supported prefixes and their
mappings:

Prefix | File | Purpose of file
--- | --- | ---
capacity-scheduler | capacity-scheduler.xml | Hadoop YARN Capacity Scheduler configuration
core | core-site.xml | Hadoop general configuration
distcp | distcp-default.xml | Hadoop Distributed Copy configuration
hadoop-env | hadoop-env.sh | Hadoop specific environment variables
hdfs | hdfs-site.xml | Hadoop HDFS configuration
hive | hive-site.xml | Hive configuration
mapred | mapred-site.xml | Hadoop MapReduce configuration
mapred-env | mapred-env.sh | Hadoop MapReduce specific environment variables
pig | pig.properties | Pig configuration
spark | spark-defaults.conf | Spark configuration
spark-env | spark-env.sh | Spark specific environment variables
yarn | yarn-site.xml | Hadoop YARN configuration
yarn-env | yarn-env.sh | Hadoop YARN specific environment variables

See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/cluster-properties
for more information.

""",
  )
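  # Example invocation (illustrative; the cluster name is hypothetical and the
  # property names are ordinary Spark/YARN settings, not Dataproc-specific):
  #   gcloud dataproc clusters create example-cluster \
  #       --properties=spark:spark.executor.memory=4g,yarn:yarn.nodemanager.vmem-check-enabled=false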
  gce_platform_group.add_argument(
      '--service-account',
      help='The Google Cloud IAM service account to be authenticated as.',
  )
  gce_platform_group.add_argument(
      '--scopes',
      type=arg_parsers.ArgList(min_length=1),
      metavar='SCOPE',
      help="""\
Specifies scopes for the node instances. Multiple SCOPEs can be specified,
separated by commas.
Examples:

  $ {{command}} example-cluster --scopes https://www.googleapis.com/auth/bigtable.admin

  $ {{command}} example-cluster --scopes sqlservice,bigquery

The following *minimum scopes* are necessary for the cluster to function
properly and are always added, even if not explicitly specified:

  {minimum_scopes}

If the `--scopes` flag is not specified, the following *default scopes*
are also included:

  {additional_scopes}

If you want to enable all scopes, use the 'cloud-platform' scope.

{scopes_help}
""".format(
          minimum_scopes='\n  '.join(constants.MINIMUM_SCOPE_URIS),
          additional_scopes='\n  '.join(
              constants.ADDITIONAL_DEFAULT_SCOPE_URIS
          ),
          scopes_help=compute_helpers.SCOPES_HELP,
      ),
  )

  if include_deprecated:
    _AddDiskArgsDeprecated(parser, include_driver_pool_args)
  else:
    _AddDiskArgs(parser, include_driver_pool_args)

  ip_address_parser = parser.add_mutually_exclusive_group()

  # --no-address is an exception to the no negative-flag style guideline to be
  # consistent with gcloud compute instances create --no-address
  ip_address_parser.add_argument(
      '--no-address',
      action='store_true',
      help="""\
      If provided, the instances in the cluster will not be assigned external
      IP addresses.

      If omitted, the Dataproc service applies a default policy to determine
      whether each instance in the cluster gets an external IP address.

      Note: Dataproc VMs need access to the Dataproc API. This can be achieved
      without external IP addresses using Private Google Access
      (https://cloud.google.com/compute/docs/private-google-access).
      """,
  )

  ip_address_parser.add_argument(
      '--public-ip-address',
      action='store_true',
      help="""\
      If provided, cluster instances are assigned external IP addresses.

      If omitted, the Dataproc service applies a default policy to determine
      whether or not each instance in the cluster gets an external IP address.

      Note: Dataproc VMs need access to the Dataproc API. This can be achieved
      without external IP addresses using Private Google Access
      (https://cloud.google.com/compute/docs/private-google-access).
      """,
  )

  parser.add_argument(
      '--private-ipv6-google-access-type',
      choices=['inherit-subnetwork', 'outbound', 'bidirectional'],
      help="""\
      The private IPv6 Google access type for the cluster.
      """,
  )

  boot_disk_type_detailed_help = """\
      The type of the boot disk. The value must be `pd-balanced`,
      `pd-ssd`, or `pd-standard`.
      """
  parser.add_argument(
      '--master-boot-disk-type', help=boot_disk_type_detailed_help
  )
  parser.add_argument(
      '--worker-boot-disk-type', help=boot_disk_type_detailed_help
  )
  secondary_worker_boot_disk_type = parser.add_argument_group(mutex=True)
  secondary_worker_boot_disk_type.add_argument(
      '--preemptible-worker-boot-disk-type',
      help=boot_disk_type_detailed_help,
      hidden=True,
      action=actions.DeprecationAction(
          '--preemptible-worker-boot-disk-type',
          warn=(
              'The `--preemptible-worker-boot-disk-type` flag is deprecated. '
              'Use the `--secondary-worker-boot-disk-type` flag instead.'
          ),
      ),
  )
  secondary_worker_boot_disk_type.add_argument(
      '--secondary-worker-boot-disk-type', help=boot_disk_type_detailed_help
  )

  boot_disk_provisioned_iops_detailed_help = """\
      Indicates the [IOPS](https://cloud.google.com/compute/docs/disks/hyperdisks#iops)
      to provision for the disk. This sets the limit for disk I/O operations per
      second. This is only supported if the bootdisk type is
      [hyperdisk-balanced](https://cloud.google.com/compute/docs/disks/hyperdisks).
      """

  parser.add_argument(
      '--master-boot-disk-provisioned-iops',
      help=boot_disk_provisioned_iops_detailed_help,
      type=int,
  )
  parser.add_argument(
      '--worker-boot-disk-provisioned-iops',
      help=boot_disk_provisioned_iops_detailed_help,
      type=int,
  )
  parser.add_argument(
      '--secondary-worker-boot-disk-provisioned-iops',
      help=boot_disk_provisioned_iops_detailed_help,
      type=int,
      hidden=True,
  )

  boot_disk_provisioned_throughput_detailed_help = """\
      Indicates the [throughput](https://cloud.google.com/compute/docs/disks/hyperdisks#throughput)
      to provision for the disk. This sets the limit for throughput in MiB per
      second. This is only supported if the bootdisk type is
      [hyperdisk-balanced](https://cloud.google.com/compute/docs/disks/hyperdisks).
      """

  parser.add_argument(
      '--master-boot-disk-provisioned-throughput',
      help=boot_disk_provisioned_throughput_detailed_help,
      type=int,
  )
  parser.add_argument(
      '--worker-boot-disk-provisioned-throughput',
      help=boot_disk_provisioned_throughput_detailed_help,
      type=int,
  )
  parser.add_argument(
      '--secondary-worker-boot-disk-provisioned-throughput',
      help=boot_disk_provisioned_throughput_detailed_help,
      type=int,
      hidden=True,
  )

  attached_disk_detailed_help = """\
      A list of disk configurations to attach to nodes.
      Each configuration is a string in the format
      type=<disk_type>[,size=<size>][,iops=<iops>][,throughput=<throughput>].
      Multiple configurations are separated by semicolons.

      Allowed disk types are: hyperdisk-balanced, hyperdisk-extreme,
      hyperdisk-ml, hyperdisk-throughput.

      Example:
      type=hyperdisk-balanced,iops=1000,throughput=500,size=100G;type=hyperdisk-throughput,size=2000G
      """

  parser.add_argument(
      '--master-attached-disks',
      help=attached_disk_detailed_help,
      type=DiskConfigParser(),
      hidden=True,
  )

  parser.add_argument(
      '--worker-attached-disks',
      help=attached_disk_detailed_help,
      type=DiskConfigParser(),
      hidden=True,
  )

  parser.add_argument(
      '--secondary-worker-attached-disks',
      help=attached_disk_detailed_help,
      type=DiskConfigParser(),
      hidden=True,
  )

  # Note: include_driver_pool_args is only supported in the default universe.
  # The check occurs in create.py.
  if include_driver_pool_args:
    flags.AddDriverPoolId(parser)
    parser.add_argument(
        '--driver-pool-boot-disk-type', help=boot_disk_type_detailed_help
    )
    parser.add_argument(
        '--driver-pool-size',
        type=int,
        help='The size of the cluster driver pool.',
    )
    parser.add_argument(
        '--driver-pool-machine-type',
        help=(
            'The type of machine to use for the cluster driver pool nodes.'
            ' Defaults to server-specified.'
        ),
    )
    parser.add_argument(
        '--num-driver-pool-local-ssds',
        type=int,
        help="""\
        The number of local SSDs to attach to each cluster driver pool node.
        """,
    )
    parser.add_argument(
        '--driver-pool-local-ssd-interface',
        help="""\
        Interface to use to attach local SSDs to cluster driver pool node(s).
        """,
    )

  parser.add_argument(
      '--enable-component-gateway',
      action='store_true',
      help=arg_parsers.UniverseHelpText(
          default=(
              'Enable access to the web UIs of selected components on the'
              ' cluster through the component gateway.'
          ),
          universe_help=(
              'Component gateway is only supported in the default universe'
          ),
      ),
  )
  parser.add_argument(
      '--node-group',
      help="""\
        The name of the sole-tenant node group to create the cluster on. Can be
        a short name ("node-group-name") or in the format
        "projects/{project-id}/zones/{zone}/nodeGroups/{node-group-name}".
        """,
  )
  parser.add_argument(
      '--shielded-secure-boot',
      action='store_true',
      help="""\
        The cluster's VMs will boot with secure boot enabled.
        """,
  )
  parser.add_argument(
      '--shielded-vtpm',
      action='store_true',
      help="""\
        The cluster's VMs will boot with the TPM (Trusted Platform Module) enabled.
        A TPM is a hardware module that can be used for different security
        operations, such as remote attestation, encryption, and sealing of keys.
        """,
  )
  parser.add_argument(
      '--shielded-integrity-monitoring',
      action='store_true',
      help="""\
        Enables monitoring and attestation of the boot integrity of the
        cluster's VMs. vTPM (virtual Trusted Platform Module) must also be
        enabled. A TPM is a hardware module that can be used for different
        security operations, such as remote attestation, encryption, and sealing
        of keys.
        """,
  )
  if not beta:
    parser.add_argument(
        '--confidential-compute',
        action='store_true',
        help="""\
        Enables Confidential VM. See https://cloud.google.com/compute/confidential-vm/docs for more information.
        Note that Confidential VM can only be enabled when the machine types
        are N2D (https://cloud.google.com/compute/docs/machine-types#n2d_machine_types)
        and the image is SEV Compatible.
        """,
    )
  metastore_group = parser.add_argument_group(mutex=True)  # Mutually exclusive
  metastore_group.add_argument(
      '--dataproc-metastore',
      help=arg_parsers.UniverseHelpText(
          default="""\
          Specify the name of a Dataproc Metastore service to be used as an
          external metastore in the format:
          "projects/{project-id}/locations/{region}/services/{service-name}".
          """,
          universe_help="""\
          Dataproc Metastore is only supported in the default universe.
          """,
      ),
  )
  # Not mutually exclusive
  if alpha or beta:
    bqms_group = metastore_group.add_argument_group(help='BQMS flags')
    bqms_group.add_argument(
        '--bigquery-metastore-project-id',
        help="""\
      The project ID of the BigQuery metastore database to be used as an external metastore.
        """,
    )
    bqms_group.add_argument(
        '--bigquery-metastore-database-location',
        help="""\
      Location of the BigQuery metastore database to be used as an external metastore.
        """,
    )
    bqms_group.add_argument(
        '--bigquery-metastore',
        action='store_true',
        help="""\
        Indicates that BigQuery metastore is to be used.
        """,
    )

  parser.add_argument(
      '--enable-node-groups',
      hidden=True,
      help="""\
      Create cluster nodes using Dataproc NodeGroups. All required VMs will be
      created using GCE managed instance groups (MIGs).
      """,
      type=bool,
  )

  autoscaling_group = parser.add_argument_group()
  flags.AddAutoscalingPolicyResourceArgForCluster(
      autoscaling_group, api_version='v1'
  )

  if include_ttl_config:
    auto_delete_idle_group = parser.add_mutually_exclusive_group()
    auto_delete_idle_group.add_argument(
        '--max-idle',
        type=arg_parsers.Duration(),
        hidden=True,
        help="""\
          The duration after the last job completes to auto-delete the
          cluster, such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )
    auto_delete_idle_group.add_argument(
        '--delete-max-idle',
        type=arg_parsers.Duration(),
        help="""\
          The duration after the last job completes to auto-delete the
          cluster, such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )

    auto_delete_group = parser.add_mutually_exclusive_group()
    auto_delete_group.add_argument(
        '--max-age',
        type=arg_parsers.Duration(),
        hidden=True,
        help="""\
          The lifespan of the cluster, with auto-deletion upon completion,
          such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )

    auto_delete_group.add_argument(
        '--expiration-time',
        type=arg_parsers.Datetime.Parse,
        hidden=True,
        help="""\
          The time when the cluster will be auto-deleted, such as
          "2017-08-29T18:52:51.142Z." See $ gcloud topic datetimes for
          information on time formats.
          """,
    )

    auto_delete_group.add_argument(
        '--delete-max-age',
        type=arg_parsers.Duration(),
        help="""\
          The lifespan of the cluster, with auto-deletion upon completion,
          such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )
    auto_delete_group.add_argument(
        '--delete-expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
          The time when the cluster will be auto-deleted, such as
          "2017-08-29T18:52:51.142Z."
          See $ gcloud topic datetimes for information on time formats.
          """,
    )

    parser.add_argument(
        '--stop-max-idle',
        type=arg_parsers.Duration(),
        help="""\
          The duration after the last job completes to auto-stop the cluster,
          such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )
    auto_stop_group = parser.add_mutually_exclusive_group()
    auto_stop_group.add_argument(
        '--stop-max-age',
        type=arg_parsers.Duration(),
        help="""\
          The lifespan of the cluster, with auto-stop upon completion,
          such as "2h" or "1d".
          See $ gcloud topic datetimes for information on duration formats.
          """,
    )
    auto_stop_group.add_argument(
        '--stop-expiration-time',
        type=arg_parsers.Datetime.Parse,
        help="""\
          The time when the cluster will be auto-stopped, such as
          "2017-08-29T18:52:51.142Z."
          See $ gcloud topic datetimes for information on time formats.
          """,
    )

  AddKerberosGroup(parser)

  if not beta:
    AddSecureMultiTenancyGroup(parser)

  flags.AddMinCpuPlatformArgs(parser, include_driver_pool_args)

  _AddAcceleratorArgs(parser, include_driver_pool_args)

  if not beta:
    _AddMetricConfigArgs(parser, dataproc)

  AddReservationAffinityGroup(
      gce_platform_group,
      group_text='Specifies the reservation for the instance.',
      affinity_text='The type of reservation for the instance.',
  )
  # Note: include_gke_platform_args is only supported in the default universe.
  # The check occurs in create.py.
  if include_gke_platform_args:
    gke_based_cluster_group = platform_group.add_argument_group(
        hidden=True,
        help=arg_parsers.UniverseHelpText(
            default="""\
                    Options for creating a GKE-based Dataproc cluster. Specifying any of these
                    will indicate that this cluster is intended to be a GKE-based cluster.
                    These options are mutually exclusive with GCE-based options.
                    """,
            universe_help="""\
                          GKE-based clusters are only supported in the default universe.
                          """,
        ),
    )
    gke_based_cluster_group.add_argument(
        '--gke-cluster',
        hidden=True,
        help="""\
            Required for GKE-based clusters. Specify the name of the GKE cluster to
            deploy this GKE-based Dataproc cluster to. This should be the short name
            and not the full path name.
            """,
    )
    gke_based_cluster_group.add_argument(
        '--gke-cluster-namespace',
        hidden=True,
        help="""\
            Optional. Specify the name of the namespace to deploy Dataproc system
            components into. This namespace does not need to already exist.
            """,
    )


def _GetValidMetricSourceChoices(dataproc):
  metric_sources_enum = dataproc.messages.Metric.MetricSourceValueValuesEnum
  return [
      arg_utils.ChoiceToEnumName(n)
      for n in sorted(metric_sources_enum.names())
      if n != 'METRIC_SOURCE_UNSPECIFIED'
  ]
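  # Sketch of the behavior above, assuming arg_utils.ChoiceToEnumName simply
  # upper-cases and replaces '-' with '_' (an assumption about that utility):
  # enum names such as SPARK_HISTORY_SERVER pass through unchanged, so the
  # returned choices match the enum names, and the same converter lets users
  # type 'spark-history-server' for --metric-sources on the command line.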


def _AddMetricConfigArgs(parser, dataproc):
  """Adds DataprocMetricConfig related args to the parser."""
  metric_overrides_detailed_help = """
  List of metrics that override the default metrics enabled for the metric
  sources. Any of the
  [available OSS metrics](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)
  and all Spark metrics can be listed for collection as a metric override.
  Override metric values are case sensitive and, if appropriate, must be
  provided in CamelCase format, for example:

  *sparkHistoryServer:JVM:Memory:NonHeapMemoryUsage.committed*
  *hiveserver2:JVM:Memory:NonHeapMemoryUsage.used*

  Only the specified overridden metrics will be collected from a given metric
  source. For example, if one or more *spark:executor* metrics are listed as
  metric overrides, other *SPARK* metrics will not be collected. The collection
  of default OSS metrics from other metric sources is unaffected. For example,
  if both *SPARK* and *YARN* metric sources are enabled, and overrides are
  provided for Spark metrics only, all default YARN metrics will be collected.

  The source of the specified metric override must be enabled. For example,
  if one or more *spark:driver* metrics are provided as metric overrides,
  the spark metric source must be enabled (*--metric-sources=spark*).
  """

  metric_config_group = parser.add_group()
  metric_config_group.add_argument(
      '--metric-sources',
      metavar='METRIC_SOURCE',
      type=arg_parsers.ArgList(
          arg_utils.ChoiceToEnumName,
          choices=_GetValidMetricSourceChoices(dataproc),
      ),
      required=True,
      help=(
          'Specifies a list of cluster [Metric'
          ' Sources](https://cloud.google.com/dataproc/docs/guides/monitoring#available_oss_metrics)'
          ' to collect custom metrics.'
      ),
  )
  metric_overrides_group = metric_config_group.add_mutually_exclusive_group()
  metric_overrides_group.add_argument(
      '--metric-overrides',
      type=arg_parsers.ArgList(),
      action=arg_parsers.UpdateAction,
      metavar='METRIC_SOURCE:INSTANCE:GROUP:METRIC',
      help=metric_overrides_detailed_help,
  )
  metric_overrides_group.add_argument(
      '--metric-overrides-file',
      help="""\
      Path to a file containing a list of metrics that override the default metrics enabled for the metric sources.
      The path can be a Cloud Storage URL (example: `gs://path/to/file`) or a local file system path.
      """,
  )
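  # Example invocation (illustrative; metric names are taken from the help text
  # above, the cluster name is hypothetical):
  #   gcloud dataproc clusters create example-cluster \
  #       --metric-sources=spark,spark-history-server \
  #       --metric-overrides=sparkHistoryServer:JVM:Memory:NonHeapMemoryUsage.committed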


def _AddAcceleratorArgs(parser, include_driver_pool_args=False):
  """Adds accelerator related args to the parser."""
  accelerator_help_fmt = """\
      Attaches accelerators, such as GPUs, to the {instance_type}
      instance(s).
      """
  accelerator_help_fmt += """
      *type*::: The specific type of accelerator to attach to the instances,
      such as `nvidia-tesla-t4` for NVIDIA T4. Use `gcloud compute
      accelerator-types list` to display available accelerator types.

      *count*::: The number of accelerators to attach to each instance. The default value is 1.
      """

  parser.add_argument(
      '--master-accelerator',
      type=arg_parsers.ArgDict(
          spec={
              'type': str,
              'count': int,
          }
      ),
      metavar='type=TYPE,[count=COUNT]',
      help=accelerator_help_fmt.format(instance_type='master'),
  )

  parser.add_argument(
      '--worker-accelerator',
      type=arg_parsers.ArgDict(
          spec={
              'type': str,
              'count': int,
          }
      ),
      metavar='type=TYPE,[count=COUNT]',
      help=accelerator_help_fmt.format(instance_type='worker'),
  )

  secondary_worker_accelerator = parser.add_argument_group(mutex=True)

  secondary_worker_accelerator.add_argument(
      '--secondary-worker-accelerator',
      type=arg_parsers.ArgDict(
          spec={
              'type': str,
              'count': int,
          }
      ),
      metavar='type=TYPE,[count=COUNT]',
      help=accelerator_help_fmt.format(instance_type='secondary-worker'),
  )

  secondary_worker_accelerator.add_argument(
      '--preemptible-worker-accelerator',
      type=arg_parsers.ArgDict(
          spec={
              'type': str,
              'count': int,
          }
      ),
      metavar='type=TYPE,[count=COUNT]',
      help=accelerator_help_fmt.format(instance_type='preemptible-worker'),
      hidden=True,
      action=actions.DeprecationAction(
          '--preemptible-worker-accelerator',
          warn=(
              'The `--preemptible-worker-accelerator` flag is deprecated. '
              'Use the `--secondary-worker-accelerator` flag instead.'
          ),
      ),
  )
  if include_driver_pool_args:
    parser.add_argument(
        '--driver-pool-accelerator',
        type=arg_parsers.ArgDict(
            spec={
                'type': str,
                'count': int,
            }
        ),
        metavar='type=TYPE,[count=COUNT]',
        help=accelerator_help_fmt.format(instance_type='driver-pool'),
    )
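  # Illustrative only: a value such as
  #   --worker-accelerator type=nvidia-tesla-t4,count=2
  # is parsed by the ArgDict spec above into {'type': 'nvidia-tesla-t4',
  # 'count': 2}; if count is omitted, GetClusterConfig later falls back to a
  # count of 1.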


def _AddDiskArgs(parser, include_driver_pool_args=False):
  """Adds disk related args to the parser."""
  boot_disk_size_detailed_help = """\
      The size of the boot disk. The value must be a
      whole number followed by a size unit of ``KB'' for kilobyte, ``MB''
      for megabyte, ``GB'' for gigabyte, or ``TB'' for terabyte. For example,
      `10GB` will produce a 10 gigabyte disk. The minimum boot disk size is
      10 GB. Boot disk size must be a multiple of 1 GB.
      """
  parser.add_argument(
      '--master-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  parser.add_argument(
      '--worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  secondary_worker_boot_disk_size = parser.add_argument_group(mutex=True)
  secondary_worker_boot_disk_size.add_argument(
      '--preemptible-worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
      hidden=True,
      action=actions.DeprecationAction(
          '--preemptible-worker-boot-disk-size',
          warn=(
              'The `--preemptible-worker-boot-disk-size` flag is deprecated. '
              'Use the `--secondary-worker-boot-disk-size` flag instead.'
          ),
      ),
  )
  secondary_worker_boot_disk_size.add_argument(
      '--secondary-worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  if include_driver_pool_args:
    parser.add_argument(
        '--driver-pool-boot-disk-size',
        type=arg_parsers.BinarySize(lower_bound='10GB'),
        help=boot_disk_size_detailed_help,
    )
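  # Illustrative only, assuming arg_parsers.BinarySize returns a byte count and
  # api_utils.BytesToGb converts bytes back to whole gigabytes (assumptions
  # about those gcloud utilities): a --worker-boot-disk-size of '500GB' is
  # parsed to bytes here and later converted back to 500 for the disk config.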


def _AddDiskArgsDeprecated(parser, include_driver_pool_args=False):
  """Adds deprecated disk related args to the parser."""
  master_boot_disk_size = parser.add_mutually_exclusive_group()
  worker_boot_disk_size = parser.add_mutually_exclusive_group()

  # Deprecated, to be removed at a future date.
  master_boot_disk_size.add_argument(
      '--master-boot-disk-size-gb',
      action=actions.DeprecationAction(
          '--master-boot-disk-size-gb',
          warn=(
              'The `--master-boot-disk-size-gb` flag is deprecated. '
              'Use `--master-boot-disk-size` flag with "GB" after value.'
          ),
      ),
      type=int,
      hidden=True,
      help='Use `--master-boot-disk-size` flag with "GB" after value.',
  )
  worker_boot_disk_size.add_argument(
      '--worker-boot-disk-size-gb',
      action=actions.DeprecationAction(
          '--worker-boot-disk-size-gb',
          warn=(
              'The `--worker-boot-disk-size-gb` flag is deprecated. '
              'Use `--worker-boot-disk-size` flag with "GB" after value.'
          ),
      ),
      type=int,
      hidden=True,
      help='Use `--worker-boot-disk-size` flag with "GB" after value.',
  )

  boot_disk_size_detailed_help = """\
      The size of the boot disk. The value must be a
      whole number followed by a size unit of ``KB'' for kilobyte, ``MB''
      for megabyte, ``GB'' for gigabyte, or ``TB'' for terabyte. For example,
      ``10GB'' will produce a 10 gigabyte disk. The minimum size a boot disk
      can have is 10 GB. Disk size must be a multiple of 1 GB.
      """
  master_boot_disk_size.add_argument(
      '--master-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  worker_boot_disk_size.add_argument(
      '--worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  secondary_worker_boot_disk_size = parser.add_argument_group(mutex=True)
  secondary_worker_boot_disk_size.add_argument(
      '--preemptible-worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
      hidden=True,
      action=actions.DeprecationAction(
          '--preemptible-worker-boot-disk-size',
          warn=(
              'The `--preemptible-worker-boot-disk-size` flag is deprecated. '
              'Use the `--secondary-worker-boot-disk-size` flag instead.'
          ),
      ),
  )
  secondary_worker_boot_disk_size.add_argument(
      '--secondary-worker-boot-disk-size',
      type=arg_parsers.BinarySize(lower_bound='10GB'),
      help=boot_disk_size_detailed_help,
  )
  if include_driver_pool_args:
    parser.add_argument(
        '--driver-pool-boot-disk-size',
        type=arg_parsers.BinarySize(lower_bound='10GB'),
        help=boot_disk_size_detailed_help,
    )


# DEPRECATED: The beta release track should no longer be used; Google Cloud
# no longer supports it.
def BetaArgsForClusterRef(parser):
  """Register beta-only flags for creating a Dataproc cluster."""
  pass


def GetClusterConfig(
    args,
    dataproc,
    project_id,
    compute_resources,
    beta=False,
    alpha=False,
    include_deprecated=True,
    include_ttl_config=False,
    include_gke_platform_args=False,
):
  """Get dataproc cluster configuration.

  Args:
    args: Arguments parsed from argparse.ArgParser.
    dataproc: Dataproc object that contains client, messages, and resources
    project_id: Dataproc project ID
    compute_resources: compute resource for cluster
    beta: use BETA only features
    alpha: use ALPHA only features
    include_deprecated: whether to include deprecated args
    include_ttl_config: whether to include Scheduled Delete(TTL) args
    include_gke_platform_args: whether to include GKE-based cluster args

  Returns:
    cluster_config: Dataproc cluster configuration
  """
  master_accelerator_type = None
  worker_accelerator_type = None
  secondary_worker_accelerator_type = None
  driver_pool_accelerator_type = None

  if args.master_accelerator:
    if 'type' in args.master_accelerator.keys():
      master_accelerator_type = args.master_accelerator['type']
    else:
      raise exceptions.ArgumentError('master-accelerator missing type!')
    master_accelerator_count = args.master_accelerator.get('count', 1)

  if args.worker_accelerator:
    if 'type' in args.worker_accelerator.keys():
      worker_accelerator_type = args.worker_accelerator['type']
    else:
      raise exceptions.ArgumentError('worker-accelerator missing type!')
    worker_accelerator_count = args.worker_accelerator.get('count', 1)

  secondary_worker_accelerator = _FirstNonNone(
      args.secondary_worker_accelerator, args.preemptible_worker_accelerator
  )
  if secondary_worker_accelerator:
    if 'type' in secondary_worker_accelerator.keys():
      secondary_worker_accelerator_type = secondary_worker_accelerator['type']
    else:
      raise exceptions.ArgumentError(
          'secondary-worker-accelerator missing type!'
      )
    secondary_worker_accelerator_count = secondary_worker_accelerator.get(
        'count', 1
    )

  if args.min_worker_fraction and (
      args.min_worker_fraction < 0 or args.min_worker_fraction > 1
  ):
    raise exceptions.ArgumentError(
        '--min-worker-fraction must be between 0 and 1'
    )

  if hasattr(args, 'driver_pool_accelerator') and args.driver_pool_accelerator:
    if 'type' in args.driver_pool_accelerator.keys():
      driver_pool_accelerator_type = args.driver_pool_accelerator['type']
    else:
      raise exceptions.ArgumentError('driver-pool-accelerator missing type!')
    driver_pool_accelerator_count = args.driver_pool_accelerator.get('count', 1)

  # Resolve non-zonal GCE resources
  # We will let the server resolve short names of zonal resources because
  # if auto zone is requested, we will not know the zone before sending the
  # request
  image_ref = args.image and compute_resources.Parse(
      args.image, params={'project': project_id}, collection='compute.images'
  )
  network_ref = args.network and compute_resources.Parse(
      args.network,
      params={'project': project_id},
      collection='compute.networks',
  )
  subnetwork_ref = args.subnet and compute_resources.Parse(
      args.subnet,
      params={
          'project': project_id,
          'region': properties.VALUES.compute.region.GetOrFail,
      },
      collection='compute.subnetworks',
  )
  timeout_str = six.text_type(args.initialization_action_timeout) + 's'
  init_actions = [
      dataproc.messages.NodeInitializationAction(
          executableFile=exe, executionTimeout=timeout_str
      )
      for exe in (args.initialization_actions or [])
  ]
  # Increase the client timeout for each initialization action.
  args.timeout += args.initialization_action_timeout * len(init_actions)
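  # Worked example (illustrative): with --initialization-action-timeout=10m,
  # arg_parsers.Duration() is assumed to yield 600 (seconds), so timeout_str is
  # '600s'; with two initialization actions the client-side --timeout grows by
  # 1200 seconds.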

  expanded_scopes = compute_helpers.ExpandScopeAliases(args.scopes)
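  # Illustrative only, assuming compute_helpers.ExpandScopeAliases follows the
  # usual gcloud scope-alias expansion: an alias such as 'bigquery' would
  # expand to 'https://www.googleapis.com/auth/bigquery', while full scope URIs
  # pass through unchanged.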

  software_config = dataproc.messages.SoftwareConfig(
      imageVersion=args.image_version
  )

  if include_deprecated:
    master_boot_disk_size_gb = args.master_boot_disk_size_gb
  else:
    master_boot_disk_size_gb = None
  if args.master_boot_disk_size:
    master_boot_disk_size_gb = api_utils.BytesToGb(args.master_boot_disk_size)

  if include_deprecated:
    worker_boot_disk_size_gb = args.worker_boot_disk_size_gb
  else:
    worker_boot_disk_size_gb = None
  if args.worker_boot_disk_size:
    worker_boot_disk_size_gb = api_utils.BytesToGb(args.worker_boot_disk_size)

  secondary_worker_boot_disk_size_gb = api_utils.BytesToGb(
      _FirstNonNone(
          args.secondary_worker_boot_disk_size,
          args.preemptible_worker_boot_disk_size,
      )
  )

  driver_pool_boot_disk_size_gb = None
  if (
      hasattr(args, 'driver_pool_boot_disk_size')
      and args.driver_pool_boot_disk_size
  ):
    driver_pool_boot_disk_size_gb = api_utils.BytesToGb(
        args.driver_pool_boot_disk_size
    )

  if args.single_node or args.num_workers == 0:
    # Explicitly specifying --num-workers=0 gives you a single node cluster,
    # but if --num-workers is omitted, args.num_workers is None (not 0), and
    # this property will not be set
    args.properties[constants.ALLOW_ZERO_WORKERS_PROPERTY] = 'true'

  if args.enable_node_groups is not None:
    args.properties[constants.ENABLE_NODE_GROUPS_PROPERTY] = str(
        args.enable_node_groups
    ).lower()

  if args.properties:
    software_config.properties = encoding.DictToAdditionalPropertyMessage(
        args.properties,
        dataproc.messages.SoftwareConfig.PropertiesValue,
        sort_items=True,
    )

  if args.components:
    software_config_cls = dataproc.messages.SoftwareConfig
    software_config.optionalComponents.extend(
        list(
            map(
                software_config_cls.OptionalComponentsValueListEntryValuesEnum,
                args.components,
            )
        )
    )

  gce_cluster_config = dataproc.messages.GceClusterConfig(
      networkUri=network_ref and network_ref.SelfLink(),
      subnetworkUri=subnetwork_ref and subnetwork_ref.SelfLink(),
      privateIpv6GoogleAccess=_GetPrivateIpv6GoogleAccess(
          dataproc, args.private_ipv6_google_access_type
      ),
      serviceAccount=args.service_account,
      serviceAccountScopes=expanded_scopes,
      zoneUri=properties.VALUES.compute.zone.GetOrFail(),
      autoZoneExcludeZoneUris=args.auto_zone_exclude_zones,
  )

  if args.public_ip_address:
    gce_cluster_config.internalIpOnly = not args.public_ip_address
  if args.no_address:
    gce_cluster_config.internalIpOnly = args.no_address

  reservation_affinity = GetReservationAffinity(args, dataproc)
  gce_cluster_config.reservationAffinity = reservation_affinity

  if args.tags:
    gce_cluster_config.tags = args.tags

  if args.resource_manager_tags:
    flat_tags = collections.OrderedDict()
    for entry in args.resource_manager_tags:
      for k, v in entry.items():
        flat_tags[k] = v
    gce_cluster_config.resourceManagerTags = (
        encoding.DictToAdditionalPropertyMessage(
            flat_tags,
            dataproc.messages.GceClusterConfig.ResourceManagerTagsValue,
        )
    )

  if args.metadata:
    flat_metadata = collections.OrderedDict()
    for entry in args.metadata:
      for k, v in entry.items():
        flat_metadata[k] = v
    gce_cluster_config.metadata = encoding.DictToAdditionalPropertyMessage(
        flat_metadata, dataproc.messages.GceClusterConfig.MetadataValue
    )
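  # Illustrative only: because --metadata uses action='append' with an ArgDict,
  # repeating the flag (for example, --metadata a=1 --metadata b=2,a=3)
  # produces a list of dicts that the loop above flattens to
  # {'a': '3', 'b': '2'}, with later flags overriding earlier ones on
  # duplicate keys.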

  master_accelerators = []
  if master_accelerator_type:
    master_accelerators.append(
        dataproc.messages.AcceleratorConfig(
            acceleratorTypeUri=master_accelerator_type,
            acceleratorCount=master_accelerator_count,
        )
    )
  worker_accelerators = []
  if worker_accelerator_type:
    worker_accelerators.append(
        dataproc.messages.AcceleratorConfig(
            acceleratorTypeUri=worker_accelerator_type,
            acceleratorCount=worker_accelerator_count,
        )
    )
  secondary_worker_accelerators = []
  if secondary_worker_accelerator_type:
    secondary_worker_accelerators.append(
        dataproc.messages.AcceleratorConfig(
            acceleratorTypeUri=secondary_worker_accelerator_type,
            acceleratorCount=secondary_worker_accelerator_count,
        )
    )
  driver_pool_accelerators = []
  if driver_pool_accelerator_type:
    driver_pool_accelerators.append(
        dataproc.messages.AcceleratorConfig(
            acceleratorTypeUri=driver_pool_accelerator_type,
            acceleratorCount=driver_pool_accelerator_count,
        )
    )

  cluster_config = dataproc.messages.ClusterConfig(
      configBucket=args.bucket,
      tempBucket=args.temp_bucket,
      clusterType=_GetCusterType(dataproc, args.cluster_type),
      clusterTier=_GetClusterTier(dataproc, args.tier),
      gceClusterConfig=gce_cluster_config,
      masterConfig=dataproc.messages.InstanceGroupConfig(
          numInstances=args.num_masters,
          imageUri=image_ref and image_ref.SelfLink(),
          machineTypeUri=args.master_machine_type,
          accelerators=master_accelerators,
          diskConfig=GetDiskConfig(
              dataproc,
              args.master_boot_disk_type,
              master_boot_disk_size_gb,
              args.num_master_local_ssds,
              args.master_local_ssd_interface,
              'Master',
              args.master_boot_disk_provisioned_iops,
              args.master_boot_disk_provisioned_throughput,
              args.master_attached_disks,
          ),
          minCpuPlatform=args.master_min_cpu_platform,
          instanceFlexibilityPolicy=GetInstanceFlexibilityPolicy(
              dataproc, None, args.master_machine_types, 'master'
          ),
      ),
      workerConfig=dataproc.messages.InstanceGroupConfig(
          numInstances=args.num_workers,
          minNumInstances=args.min_num_workers,
          imageUri=image_ref and image_ref.SelfLink(),
          machineTypeUri=args.worker_machine_type,
          accelerators=worker_accelerators,
          diskConfig=GetDiskConfig(
              dataproc,
              args.worker_boot_disk_type,
              worker_boot_disk_size_gb,
              args.num_worker_local_ssds,
              args.worker_local_ssd_interface,
              'Worker',
              args.worker_boot_disk_provisioned_iops,
              args.worker_boot_disk_provisioned_throughput,
              args.worker_attached_disks,
          ),
          minCpuPlatform=args.worker_min_cpu_platform,
          instanceFlexibilityPolicy=GetInstanceFlexibilityPolicy(
              dataproc, None, args.worker_machine_types, 'worker'
          ),
      ),
      initializationActions=init_actions,
      softwareConfig=software_config,
  )

  if args.min_worker_fraction:
    cluster_config.workerConfig.startupConfig = dataproc.messages.StartupConfig(
        requiredRegistrationFraction=args.min_worker_fraction,
    )

  if (
      args.kerberos_config_file
      or args.enable_kerberos
      or args.kerberos_root_principal_password_uri
  ):
    cluster_config.securityConfig = dataproc.messages.SecurityConfig()
    if args.kerberos_config_file:
      cluster_config.securityConfig.kerberosConfig = ParseKerberosConfigFile(
          dataproc, args.kerberos_config_file
      )
    else:
      kerberos_config = dataproc.messages.KerberosConfig()
      if args.enable_kerberos:
        kerberos_config.enableKerberos = args.enable_kerberos
      else:
        kerberos_config.enableKerberos = True
      if args.kerberos_root_principal_password_uri:
        kerberos_config.rootPrincipalPasswordUri = (
            args.kerberos_root_principal_password_uri
        )
        kerberos_kms_ref = args.CONCEPTS.kerberos_kms_key.Parse()
        if kerberos_kms_ref:
          kerberos_config.kmsKeyUri = kerberos_kms_ref.RelativeName()
      cluster_config.securityConfig.kerberosConfig = kerberos_config

  if not beta:
    if (
        constants.ENABLE_DYNAMIC_MULTI_TENANCY_PROPERTY in args.properties
        and not args.secure_multi_tenancy_user_mapping
        and not args.identity_config_file
    ):
      raise exceptions.ArgumentError(
          'If %s is enabled, either --secure-multi-tenancy-user-mapping or'
          ' --identity-config-file must be provided with user to service'
          ' account mappings.'
          % constants.ENABLE_DYNAMIC_MULTI_TENANCY_PROPERTY,
      )

    if args.identity_config_file or args.secure_multi_tenancy_user_mapping:
      if cluster_config.securityConfig is None:
        cluster_config.securityConfig = dataproc.messages.SecurityConfig()

      if args.identity_config_file:
        cluster_config.securityConfig.identityConfig = ParseIdentityConfigFile(
            dataproc, args.identity_config_file
        )
      else:
        user_service_account_mapping = (
            ParseSecureMultiTenancyUserServiceAccountMappingString(
                args.secure_multi_tenancy_user_mapping
            )
        )
        identity_config = dataproc.messages.IdentityConfig()
        identity_config.userServiceAccountMapping = (
            encoding.DictToAdditionalPropertyMessage(
                user_service_account_mapping,
                dataproc.messages.IdentityConfig.UserServiceAccountMappingValue,
            )
        )
        cluster_config.securityConfig.identityConfig = identity_config

  if args.autoscaling_policy:
    cluster_config.autoscalingConfig = dataproc.messages.AutoscalingConfig(
        policyUri=args.CONCEPTS.autoscaling_policy.Parse().RelativeName()
    )

  if args.node_group:
    gce_cluster_config.nodeGroupAffinity = dataproc.messages.NodeGroupAffinity(
        nodeGroupUri=args.node_group
    )

  if (
      args.IsSpecified('shielded_secure_boot')
      or args.IsSpecified('shielded_vtpm')
      or args.IsSpecified('shielded_integrity_monitoring')
  ):
    gce_cluster_config.shieldedInstanceConfig = (
        dataproc.messages.ShieldedInstanceConfig(
            enableSecureBoot=args.shielded_secure_boot,
            enableVtpm=args.shielded_vtpm,
            enableIntegrityMonitoring=args.shielded_integrity_monitoring,
        )
    )

  if not beta and args.IsSpecified('confidential_compute'):
    gce_cluster_config.confidentialInstanceConfig = (
        dataproc.messages.ConfidentialInstanceConfig(
            enableConfidentialCompute=args.confidential_compute
        )
    )

  if args.dataproc_metastore:
    cluster_config.metastoreConfig = dataproc.messages.MetastoreConfig(
        dataprocMetastoreService=args.dataproc_metastore
    )
  elif (alpha or beta) and (
      args.bigquery_metastore_project_id is not None
      or args.bigquery_metastore_database_location is not None
      or args.bigquery_metastore
  ):
    bigquery_metastore_config = GetBigQueryConfig(
        dataproc,
        args,
    )
    cluster_config.metastoreConfig = dataproc.messages.MetastoreConfig(
        bigqueryMetastoreConfig=bigquery_metastore_config
    )

  if include_ttl_config:
    lifecycle_config = dataproc.messages.LifecycleConfig()
    changed_config = False
    # Flags max_age, expiration_time and max_idle are hidden, but still
    # supported. They are replaced with new flags delete_max_age,
    # delete_expiration_time and delete_max_idle.
    if args.max_age is not None:
      lifecycle_config.autoDeleteTtl = six.text_type(args.max_age) + 's'
      changed_config = True
    if args.expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.expiration_time
      )
      changed_config = True
    if args.max_idle is not None:
      lifecycle_config.idleDeleteTtl = six.text_type(args.max_idle) + 's'
      changed_config = True

    if args.delete_max_age is not None:
      lifecycle_config.autoDeleteTtl = (
          six.text_type(args.delete_max_age) + 's'
      )
      changed_config = True
    if args.delete_expiration_time is not None:
      lifecycle_config.autoDeleteTime = times.FormatDateTime(
          args.delete_expiration_time
      )
      changed_config = True
    if args.delete_max_idle is not None:
      lifecycle_config.idleDeleteTtl = (
          six.text_type(args.delete_max_idle) + 's'
      )
      changed_config = True
    # Process scheduled stop args.
    if args.stop_max_age is not None:
      lifecycle_config.autoStopTtl = six.text_type(args.stop_max_age) + 's'
      changed_config = True
    if args.stop_expiration_time is not None:
      lifecycle_config.autoStopTime = times.FormatDateTime(
          args.stop_expiration_time
      )
      changed_config = True
    if args.stop_max_idle is not None:
      lifecycle_config.idleStopTtl = six.text_type(args.stop_max_idle) + 's'
      changed_config = True

    if changed_config:
      cluster_config.lifecycleConfig = lifecycle_config

  encryption_config = dataproc.messages.EncryptionConfig()
  if hasattr(args.CONCEPTS, 'gce_pd_kms_key'):
    gce_pd_kms_ref = args.CONCEPTS.gce_pd_kms_key.Parse()
    if gce_pd_kms_ref:
      encryption_config.gcePdKmsKeyName = gce_pd_kms_ref.RelativeName()
    else:
      # Did user use any gce-pd-kms-key flags?
      for keyword in [
          'gce-pd-kms-key',
          'gce-pd-kms-key-project',
          'gce-pd-kms-key-location',
          'gce-pd-kms-key-keyring',
      ]:
        if getattr(args, keyword.replace('-', '_'), None):
          raise exceptions.ArgumentError(
              '--gce-pd-kms-key was not fully specified.'
          )
  if hasattr(args.CONCEPTS, 'kms_key'):
    kms_ref = args.CONCEPTS.kms_key.Parse()
    if kms_ref:
      encryption_config.kmsKey = kms_ref.RelativeName()
    else:
      for keyword in [
          'kms-key',
          'kms-project',
          'kms-location',
          'kms-keyring',
      ]:
        if getattr(args, keyword.replace('-', '_'), None):
          raise exceptions.ArgumentError('--kms-key was not fully specified.')
  if encryption_config.gcePdKmsKeyName or encryption_config.kmsKey:
    cluster_config.encryptionConfig = encryption_config

  # The secondary worker group is optional. However, users may still specify
  # the configuration for future preemptible VMs (pVMs) at creation time.
  num_secondary_workers = _FirstNonNone(
      args.num_secondary_workers, args.num_preemptible_workers
  )
  secondary_worker_boot_disk_type = _FirstNonNone(
      args.secondary_worker_boot_disk_type,
      args.preemptible_worker_boot_disk_type,
  )
  num_secondary_worker_local_ssds = _FirstNonNone(
      args.num_secondary_worker_local_ssds,
      args.num_preemptible_worker_local_ssds,
  )
  if (
      num_secondary_workers is not None
      or secondary_worker_boot_disk_size_gb is not None
      or secondary_worker_boot_disk_type is not None
      or num_secondary_worker_local_ssds is not None
      or args.worker_min_cpu_platform is not None
      or args.secondary_worker_type == 'non-preemptible'
      or args.secondary_worker_type == 'spot'
      or args.secondary_worker_machine_types is not None
      or args.min_secondary_worker_fraction is not None
  ):
    instance_flexibility_policy = GetInstanceFlexibilityPolicy(
        dataproc,
        GetProvisioningModelMix(
            dataproc,
            args.secondary_worker_standard_capacity_base,
            args.secondary_worker_standard_capacity_percent_above_base,
        ),
        args.secondary_worker_machine_types,
        'secondary-worker',
    )

    startup_config = GetStartupConfig(dataproc, args)
    cluster_config.secondaryWorkerConfig = (
        dataproc.messages.InstanceGroupConfig(
            numInstances=num_secondary_workers,
            accelerators=secondary_worker_accelerators,
            diskConfig=GetDiskConfig(
                dataproc,
                secondary_worker_boot_disk_type,
                secondary_worker_boot_disk_size_gb,
                num_secondary_worker_local_ssds,
                args.secondary_worker_local_ssd_interface,
                'Secondary worker',
                args.secondary_worker_boot_disk_provisioned_iops,
                args.secondary_worker_boot_disk_provisioned_throughput,
                args.secondary_worker_attached_disks,
            ),
            minCpuPlatform=args.worker_min_cpu_platform,
            preemptibility=_GetInstanceGroupPreemptibility(
                dataproc, args.secondary_worker_type
            ),
            instanceFlexibilityPolicy=instance_flexibility_policy,
            startupConfig=startup_config,
        )
    )

  # Add input-only auxiliaryNodeGroups
  if _AtLeastOneGceNodePoolSpecified(args, driver_pool_boot_disk_size_gb):
    cluster_config.auxiliaryNodeGroups = [
        dataproc.messages.AuxiliaryNodeGroup(
            nodeGroup=(
                dataproc.messages.NodeGroup(
                    labels=labels_util.ParseCreateArgs(
                        args, dataproc.messages.NodeGroup.LabelsValue
                    ),
                    roles=[
                        dataproc.messages.NodeGroup.RolesValueListEntryValuesEnum.DRIVER
                    ],
                    nodeGroupConfig=dataproc.messages.InstanceGroupConfig(
                        numInstances=args.driver_pool_size,
                        imageUri=image_ref and image_ref.SelfLink(),
                        machineTypeUri=args.driver_pool_machine_type,
                        accelerators=driver_pool_accelerators,
                        diskConfig=GetDiskConfig(
                            dataproc,
                            args.driver_pool_boot_disk_type,
                            driver_pool_boot_disk_size_gb,
                            args.num_driver_pool_local_ssds,
                            args.driver_pool_local_ssd_interface,
                            'Driver pool',
                        ),
                        minCpuPlatform=args.driver_pool_min_cpu_platform,
                    ),
                )
            ),
            # The frontend generates the driver pool id if none is provided.
            nodeGroupId=args.driver_pool_id,
        )
    ]

  if args.enable_component_gateway:
    cluster_config.endpointConfig = dataproc.messages.EndpointConfig(
        enableHttpPortAccess=args.enable_component_gateway
    )

  if include_gke_platform_args:
    if args.gke_cluster is not None:
      location = args.zone or args.region
      target_gke_cluster = 'projects/{0}/locations/{1}/clusters/{2}'.format(
          project_id, location, args.gke_cluster
      )
      cluster_config.gkeClusterConfig = dataproc.messages.GkeClusterConfig(
          namespacedGkeDeploymentTarget=dataproc.messages.NamespacedGkeDeploymentTarget(
              targetGkeCluster=target_gke_cluster,
              clusterNamespace=args.gke_cluster_namespace,
          )
      )
      cluster_config.gceClusterConfig = None
      cluster_config.masterConfig = None
      cluster_config.workerConfig = None
      cluster_config.secondaryWorkerConfig = None

  if not beta and args.metric_sources:
    _SetDataprocMetricConfig(args, cluster_config, dataproc)
  return cluster_config


def _GetMetricOverrides(args):
  """Gets metric overrides from an input file or the metric_overrides list."""
  if args.metric_overrides:
    return args.metric_overrides
  if args.metric_overrides_file:
    if args.metric_overrides_file.startswith('gs://'):
      data = storage_helpers.ReadObject(args.metric_overrides_file)
    else:
      data = console_io.ReadFromFileOrStdin(
          args.metric_overrides_file, binary=False
      )
    return data.split('\n')
  return []


def _SetDataprocMetricConfig(args, cluster_config, dataproc):
  """Sets the metric sources and their optional overrides on DataprocMetricConfig.

  Metric overrides can be read from either the metric-overrides or the
  metric-overrides-file argument.
  We do basic validation on metric-overrides:
  * Ensure that every metric-overrides entry is prefixed with the camel-case
    form of its metric source.
    Example:
    "sparkHistoryServer:JVM:Memory:NonHeapMemoryUsage.used" is a valid metric
    override for the metric source spark-history-server,
    but "spark-history-server:JVM:Memory:NonHeapMemoryUsage.used" is not.
  * Metric overrides may only be passed for the metric sources enabled via
    args.metric_sources.

  Args:
    args: arguments passed to create cluster command.
    cluster_config: cluster configuration to be updated with
      DataprocMetricConfig.
    dataproc: Dataproc API definition.
  """

  def _GetCamelCaseMetricSource(ms):
    title_case = ms.lower().title().replace('_', '').replace('-', '')
    return title_case[0].lower() + title_case[1:]

  metric_source_to_overrides_dict = dict()
  metric_overrides = [m.strip() for m in _GetMetricOverrides(args) if m.strip()]
  if metric_overrides:
    invalid_metric_overrides = []
    valid_metric_prefixes = [
        _GetCamelCaseMetricSource(ms) for ms in args.metric_sources
    ]
    for metric in metric_overrides:
      prefix = metric.split(':')[0]
      if prefix not in valid_metric_prefixes:
        invalid_metric_overrides.append(metric)
      metric_source_to_overrides_dict.setdefault(prefix, []).append(metric)
    if invalid_metric_overrides:
      raise exceptions.ArgumentError(
          'Found invalid metric overrides: '
          + ','.join(invalid_metric_overrides)
          + '. Please ensure the metric overrides only have the following'
          ' prefixes that correspond to the metric-sources that are enabled: '
          + ','.join(valid_metric_prefixes)
      )

  cluster_config.dataprocMetricConfig = dataproc.messages.DataprocMetricConfig(
      metrics=[]
  )
  for metric_source in args.metric_sources:
    metric_source_in_camel_case = _GetCamelCaseMetricSource(metric_source)
    metric_overrides = metric_source_to_overrides_dict.get(
        metric_source_in_camel_case, []
    )
    cluster_config.dataprocMetricConfig.metrics.append(
        dataproc.messages.Metric(
            metricSource=arg_utils.ChoiceToEnum(
                metric_source,
                dataproc.messages.Metric.MetricSourceValueValuesEnum,
            ),
            metricOverrides=metric_overrides,
        )
    )
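

# Illustrative sketch (not part of the command surface): the message that
# _SetDataprocMetricConfig builds for --metric-sources=spark-history-server
# with the sample override from the docstring above. `dataproc` is assumed to
# be the usual Dataproc API wrapper passed throughout this module.
def _ExampleSparkHistoryServerMetricConfig(dataproc):
  """Returns a sample DataprocMetricConfig message (illustrative only)."""
  return dataproc.messages.DataprocMetricConfig(
      metrics=[
          dataproc.messages.Metric(
              metricSource=arg_utils.ChoiceToEnum(
                  'spark-history-server',
                  dataproc.messages.Metric.MetricSourceValueValuesEnum,
              ),
              metricOverrides=[
                  'sparkHistoryServer:JVM:Memory:NonHeapMemoryUsage.used'
              ],
          )
      ],
  )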


def _AtLeastOneGceNodePoolSpecified(args, driver_pool_boot_disk_size_gb):
  return hasattr(args, 'driver_pool_size') and (
      args.IsSpecified('driver_pool_size')
      or args.IsSpecified('driver_pool_machine_type')
      or args.IsSpecified('driver_pool_boot_disk_type')
      or driver_pool_boot_disk_size_gb is not None
      or args.IsSpecified('num_driver_pool_local_ssds')
      or args.IsSpecified('driver_pool_local_ssd_interface')
      or args.IsSpecified('driver_pool_min_cpu_platform')
      or args.IsSpecified('driver_pool_id')
  )


def _FirstNonNone(first, second):
  return first if first is not None else second


def _GetInstanceGroupPreemptibility(dataproc, secondary_worker_type):
  if secondary_worker_type == 'non-preemptible':
    return dataproc.messages.InstanceGroupConfig.PreemptibilityValueValuesEnum(
        'NON_PREEMPTIBLE'
    )
  elif secondary_worker_type == 'spot':
    return dataproc.messages.InstanceGroupConfig.PreemptibilityValueValuesEnum(
        'SPOT'
    )
  return None


def _GetCusterType(dataproc, cluster_type):
  """Get ClusterType enum value.

  Converts cluster_type argument value to
  ClusterType API enum value.

  Args:
    dataproc: Dataproc API definition
    cluster_type: argument value

  Returns:
    ClusterType API enum value
  """
  if cluster_type == 'standard':
    return dataproc.messages.ClusterConfig.ClusterTypeValueValuesEnum(
        'STANDARD'
    )
  if cluster_type == 'single-node':
    return dataproc.messages.ClusterConfig.ClusterTypeValueValuesEnum(
        'SINGLE_NODE'
    )
  if cluster_type == 'zero-scale':
    return dataproc.messages.ClusterConfig.ClusterTypeValueValuesEnum(
        'ZERO_SCALE'
    )
  if cluster_type is None:
    return None
  raise exceptions.ArgumentError(
      'Unsupported --cluster-type flag value: '
      + cluster_type
  )


def _GetClusterTier(dataproc, cluster_tier):
  """Get ClusterTier enum value.

  Converts cluster_tier argument value to
  ClusterTier API enum value.

  Args:
    dataproc: Dataproc API definition
    cluster_tier: argument value

  Returns:
    ClusterTier API enum value
  """
  if cluster_tier == 'premium':
    return dataproc.messages.ClusterConfig.ClusterTierValueValuesEnum(
        'CLUSTER_TIER_PREMIUM'
    )
  if cluster_tier == 'standard':
    return dataproc.messages.ClusterConfig.ClusterTierValueValuesEnum(
        'CLUSTER_TIER_STANDARD'
    )
  if cluster_tier is None:
    return None
  raise exceptions.ArgumentError(
      'Unsupported --cluster-tier flag value: ' + cluster_tier
  )


def _GetPrivateIpv6GoogleAccess(dataproc, private_ipv6_google_access_type):
  """Get PrivateIpv6GoogleAccess enum value.

  Converts private_ipv6_google_access_type argument value to
  PrivateIpv6GoogleAccess API enum value.

  Args:
    dataproc: Dataproc API definition
    private_ipv6_google_access_type: argument value

  Returns:
    PrivateIpv6GoogleAccess API enum value
  """
  if private_ipv6_google_access_type == 'inherit-subnetwork':
    return dataproc.messages.GceClusterConfig.PrivateIpv6GoogleAccessValueValuesEnum(
        'INHERIT_FROM_SUBNETWORK'
    )
  if private_ipv6_google_access_type == 'outbound':
    return dataproc.messages.GceClusterConfig.PrivateIpv6GoogleAccessValueValuesEnum(
        'OUTBOUND'
    )
  if private_ipv6_google_access_type == 'bidirectional':
    return dataproc.messages.GceClusterConfig.PrivateIpv6GoogleAccessValueValuesEnum(
        'BIDIRECTIONAL'
    )
  if private_ipv6_google_access_type is None:
    return None
  raise exceptions.ArgumentError(
      'Unsupported --private-ipv6-google-access-type flag value: '
      + private_ipv6_google_access_type
  )


def GetBigQueryConfig(dataproc, args):
  """Get BigQuery config.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    args: arguments of the request

  Returns:
    bigquery_config: BigQuery config.
  """
  return dataproc.messages.BigqueryMetastoreConfig(
      projectId=args.bigquery_metastore_project_id,
      location=args.bigquery_metastore_database_location,
  )


def GetDiskConfig(
    dataproc,
    boot_disk_type,
    boot_disk_size,
    num_local_ssds,
    local_ssd_interface,
    node_type,
    boot_disk_provisioned_iops=None,
    boot_disk_provisioned_throughput=None,
    attached_disk_configs=None,
):
  """Get dataproc cluster disk configuration.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    boot_disk_type: Type of the boot disk
    boot_disk_size: Size of the boot disk
    num_local_ssds: Number of the Local SSDs
    local_ssd_interface: Interface used to attach local SSDs
    node_type: Type of the dataproc node. One of Master, Worker, Secondary
      worker, Driver pool
    boot_disk_provisioned_iops: Provisioned IOPS of the boot disk
    boot_disk_provisioned_throughput: Provisioned throughput of the boot disk
    attached_disk_configs: Attached disks of the node.

  Returns:
    disk_config: Dataproc cluster disk configuration
  Raises:
    exceptions.ArgumentError: If boot_disk_provisioned_iops or
      boot_disk_provisioned_throughput is specified with boot_disk_type other
      than hyperdisk-balanced or if the value is not positive.
  """

  if boot_disk_provisioned_iops is not None:
    if boot_disk_type is not None and boot_disk_type != 'hyperdisk-balanced':
      raise exceptions.ArgumentError(
          f'{node_type} bootdisk IOPS can be specified only with bootdisk type'
          f' hyperdisk-balanced, provided {boot_disk_type}.'
      )

    if boot_disk_provisioned_iops <= 0:
      raise exceptions.ArgumentError(
          f'{node_type} bootdisk IOPS must be positive, provided'
          f' {boot_disk_provisioned_iops}.'
      )

  if boot_disk_provisioned_throughput is not None:
    if boot_disk_type is not None and boot_disk_type != 'hyperdisk-balanced':
      raise exceptions.ArgumentError(
          f'{node_type} bootdisk throughput can be specified only with bootdisk'
          f' type hyperdisk-balanced, provided {boot_disk_type}.'
      )

    if boot_disk_provisioned_throughput <= 0:
      raise exceptions.ArgumentError(
          f'{node_type} bootdisk throughput must be positive, provided'
          f' {boot_disk_provisioned_throughput}.'
      )

  attached_disk_configs_messages = []
  if attached_disk_configs:
    for attached_disk_config in attached_disk_configs:
      disk = dataproc.messages.AttachedDiskConfig()
      disk.diskType = (
          dataproc.messages.AttachedDiskConfig.DiskTypeValueValuesEnum(
              'DISK_TYPE_UNSPECIFIED'
          )
      )
      if attached_disk_config.get('type') == 'hyperdisk-balanced':
        disk.diskType = (
            dataproc.messages.AttachedDiskConfig.DiskTypeValueValuesEnum(
                'HYPERDISK_BALANCED'
            )
        )
      if attached_disk_config.get('type') == 'hyperdisk-extreme':
        disk.diskType = (
            dataproc.messages.AttachedDiskConfig.DiskTypeValueValuesEnum(
                'HYPERDISK_EXTREME'
            )
        )
      if attached_disk_config.get('type') == 'hyperdisk-ml':
        disk.diskType = (
            dataproc.messages.AttachedDiskConfig.DiskTypeValueValuesEnum(
                'HYPERDISK_ML'
            )
        )
      if attached_disk_config.get('type') == 'hyperdisk-throughput':
        disk.diskType = (
            dataproc.messages.AttachedDiskConfig.DiskTypeValueValuesEnum(
                'HYPERDISK_THROUGHPUT'
            )
        )
      disk_size = attached_disk_config.get('size')
      if disk_size:
        disk.diskSizeGb = int(disk_size)
      iops = attached_disk_config.get('iops')
      if iops:
        disk.provisionedIops = int(iops)
      throughput = attached_disk_config.get('throughput')
      if throughput:
        disk.provisionedThroughput = int(throughput)
      attached_disk_configs_messages.append(disk)

  return dataproc.messages.DiskConfig(
      bootDiskType=boot_disk_type,
      bootDiskSizeGb=boot_disk_size,
      bootDiskProvisionedIops=boot_disk_provisioned_iops,
      bootDiskProvisionedThroughput=boot_disk_provisioned_throughput,
      numLocalSsds=num_local_ssds,
      localSsdInterface=local_ssd_interface,
      attachedDiskConfigs=attached_disk_configs_messages,
  )
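

# Minimal illustrative sketch: how GetDiskConfig might be invoked for a worker
# group that boots from a hyperdisk-balanced disk and has one attached
# hyperdisk. The size, IOPS, and throughput values are hypothetical;
# `dataproc` is the usual Dataproc API wrapper passed throughout this module.
def _ExampleWorkerDiskConfig(dataproc):
  """Returns a sample DiskConfig message (illustrative only)."""
  return GetDiskConfig(
      dataproc,
      boot_disk_type='hyperdisk-balanced',
      boot_disk_size=500,
      num_local_ssds=0,
      local_ssd_interface=None,
      node_type='Worker',
      boot_disk_provisioned_iops=3000,
      boot_disk_provisioned_throughput=140,
      attached_disk_configs=[
          # Same dict shape that DiskConfigParser (below) produces.
          {'type': 'hyperdisk-throughput', 'size': 1000, 'throughput': '200'}
      ],
  )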


def GetInstanceFlexibilityPolicy(
    dataproc,
    provisioning_model_mix,
    machine_types,
    node_type,
):
  """Get instance flexibility policy.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    provisioning_model_mix: Provisioning model mix for instance flexibility
      policy
    machine_types: Machine types with rank for instance selection
    node_type: Type of the dataproc node. One of master, worker,
      secondary-worker

  Returns:
    InstanceFlexibilityPolicy for the given node group.
  """

  if provisioning_model_mix is None and machine_types is None:
    return None

  instance_selection_list = []

  if machine_types:
    instance_selection_list = GetInstanceSelectionList(
        dataproc, machine_types, node_type
    )

  return dataproc.messages.InstanceFlexibilityPolicy(
      instanceSelectionList=instance_selection_list,
      provisioningModelMix=provisioning_model_mix,
  )


def GetProvisioningModelMix(
    dataproc,
    standard_capacity_base,
    standard_capacity_percent_above_base,
):
  """Get provisioning model mix from given parameters.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    standard_capacity_base: Standard capacity base for provisioning model mix
    standard_capacity_percent_above_base: Standard capacity percent above base
      for provisioning model mix

  Returns:
    ProvisioningModelMix built from the given parameters.
  """

  if (
      standard_capacity_base is None
      and standard_capacity_percent_above_base is None
  ):
    return None

  return dataproc.messages.ProvisioningModelMix(
      standardCapacityBase=(standard_capacity_base or 0),
      standardCapacityPercentAboveBase=(
          standard_capacity_percent_above_base or 0
      ),
  )
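

# Illustrative sketch: per the flag names, a mix where the first 10 secondary
# workers use standard capacity and 50 percent of capacity above that base is
# also standard. The numbers are hypothetical; `dataproc` is the usual
# Dataproc API wrapper passed throughout this module.
def _ExampleProvisioningModelMix(dataproc):
  """Returns a sample ProvisioningModelMix message (illustrative only)."""
  return GetProvisioningModelMix(
      dataproc,
      standard_capacity_base=10,
      standard_capacity_percent_above_base=50,
  )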


def GetStartupConfig(dataproc, args):
  """Get startup config.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    args: arguments of the request

  Returns:
    startup_config: Startup config of the secondary worker group.
  """
  if args.min_secondary_worker_fraction is None:
    return None
  return dataproc.messages.StartupConfig(
      requiredRegistrationFraction=args.min_secondary_worker_fraction
  )


def CreateCluster(
    dataproc,
    cluster_ref,
    cluster,
    is_async,
    timeout,
    enable_create_on_gke=False,
    action_on_failed_primary_workers=None,
):
  """Create a cluster.

  Args:
    dataproc: Dataproc object that contains client, messages, and resources
    cluster_ref: Full resource ref of cluster with name, region, and project id
    cluster: Cluster to create
    is_async: Whether to wait for the operation to complete
    timeout: Timeout used when waiting for the operation to complete
    enable_create_on_gke: Whether to enable creation of GKE-based clusters
    action_on_failed_primary_workers: Action to be performed when primary
      workers fail during cluster creation. Should be None for the v1beta2
      Dataproc API.

  Returns:
    Created cluster, or None if async
  """
  # Build the create request with a unique request id for idempotency.
  request_id = util.GetUniqueId()
  request = dataproc.GetCreateClusterRequest(
      cluster=cluster,
      project_id=cluster_ref.projectId,
      region=cluster_ref.region,
      request_id=request_id,
      action_on_failed_primary_workers=action_on_failed_primary_workers,
  )
  operation = dataproc.client.projects_regions_clusters.Create(request)

  if is_async:
    log.status.write(
        'Creating [{0}] with operation [{1}].'.format(
            cluster_ref, operation.name
        )
    )
    return

  operation = util.WaitForOperation(
      dataproc,
      operation,
      message='Waiting for cluster creation operation',
      timeout_s=timeout,
  )

  get_request = dataproc.messages.DataprocProjectsRegionsClustersGetRequest(
      projectId=cluster_ref.projectId,
      region=cluster_ref.region,
      clusterName=cluster_ref.clusterName,
  )
  cluster = dataproc.client.projects_regions_clusters.Get(get_request)
  if cluster.status.state == (
      dataproc.messages.ClusterStatus.StateValueValuesEnum.RUNNING
  ):
    if enable_create_on_gke and cluster.config.gkeClusterConfig is not None:
      log.CreatedResource(
          cluster_ref,
          details='Cluster created on GKE cluster {0}'.format(
              cluster.config.gkeClusterConfig.namespacedGkeDeploymentTarget.targetGkeCluster
          ),
      )
    elif cluster.virtualClusterConfig is not None:
      if (
          cluster.virtualClusterConfig.kubernetesClusterConfig.gkeClusterConfig
          is not None
      ):
        log.CreatedResource(
            cluster_ref,
            details='Virtual Cluster created on GKE cluster: {0}'.format(
                cluster.virtualClusterConfig.kubernetesClusterConfig.gkeClusterConfig.gkeClusterTarget
            ),
        )
    else:
      zone_uri = cluster.config.gceClusterConfig.zoneUri
      zone_short_name = zone_uri.split('/')[-1]

      # Log the URL of the cluster
      log.CreatedResource(
          cluster_ref,
          # Also indicate which zone the cluster was placed in. This is helpful
          # if the server picked a zone (auto zone)
          details='Cluster placed in zone [{0}]'.format(zone_short_name),
      )
  else:
    # The operation didn't have an error, but the cluster is not RUNNING.
    log.error('Create cluster failed!')
    if cluster.status.detail:
      log.error('Details:\n' + cluster.status.detail)
  return cluster


def DeleteGeneratedLabels(cluster, dataproc):
  """Filter out Dataproc-generated cluster labels.

  Args:
    cluster: Cluster to filter
    dataproc: Dataproc object that contains client, messages, and resources
  """
  # Filter out Dataproc-generated labels.
  if cluster.labels:
    labels = encoding.MessageToPyValue(cluster.labels)
    labels_to_delete = []
    for label in labels:
      if label.startswith(GENERATED_LABEL_PREFIX):
        labels_to_delete.append(label)
    for label in labels_to_delete:
      del labels[label]
    if not labels:
      cluster.labels = None
    else:
      cluster.labels = encoding.DictToAdditionalPropertyMessage(
          labels, dataproc.messages.Cluster.LabelsValue
      )


def DeleteGeneratedProperties(cluster, dataproc):
  """Filter out Dataproc-generated cluster properties.

  Args:
    cluster: Cluster to filter
    dataproc: Dataproc object that contains client, messages, and resources
  """
  try:
    if cluster.config.softwareConfig.properties:
      _DeleteClusterGeneratedProperties(cluster, dataproc)
  except AttributeError:
    # The field is not set.
    pass
  try:
    if (
        cluster.virtualClusterConfig.kubernetesClusterConfig.kubernetesSoftwareConfig.properties
    ):
      _DeleteVirtualClusterGeneratedProperties(cluster, dataproc)
  except AttributeError:
    # The field is not set.
    pass


def _DeleteClusterGeneratedProperties(cluster, dataproc):
  """Removes Dataproc generated properties from GCE-based Clusters."""
  # Filter out Dataproc-generated properties.
  props = encoding.MessageToPyValue(cluster.config.softwareConfig.properties)
  # We don't currently have a nice way to tell which properties are
  # Dataproc-generated, so for now, delete a few properties that we know contain
  # cluster-specific info.
  prop_prefixes_to_delete = (
      'hdfs:dfs.namenode.lifeline.rpc-address',
      'hdfs:dfs.namenode.servicerpc-address',
  )

  prop_keys_to_delete = [
      prop_key
      for prop_key in props.keys()
      if prop_key.startswith(prop_prefixes_to_delete)
  ]

  for prop in prop_keys_to_delete:
    del props[prop]
  if not props:
    cluster.config.softwareConfig.properties = None
  else:
    cluster.config.softwareConfig.properties = (
        encoding.DictToAdditionalPropertyMessage(
            props, dataproc.messages.SoftwareConfig.PropertiesValue
        )
    )


def _DeleteVirtualClusterGeneratedProperties(cluster, dataproc):
  """Removes Dataproc generated properties from Virtual Clusters."""
  # Filter out Dataproc-generated properties.
  props = encoding.MessageToPyValue(
      cluster.virtualClusterConfig.kubernetesClusterConfig.kubernetesSoftwareConfig.properties
  )
  # We don't currently have a nice way to tell which properties are
  # Dataproc-generated, so for now, delete a few properties that we know contain
  # cluster-specific info.
  prop_prefixes_to_delete = (
      # Output only properties from DPGKE.
      'dpgke:dpgke.unstable.outputOnly.',
  )

  prop_keys_to_delete = [
      prop_key
      for prop_key in props.keys()
      if prop_key.startswith(prop_prefixes_to_delete)
  ]

  for prop in prop_keys_to_delete:
    del props[prop]

  if not props:
    cluster.virtualClusterConfig.kubernetesClusterConfig.kubernetesSoftwareConfig.properties = (
        None
    )
  else:
    cluster.virtualClusterConfig.kubernetesClusterConfig.kubernetesSoftwareConfig.properties = encoding.DictToAdditionalPropertyMessage(
        props, dataproc.messages.KubernetesSoftwareConfig.PropertiesValue
    )


def ClusterKey(cluster, key_type):
  """Return a cluster-generated public encryption key if there is one.

  Args:
    cluster: Cluster to check for an encryption key.
    key_type: Which key to return. Dataproc clusters publish both RSA and
      ECIES public keys.

  Returns:
    The public key for the cluster if there is one, otherwise None
  """
  master_instance_refs = cluster.config.masterConfig.instanceReferences
  if not master_instance_refs:
    return None
  if key_type == 'ECIES':
    return master_instance_refs[0].publicEciesKey
  return master_instance_refs[0].publicKey


def AddReservationAffinityGroup(parser, group_text, affinity_text):
  """Adds the argument group to handle reservation affinity configurations."""
  group = parser.add_group(help=group_text)
  group.add_argument(
      '--reservation-affinity',
      choices=['any', 'none', 'specific'],
      default='any',
      help=affinity_text,
  )
  group.add_argument(
      '--reservation',
      help="""
The name of the reservation, required when `--reservation-affinity=specific`.
""",
  )


def ValidateReservationAffinityGroup(args):
  """Validates flags specifying reservation affinity."""
  affinity = getattr(args, 'reservation_affinity', None)
  if affinity == 'specific':
    if not args.IsSpecified('reservation'):
      raise exceptions.ArgumentError(
          '--reservation must be specified with --reservation-affinity=specific'
      )


def GetReservationAffinity(args, client):
  """Returns the message of reservation affinity for the instance."""
  if args.IsSpecified('reservation_affinity'):
    type_msgs = (
        client.messages.ReservationAffinity.ConsumeReservationTypeValueValuesEnum
    )

    reservation_key = None
    reservation_values = []

    if args.reservation_affinity == 'none':
      reservation_type = type_msgs.NO_RESERVATION
    elif args.reservation_affinity == 'specific':
      reservation_type = type_msgs.SPECIFIC_RESERVATION
      # Currently, the key is fixed and the value is the name of the
      # reservation.
      # The value being a repeated field is reserved for future use, when
      # users will be able to specify more than one reservation name from
      # which the VM can take capacity.
      reservation_key = RESERVATION_AFFINITY_KEY
      reservation_values = [args.reservation]
    else:
      reservation_type = type_msgs.ANY_RESERVATION

    return client.messages.ReservationAffinity(
        consumeReservationType=reservation_type,
        key=reservation_key or None,
        values=reservation_values,
    )

  return None

universe_domain = properties.VALUES.core.universe_domain.Get()
RESERVATION_AFFINITY_KEY = f'compute.{universe_domain}/reservation-name'
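

# Illustrative sketch: the ReservationAffinity message that
# GetReservationAffinity builds for --reservation-affinity=specific
# --reservation=my-reservation. 'my-reservation' is a placeholder name;
# `client` is the compute client wrapper passed to GetReservationAffinity.
def _ExampleSpecificReservationAffinity(client):
  """Returns a sample ReservationAffinity message (illustrative only)."""
  type_msgs = (
      client.messages.ReservationAffinity.ConsumeReservationTypeValueValuesEnum
  )
  return client.messages.ReservationAffinity(
      consumeReservationType=type_msgs.SPECIFIC_RESERVATION,
      key=RESERVATION_AFFINITY_KEY,
      values=['my-reservation'],
  )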


def AddKerberosGroup(parser):
  """Adds the argument group to handle Kerberos configurations."""
  kerberos_group = parser.add_argument_group(
      mutex=True,
      help=arg_parsers.UniverseHelpText(
          default=(
              'Specifying these flags will enable Kerberos for the cluster.'
          ),
          universe_help='Kerberos is only supported in the default universe.',
      ),
  )
  # Not mutually exclusive
  kerberos_flag_group = kerberos_group.add_argument_group()
  kerberos_flag_group.add_argument(
      '--enable-kerberos',
      action='store_true',
      help="""\
        Enable Kerberos on the cluster.
        """,
  )
  kerberos_flag_group.add_argument(
      '--kerberos-root-principal-password-uri',
      help="""\
        Google Cloud Storage URI of a KMS encrypted file containing the root
        principal password. Must be a Cloud Storage URL beginning with 'gs://'.
        """,
  )
  # Add kerberos-kms-key args
  kerberos_kms_flag_overrides = {
      'kms-key': '--kerberos-kms-key',
      'kms-keyring': '--kerberos-kms-key-keyring',
      'kms-location': '--kerberos-kms-key-location',
      'kms-project': '--kerberos-kms-key-project',
  }
  kms_resource_args.AddKmsKeyResourceArg(
      kerberos_flag_group,
      'password',
      flag_overrides=kerberos_kms_flag_overrides,
      name='--kerberos-kms-key',
  )

  kerberos_group.add_argument(
      '--kerberos-config-file',
      help="""\
Path to a YAML (or JSON) file containing the configuration for Kerberos on the
cluster. If you pass `-` as the value of the flag, the file content will be read
from stdin.

The YAML file is formatted as follows:

```
  # Optional. Flag to indicate whether to Kerberize the cluster.
  # The default value is true.
  enable_kerberos: true

  # Optional. The Google Cloud Storage URI of a KMS encrypted file
  # containing the root principal password.
  root_principal_password_uri: gs://bucket/password.encrypted

  # Optional. The URI of the Cloud KMS key used to encrypt
  # sensitive files.
  kms_key_uri:
    projects/myproject/locations/global/keyRings/mykeyring/cryptoKeys/my-key

  # Configuration of SSL encryption. If specified, all sub-fields
  # are required. Otherwise, Dataproc will provide a self-signed
  # certificate and generate the passwords.
  ssl:
    # Optional. The Google Cloud Storage URI of the keystore file.
    keystore_uri: gs://bucket/keystore.jks

    # Optional. The Google Cloud Storage URI of a KMS encrypted
    # file containing the password to the keystore.
    keystore_password_uri: gs://bucket/keystore_password.encrypted

    # Optional. The Google Cloud Storage URI of a KMS encrypted
    # file containing the password to the user provided key.
    key_password_uri: gs://bucket/key_password.encrypted

    # Optional. The Google Cloud Storage URI of the truststore
    # file.
    truststore_uri: gs://bucket/truststore.jks

    # Optional. The Google Cloud Storage URI of a KMS encrypted
    # file containing the password to the user provided
    # truststore.
    truststore_password_uri:
      gs://bucket/truststore_password.encrypted

  # Configuration of cross realm trust.
  cross_realm_trust:
    # Optional. The remote realm the Dataproc on-cluster KDC will
    # trust, should the user enable cross realm trust.
    realm: REMOTE.REALM

    # Optional. The KDC (IP or hostname) for the remote trusted
    # realm in a cross realm trust relationship.
    kdc: kdc.remote.realm

    # Optional. The admin server (IP or hostname) for the remote
    # trusted realm in a cross realm trust relationship.
    admin_server: admin-server.remote.realm

    # Optional. The Google Cloud Storage URI of a KMS encrypted
    # file containing the shared password between the on-cluster
    # Kerberos realm and the remote trusted realm, in a cross
    # realm trust relationship.
    shared_password_uri:
      gs://bucket/cross-realm.password.encrypted

  # Optional. The Google Cloud Storage URI of a KMS encrypted file
  # containing the master key of the KDC database.
  kdc_db_key_uri: gs://bucket/kdc_db_key.encrypted

  # Optional. The lifetime of the ticket granting ticket, in
  # hours. If not specified, or user specifies 0, then default
  # value 10 will be used.
  tgt_lifetime_hours: 1

  # Optional. The name of the Kerberos realm. If not specified,
  # the uppercased domain name of the cluster will be used.
  realm: REALM.NAME
```
        """,
  )


def ParseKerberosConfigFile(dataproc, kerberos_config_file):
  """Parse a kerberos-config-file into the KerberosConfig message."""
  data = console_io.ReadFromFileOrStdin(kerberos_config_file, binary=False)
  try:
    kerberos_config_data = yaml.load(data)
  except Exception as e:
    raise exceptions.ParseError('Cannot parse YAML:[{0}]'.format(e))

  ssl_config = kerberos_config_data.get('ssl', {})
  keystore_uri = ssl_config.get('keystore_uri')
  truststore_uri = ssl_config.get('truststore_uri')
  keystore_password_uri = ssl_config.get('keystore_password_uri')
  key_password_uri = ssl_config.get('key_password_uri')
  truststore_password_uri = ssl_config.get('truststore_password_uri')

  cross_realm_trust_config = kerberos_config_data.get('cross_realm_trust', {})
  cross_realm_trust_realm = cross_realm_trust_config.get('realm')
  cross_realm_trust_kdc = cross_realm_trust_config.get('kdc')
  cross_realm_trust_admin_server = cross_realm_trust_config.get('admin_server')
  cross_realm_trust_shared_password_uri = cross_realm_trust_config.get(
      'shared_password_uri'
  )
  kerberos_config_msg = dataproc.messages.KerberosConfig(
      # Unless user explicitly disable kerberos in kerberos config,
      # consider the existence of the kerberos config is enabling
      # kerberos, explicitly or implicitly.
      enableKerberos=kerberos_config_data.get('enable_kerberos', True),
      rootPrincipalPasswordUri=kerberos_config_data.get(
          'root_principal_password_uri'
      ),
      kmsKeyUri=kerberos_config_data.get('kms_key_uri'),
      kdcDbKeyUri=kerberos_config_data.get('kdc_db_key_uri'),
      tgtLifetimeHours=kerberos_config_data.get('tgt_lifetime_hours'),
      realm=kerberos_config_data.get('realm'),
      keystoreUri=keystore_uri,
      keystorePasswordUri=keystore_password_uri,
      keyPasswordUri=key_password_uri,
      truststoreUri=truststore_uri,
      truststorePasswordUri=truststore_password_uri,
      crossRealmTrustRealm=cross_realm_trust_realm,
      crossRealmTrustKdc=cross_realm_trust_kdc,
      crossRealmTrustAdminServer=cross_realm_trust_admin_server,
      crossRealmTrustSharedPasswordUri=cross_realm_trust_shared_password_uri,
  )

  return kerberos_config_msg
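

# Illustrative sketch: the message ParseKerberosConfigFile produces for a
# minimal config file that only sets a root principal password URI. The
# gs:// path is the placeholder used in the flag help above; `dataproc` is
# the usual Dataproc API wrapper passed throughout this module.
def _ExampleMinimalKerberosConfig(dataproc):
  """Returns a sample KerberosConfig message (illustrative only)."""
  kerberos_config_data = yaml.load(
      'root_principal_password_uri: gs://bucket/password.encrypted'
  )
  return dataproc.messages.KerberosConfig(
      # As in ParseKerberosConfigFile, the presence of a config implies
      # Kerberos is enabled unless explicitly disabled.
      enableKerberos=kerberos_config_data.get('enable_kerberos', True),
      rootPrincipalPasswordUri=kerberos_config_data.get(
          'root_principal_password_uri'
      ),
  )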


def AddSecureMultiTenancyGroup(parser):
  """Adds the argument group to handle Secure Multi-Tenancy configurations."""
  secure_multi_tenancy_group = parser.add_argument_group(
      mutex=True,
      help=arg_parsers.UniverseHelpText(
          default=(
              'Specifying these flags will enable Secure Multi-Tenancy for the'
              ' cluster.'
          ),
          universe_help=(
              'Secure Multi-Tenancy for clusters is only supported in the'
              ' default universe.'
          ),
      ),
  )
  secure_multi_tenancy_group.add_argument(
      '--identity-config-file',
      help="""\
Path to a YAML (or JSON) file containing the configuration for Secure Multi-Tenancy
on the cluster. The path can be a Cloud Storage URL (Example: 'gs://path/to/file')
or a local file system path. If you pass "-" as the value of the flag, the file content
will be read from stdin.

The YAML file is formatted as follows:

```
  # Required. The mapping from user accounts to service accounts.
  user_service_account_mapping:
    bob@company.com: service-account-bob@project.iam.gserviceaccount.com
    alice@company.com: service-account-alice@project.iam.gserviceaccount.com
```
        """,
  )
  secure_multi_tenancy_group.add_argument(
      '--secure-multi-tenancy-user-mapping',
      help=textwrap.dedent(
          """\
          A string of user-to-service-account mappings. Mappings are separated
          by commas, and each mapping takes the form of
          "user-account:service-account". Example:
          "bob@company.com:service-account-bob@project.iam.gserviceaccount.com,alice@company.com:service-account-alice@project.iam.gserviceaccount.com"."""
      ),
  )


def ParseIdentityConfigFile(dataproc, identity_config_file):
  """Parses an identity-config-file into the IdentityConfig message."""

  if identity_config_file.startswith('gs://'):
    data = storage_helpers.ReadObject(identity_config_file)
  else:
    data = console_io.ReadFromFileOrStdin(identity_config_file, binary=False)

  try:
    identity_config_data = yaml.load(data)
  except Exception as e:
    raise exceptions.ParseError('Cannot parse YAML:[{0}]'.format(e))

  if not identity_config_data.get('user_service_account_mapping', {}):
    raise exceptions.ArgumentError(
        '--identity-config-file must contain at least one '
        'user to service account mapping.'
    )

  user_service_account_mapping = encoding.DictToAdditionalPropertyMessage(
      identity_config_data.get('user_service_account_mapping', {}),
      dataproc.messages.IdentityConfig.UserServiceAccountMappingValue,
  )

  identity_config_data_msg = dataproc.messages.IdentityConfig(
      userServiceAccountMapping=user_service_account_mapping
  )

  return identity_config_data_msg


def ParseSecureMultiTenancyUserServiceAccountMappingString(
    user_service_account_mapping_string,
):
  """Parses a secure-multi-tenancy-user-mapping string into a dictionary."""

  user_service_account_mapping = collections.OrderedDict()
  # Parse the string; raise an error if it is not in
  # "user1:service-account1,user2:service-account2" format.
  mapping_str_list = user_service_account_mapping_string.split(',')
  for mapping_str in mapping_str_list:
    mapping = mapping_str.split(':')
    if len(mapping) != 2:
      raise exceptions.ArgumentError(
          'Invalid Secure Multi-Tenancy User Mapping.'
      )
    user_service_account_mapping[mapping[0]] = mapping[1]
  return user_service_account_mapping
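

# Illustrative sketch: parsing a --secure-multi-tenancy-user-mapping value and
# wrapping it into an IdentityConfig, mirroring the cluster-config code path
# above. The account names come from the flag help text; `dataproc` is the
# usual Dataproc API wrapper passed throughout this module.
def _ExampleIdentityConfigFromMappingString(dataproc):
  """Returns a sample IdentityConfig message (illustrative only)."""
  mapping = ParseSecureMultiTenancyUserServiceAccountMappingString(
      'bob@company.com:service-account-bob@project.iam.gserviceaccount.com'
  )
  return dataproc.messages.IdentityConfig(
      userServiceAccountMapping=encoding.DictToAdditionalPropertyMessage(
          mapping,
          dataproc.messages.IdentityConfig.UserServiceAccountMappingValue,
      )
  )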


def GetInstanceSelectionList(dataproc, machine_types, node_type):
  """Builds a list of InstanceSelection messages from the given flags."""
  if machine_types is None:
    return []
  instance_selection_list = []
  for machine_type_config in machine_types:
    if 'type' not in machine_type_config or not machine_type_config['type']:
      raise exceptions.ArgumentError(
          f'Missing machine type for {node_type}-machine-types'
      )
    selected_machine_types = machine_type_config['type']

    if 'rank' not in machine_type_config:
      rank = 0
    else:
      rank = machine_type_config['rank']
      if len(rank) != 1 or not rank[0].isdigit():
        raise exceptions.ArgumentError(
            f'Invalid value for rank in {node_type}-machine-types'
        )
      rank = int(rank[0])

    instance_selection = dataproc.messages.InstanceSelection()
    instance_selection.machineTypes = selected_machine_types
    instance_selection.rank = rank

    instance_selection_list.append(instance_selection)

  return instance_selection_list
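

# Illustrative sketch: the machine-types structure GetInstanceSelectionList
# expects. Each entry is a multi-valued mapping (as produced by
# ArgMultiValueDict below) with a repeated 'type' key and an optional
# single-valued 'rank'. The machine type names are placeholders; `dataproc`
# is the usual Dataproc API wrapper passed throughout this module.
def _ExampleSecondaryWorkerInstanceSelections(dataproc):
  """Returns sample InstanceSelection messages (illustrative only)."""
  machine_types = [
      {'type': ['n2-standard-8', 'n2-standard-16'], 'rank': ['0']},
      {'type': ['n2-standard-4'], 'rank': ['1']},
  ]
  return GetInstanceSelectionList(dataproc, machine_types, 'secondary-worker')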


class ArgMultiValueDict:
  """Converts argument values into multi-valued mappings.

  Values for the repeated keys are collected in a list.
  """

  def __init__(self):
    ops = '='
    key_op_value_pattern = '([^{ops}]+)([{ops}]?)(.*)'.format(ops=ops)
    self.key_op_value = re.compile(key_op_value_pattern, re.DOTALL)

  def __call__(self, arg_value):
    arg_list = [item.strip() for item in arg_value.split(',')]
    arg_dict = collections.OrderedDict()
    for arg in arg_list:
      match = self.key_op_value.match(arg)
      if not match:
        raise arg_parsers.ArgumentTypeError(
            'Invalid flag value [{0}]'.format(arg)
        )
      key, _, value = (
          match.group(1).strip(),
          match.group(2),
          match.group(3).strip(),
      )
      arg_dict.setdefault(key, []).append(value)
    return arg_dict
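

# Illustrative usage of ArgMultiValueDict: repeated keys are collected into
# lists, in order. The flag value below is a hypothetical
# --worker-machine-types entry.
def _ExampleArgMultiValueDict():
  """Returns the parsed multi-valued mapping (illustrative only)."""
  parse = ArgMultiValueDict()
  # Yields OrderedDict([('type', ['n2-standard-8', 'n2-standard-16']),
  #                     ('rank', ['0'])]).
  return parse('type=n2-standard-8,type=n2-standard-16,rank=0')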


class DiskConfigParser(object):
  """Parses and validates disk configurations provided as a string.

  This class takes a string representing disk configurations, parses them,
  validates each configuration, and returns a list of dictionaries, where each
  dictionary represents a valid disk configuration.

  The expected format for the input string is a semicolon-separated list of
  disk configurations. Each disk configuration is a comma-separated list of
  key-value pairs, where the key and value are separated by an equals sign (=).
  The disk type must be one of VALID_DISK_TYPES.

  Example:
      'type=hyperdisk-balanced,size=100GB,iops=2000;type=hyperdisk-ml,size=5TB'

  Attributes:
      binary_size_parser: An instance of arg_parsers.BinarySize used to parse
        and validate disk sizes. It is initialized with a lower bound of '4GB'
        and suggested scales of 'GB' and 'TB'.
  """

  def __init__(self):
    self.binary_size_parser = arg_parsers.BinarySize(
        lower_bound='4GB', suggested_binary_size_scales=['GB', 'TB']
    )

  def __call__(self, value: str) -> List[Dict[str, str]]:
    """Parses and validates a string of disk configurations.

    Args:
        value: A string containing the disk configurations, or an empty string.

    Returns:
        A list of dictionaries, where each dictionary represents a valid disk
        configuration. Returns an empty list if the input string is empty.

    Raises:
        exceptions.ArgumentError: If the input is not a string, if a disk
          configuration is invalid, or if any validation fails.
    """
    if not value:
      return []
    if not isinstance(value, str):
      raise exceptions.ArgumentError(
          f'Expected string for disk configuration, but got {type(value)}'
      )
    disk_configs = value.split(';')
    result = []
    for config in disk_configs:
      config = config.strip()
      if not config:
        continue
      try:
        disk_data = self._ParseKeyValuePairs(config)
      except ValueError as e:
        raise exceptions.ArgumentError(
            f'Invalid disk configuration: {config}. {e}'
        ) from e
      self._ValidateDiskData(disk_data)
      result.append(disk_data)
    return result

  def _ParseKeyValuePairs(self, config: str) -> Dict[str, str]:
    """Parses a single disk configuration string into a dictionary.

    Args:
        config: A string representing a single disk configuration (e.g.,
          'type=pd-ssd,size=100GB').

    Returns:
        A dictionary containing the key-value pairs of the disk configuration.

    Raises:
        ValueError: If the input string is not in the expected key=value format,
          or if a key is duplicated.
    """
    disk_data = {}
    pairs = config.split(',')
    for pair in pairs:
      if '=' not in pair:
        raise ValueError(
            f'Invalid key-value pair: {pair}. It should follow format:'
            ' key=value'
        )
      key, value = pair.split('=', 1)
      key = key.strip()
      value = value.strip()
      if key in disk_data:
        raise ValueError(f'Duplicate key: {key}')
      disk_data[key] = value
    return disk_data

  def _ValidateDiskData(self, disk_data: Dict[str, str]):
    """Validates the disk data provided for attached disks.

    This method checks that:
      - All keys in the disk_data dictionary are in the
      `ALLOWED_DISK_CONFIG_KEYSET`.
      - The 'type' key is present and has a valid value.
      - The 'size' value, if present, can be parsed by `binary_size_parser`.
      - The 'iops' value, if present, is a positive integer.
      - The 'throughput' value, if present, is a positive integer.

    Args:
        disk_data: A dictionary representing the disk configuration.

    Raises:
        exceptions.ArgumentError: If any of the validation rules fail.
    """
    for key in disk_data.keys():
      if key not in ALLOWED_DISK_CONFIG_KEYSET:
        raise exceptions.ArgumentError(
            f'Invalid key: {key}. Allowed keys are:'
            f' {", ".join(ALLOWED_DISK_CONFIG_KEYSET)}'
        )
    if 'type' not in disk_data or not disk_data['type']:
      raise exceptions.ArgumentError('Disk type is required')
    if disk_data['type'] not in VALID_DISK_TYPES:
      raise exceptions.ArgumentError(
          f"Invalid disk type: {disk_data['type']}. "
          f"It must be one of: {', '.join(VALID_DISK_TYPES)}"
      )
    if 'size' in disk_data and disk_data['size']:
      try:
        disk_data['size'] = int(
            self.binary_size_parser(disk_data['size']) / (1024 * 1024 * 1024)
        )
      except arg_parsers.ArgumentTypeError as e:
        raise exceptions.ArgumentError(
            f"Invalid disk size: {disk_data['size']}, {e}"
        )
    if (
        'iops' in disk_data
        and disk_data['iops']
        and (not disk_data['iops'].isdigit() or int(disk_data['iops']) <= 0)
    ):
      raise exceptions.ArgumentError(
          f"Invalid iops value: {disk_data['iops']}. It should be a positive"
          ' integer.'
      )
    if (
        'throughput' in disk_data
        and disk_data['throughput']
        and (
            not disk_data['throughput'].isdigit()
            or int(disk_data['throughput']) <= 0
        )
    ):
      raise exceptions.ArgumentError(
          f"Invalid throughput value: {disk_data['throughput']}. It should be"
          ' a positive integer.'
      )
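

# Illustrative usage of DiskConfigParser: two attached-disk specs in one flag
# value. Sizes are normalized to whole GB by the parser, while iops and
# throughput stay as (validated) strings until GetDiskConfig converts them.
# The concrete values are hypothetical.
def _ExampleParseAttachedDisks():
  """Returns parsed attached-disk configurations (illustrative only)."""
  parse_disks = DiskConfigParser()
  # Yields [{'type': 'hyperdisk-balanced', 'size': 100, 'iops': '3000'},
  #         {'type': 'hyperdisk-throughput', 'size': 2048}].
  return parse_disks(
      'type=hyperdisk-balanced,size=100GB,iops=3000;'
      'type=hyperdisk-throughput,size=2TB'
  )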