HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/container/fleet/rollouts/flags.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions to add flags in rollout commands."""


from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import textwrap
from typing import Iterator

from apitools.base.protorpclite import messages
from googlecloudsdk.api_lib.container.fleet import util
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import parser_arguments
from googlecloudsdk.calliope import parser_extensions
from googlecloudsdk.command_lib.container.fleet import resources as fleet_resources
from googlecloudsdk.core import resources
from googlecloudsdk.generated_clients.apis.gkehub.v1alpha import gkehub_v1alpha_messages as fleet_messages

# Pattern for the resource name of a GKE Binary Authorization platform policy:
# `projects/<project>/platforms/gke/policies/<policy_id>`. It is handed to
# arg_parsers.RegexpValidator to validate the `name` key of the
# --binauthz-policy-bindings flag.
_BINAUTHZ_GKE_POLICY_REGEX = (
    'projects/([^/]+)/platforms/gke/policies/([a-zA-Z0-9_-]+)'
)


# TODO(b/312311133): Deduplicate shared code between fleet and rollout commands.
class RolloutFlags:
  """Registers the flags of the fleet rollout command surface on a parser.

  Every public `Add*` method attaches one flag, flag group or resource
  argument to the parser handed to the constructor. The private `_Add*`
  helpers attach individual flags to a group created by a public method.
  """

  def __init__(
      self,
      parser: parser_arguments.ArgumentInterceptor,
      release_track: base.ReleaseTrack = base.ReleaseTrack.ALPHA,
  ):
    """Remembers the parser and release track used by the `Add*` methods.

    Args:
      parser: Parser that receives every flag added through this object.
      release_track: Release track; used to look up the API version when the
        rollout resource argument is added.
    """
    self._release_track = release_track
    self._parser = parser

  @property
  def parser(self):
    """The parser that flags are added to."""
    return self._parser

  @property
  def release_track(self):
    """The release track given at construction time."""
    return self._release_track

  def AddAsync(self):
    """Adds the shared --async flag."""
    async_flag = base.ASYNC_FLAG
    async_flag.AddToParser(self.parser)

  def AddDisplayName(self):
    """Adds the --display-name flag."""
    help_text = textwrap.dedent("""\
            Display name of the rollout to be created (optional). 4-30
            characters, alphanumeric and [ \'"!-] only.
        """)
    self.parser.add_argument('--display-name', help=help_text, type=str)

  def AddLabels(self):
    """Adds the --labels flag, parsed as a KEY=VALUE dictionary."""
    labels_type = arg_parsers.ArgDict()
    self.parser.add_argument(
        '--labels',
        type=labels_type,
        metavar='KEY=VALUE',
        help='Labels for the rollout.',
    )

  def AddScheduledStartTime(self):
    """Adds the --start-time flag, parsed with arg_parsers.Datetime.Parse."""
    help_text = textwrap.dedent("""\
            Start time (in the future) of the rollout.

            See $ gcloud topic datetimes for information on datetime formats.
        """)
    self.parser.add_argument(
        '--start-time',
        help=help_text,
        type=arg_parsers.Datetime.Parse,
    )

  def AddScheduleOffset(self):
    """Adds the --schedule-offset flag, parsed as a duration."""
    help_text = textwrap.dedent("""\
            Offset to shift the schedule by when resuming a paused rollout, e.g. `8h`, `7d12h`.

            See $ gcloud topic datetimes for information on duration formats.
        """)
    offset_type = arg_parsers.Duration()
    self.parser.add_argument(
        '--schedule-offset',
        help=help_text,
        type=offset_type,
    )

  def AddValidateOnly(self):
    """Adds the boolean --validate-only flag."""
    help_text = textwrap.dedent("""\
            Generate a new schedule without actually resuming the rollout.
        """)
    self.parser.add_argument(
        '--validate-only',
        help=help_text,
        action='store_true',
    )

  def AddManagedRolloutConfig(self):
    """Adds the managed rollout config group and its --soak-duration flag."""
    group_help = (
        'Configurations for the Rollout. Waves are assigned automatically.'
    )
    config_group = self.parser.add_group(help=group_help)
    self._AddSoakDuration(config_group)

  def _AddSoakDuration(
      self, managed_rollout_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the required --soak-duration flag to the given group.

    Args:
      managed_rollout_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          Soak time before starting the next wave. e.g. `4h`, `2d6h`.
          Soak duration is required for Rollouts.

          See $ gcloud topic datetimes for information on duration formats.""")
    soak_type = arg_parsers.Duration()
    managed_rollout_config_group.add_argument(
        '--soak-duration',
        required=True,
        type=soak_type,
        help=help_text,
    )

  def AddRolloutResourceArg(self):
    """Adds the rollout resource argument for this release track's API."""
    api_version = util.VERSION_MAP[self.release_track]
    fleet_resources.AddRolloutResourceArg(
        parser=self.parser, api_version=api_version
    )

  def AddRolloutTypeConfig(self):
    """Adds the mutually exclusive version-upgrade / feature-update groups."""
    type_group = self.parser.add_mutually_exclusive_group(
        help='Configuration for specific rollout types.',
    )
    self._AddVersionUpgrade(type_group)
    self._AddFeatureUpdate(type_group)

  def _AddVersionUpgrade(
      self, rollout_type_mutex_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the version upgrade group (upgrade type and target version).

    Args:
      rollout_type_mutex_group: Group that the version upgrade group is nested
        under.
    """
    upgrade_group = rollout_type_mutex_group.add_group(
        help='Version upgrade rollout config.',
    )
    self._AddVersionUpgradeType(upgrade_group)
    self._AddTargetVersion(upgrade_group)

  def _AddVersionUpgradeType(
      self, rollout_type_mutex_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the mutually exclusive upgrade-type flags.

    Args:
      rollout_type_mutex_group: Group to nest the upgrade-type group under.
        Within this class the caller passes the version upgrade config group.
    """
    upgrade_type_group = rollout_type_mutex_group.add_mutually_exclusive_group(
        help='Version upgrade type to use for the Rollout.',
    )
    self._AddControlPlaneUpgrade(upgrade_type_group)
    self._AddConfigSyncUpgrade(upgrade_type_group)

  def _AddControlPlaneUpgrade(
      self, version_upgrade_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the boolean --control-plane-upgrade flag to the given group.

    Args:
      version_upgrade_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          Upgrade the control plane of the clusters in the fleet.""")
    version_upgrade_config_group.add_argument(
        '--control-plane-upgrade',
        action='store_true',
        help=help_text,
    )

  def _AddConfigSyncUpgrade(
      self, version_upgrade_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the boolean --config-sync-upgrade flag to the given group.

    Args:
      version_upgrade_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          Upgrade the config sync of the clusters in the fleet.""")
    version_upgrade_config_group.add_argument(
        '--config-sync-upgrade',
        action='store_true',
        help=help_text,
    )

  def _AddTargetVersion(
      self, version_upgrade_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the required --target-version flag to the given group.

    Args:
      version_upgrade_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          The version to upgrade clusters from the fleet to.""")
    version_upgrade_config_group.add_argument(
        '--target-version',
        required=True,
        type=str,
        help=help_text,
    )

  def _AddFeatureUpdate(
      self, rollout_type_mutex_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the mutually exclusive feature config groups.

    Args:
      rollout_type_mutex_group: Group that the feature group is nested under.
    """
    feature_group = rollout_type_mutex_group.add_mutually_exclusive_group(
        help='Feature config to use for Rollout.',
    )
    self._AddSecurityPostureConfig(feature_group)
    self._AddBinaryAuthorizationConfig(feature_group)

  def _AddSecurityPostureConfig(
      self, feature_update_mutex_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the security posture group and its two flags.

    Args:
      feature_update_mutex_group: Group that the security posture group is
        nested under.
    """
    posture_group = feature_update_mutex_group.add_group(
        help='Security posture config.',
    )
    self._AddSecurityPostureMode(posture_group)
    self._AddWorkloadVulnerabilityScanningMode(posture_group)

  def _AddSecurityPostureMode(
      self, security_posture_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the --security-posture choice flag to the given group.

    Args:
      security_posture_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          To apply standard security posture to clusters in the fleet,

            $ {command} --security-posture=standard

          """)
    security_posture_config_group.add_argument(
        '--security-posture',
        help=help_text,
        default=None,
        choices=['disabled', 'standard'],
    )

  def _AddWorkloadVulnerabilityScanningMode(
      self, security_posture_config_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the --workload-vulnerability-scanning choice flag.

    Args:
      security_posture_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
            To apply standard vulnerability scanning to clusters in the fleet,

              $ {command} --workload-vulnerability-scanning=standard

            """)
    security_posture_config_group.add_argument(
        '--workload-vulnerability-scanning',
        help=help_text,
        default=None,
        choices=['disabled', 'standard', 'enterprise'],
    )

  def _AddBinaryAuthorizationConfig(
      self, feature_update_mutex_group: parser_arguments.ArgumentInterceptor
  ):
    """Adds the Binary Authorization group and its two flags.

    Args:
      feature_update_mutex_group: Group that the Binary Authorization group is
        nested under.
    """
    binauthz_group = feature_update_mutex_group.add_group(
        help='Binary Authorization config.',
    )
    self._AddBinauthzEvaluationMode(binauthz_group)
    self._AddBinauthzPolicyBindings(binauthz_group)

  def _AddBinauthzEvaluationMode(
      self,
      binary_authorization_config_group: parser_arguments.ArgumentInterceptor,
  ):
    """Adds the --binauthz-evaluation-mode choice flag.

    Args:
      binary_authorization_config_group: Group that receives the flag.
    """
    help_text = textwrap.dedent("""\
          Configure binary authorization mode for clusters to onboard the fleet,

            $ {command} --binauthz-evaluation-mode=policy-bindings

          """)
    binary_authorization_config_group.add_argument(
        '--binauthz-evaluation-mode',
        help=help_text,
        default=None,
        # The value is lower-cased and its underscores turned into dashes
        # before it is matched against `choices`, so the enum spelling
        # (e.g. POLICY_BINDINGS) is accepted as well.
        type=lambda x: x.replace('_', '-').lower(),
        choices=['disabled', 'policy-bindings'],
    )

  def _AddBinauthzPolicyBindings(
      self,
      binary_authorization_config_group: parser_arguments.ArgumentInterceptor,
  ):
    """Adds the repeatable --binauthz-policy-bindings flag.

    Each occurrence is parsed as a dictionary with exactly one allowed and
    required key, `name`, whose value is validated against
    _BINAUTHZ_GKE_POLICY_REGEX.

    Args:
      binary_authorization_config_group: Group that receives the flag.
    """
    format_description = (
        'GKE policy resource names have the following format: '
        '`projects/{project_number}/platforms/gke/policies/{policy_id}`'
    )
    name_validator = arg_parsers.RegexpValidator(
        _BINAUTHZ_GKE_POLICY_REGEX, format_description
    )
    binding_type = arg_parsers.ArgDict(
        spec={'name': name_validator},
        required_keys=['name'],
        max_length=1,
    )
    help_text = textwrap.dedent("""\
          The relative resource name of the Binary Authorization policy to audit
          and/or enforce. GKE policies have the following format:
          `projects/{project_number}/platforms/gke/policies/{policy_id}`.""")
    binary_authorization_config_group.add_argument(
        '--binauthz-policy-bindings',
        action='append',
        default=None,
        metavar='name=BINAUTHZ_POLICY',
        type=binding_type,
        help=help_text,
    )


class RolloutFlagParser:
  """Parse flags during fleet rollout command runtime.

  Wraps the parsed argument namespace and converts the rollout flags into
  `fleet_messages` request messages or plain Python values. Flags that were
  not specified on the command line generally map to None so that the
  corresponding message fields stay unset.
  """

  def __init__(
      self, args: parser_extensions.Namespace, release_track: base.ReleaseTrack
  ):
    """Stores the parsed arguments and resolves the messages module.

    Args:
      args: Parsed command-line namespace of the running command.
      release_track: Release track of the running command.
    """
    self.args = args
    self.release_track = release_track
    self.messages = util.GetMessagesModule(release_track)

  def _IsSpecified(self, flag: str) -> bool:
    """Returns True if `flag` (e.g. '--labels') is among the specified args."""
    return flag in self.args.GetSpecifiedArgs()

  def IsEmpty(self, message: messages.Message) -> bool:
    """Determines if a message is empty.

    Args:
      message: A message to check the emptiness.

    Returns:
      A bool indicating if the message is equivalent to a newly initialized
      empty message instance.
    """
    return message == type(message)()

  def TrimEmpty(self, message: messages.Message):
    """Trim empty messages to avoid cluttered request.

    Args:
      message: The message to inspect.

    Returns:
      `message` itself if it is not empty (see IsEmpty), otherwise None.
    """
    # TODO(b/289929895): Trim child fields at the parent level.
    if not self.IsEmpty(message):
      return message
    return None

  def Rollout(self) -> fleet_messages.Rollout:
    """Builds the Rollout message from all rollout-related flags."""
    rollout = fleet_messages.Rollout()
    rollout.name = util.RolloutName(self.args)
    rollout.displayName = self._DisplayName()
    rollout.labels = self._Labels()
    rollout.managedRolloutConfig = self._ManagedRolloutConfig()
    rollout.versionUpgrade = self._VersionUpgrade()
    rollout.feature = self._FeatureUpdate()
    rollout.scheduledStartTime = self._ScheduledStartTime()
    return rollout

  def _DisplayName(self) -> str:
    """Returns the value of --display-name."""
    return self.args.display_name

  def _Labels(self) -> fleet_messages.Rollout.LabelsValue:
    """Parses --labels.

    Returns:
      A LabelsValue with one AdditionalProperty per key/value pair, or None if
      --labels was not specified.
    """
    if not self._IsSpecified('--labels'):
      return None

    property_type = fleet_messages.Rollout.LabelsValue.AdditionalProperty
    labels_value = fleet_messages.Rollout.LabelsValue()
    labels_value.additionalProperties.extend(
        property_type(key=key, value=value)
        for key, value in self.args.labels.items()
    )
    return labels_value

  def _ScheduledStartTime(self) -> str:
    """Parses --start-time.

    Accepts ISO 8601 datetime format. To read more,
    https://cloud.google.com/sdk/gcloud/reference/topic/

    Returns:
      str, in ISO 8601 datetime format expressed in UTC (`Z` suffix), or None
      if --start-time was not specified.
    """
    if not self._IsSpecified('--start-time'):
      return None
    # The format string below appends a literal 'Z' (UTC designator), so the
    # datetime has to be expressed in UTC first; otherwise a value parsed in
    # another timezone would be labelled UTC without being shifted.
    # NOTE(review): assumes the parsed value is a datetime.datetime; a naive
    # value is interpreted by astimezone() as system local time.
    import datetime  # pylint: disable=g-import-not-at-top

    start_time_utc = self.args.start_time.astimezone(datetime.timezone.utc)
    return start_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

  def _ManagedRolloutConfig(self) -> fleet_messages.ManagedRolloutConfig:
    """Builds ManagedRolloutConfig; returns None if no field is set."""
    managed_rollout_config = fleet_messages.ManagedRolloutConfig()
    managed_rollout_config.soakDuration = self._SoakDuration()
    return self.TrimEmpty(managed_rollout_config)

  def _SoakDuration(self) -> str:
    """Parses --soak-duration.

    Accepts ISO 8601 durations format. To read more,
    https://cloud.google.com/sdk/gcloud/reference/topic/

    Returns:
      str, in standard duration format, in unit of seconds, or None if
      --soak-duration was not specified.
    """
    if not self._IsSpecified('--soak-duration'):
      return None
    return '{}s'.format(self.args.soak_duration)

  def _VersionUpgrade(self) -> fleet_messages.VersionUpgrade:
    """Constructs message VersionUpgrade.

    Returns:
      The VersionUpgrade message, or None if TrimEmpty considers it empty.
    """
    version_upgrade = fleet_messages.VersionUpgrade()
    # NOTE(review): _VersionUpgradeType always returns an enum value
    # (TYPE_UNSPECIFIED when neither flag is given), so `type` is always
    # assigned. This may keep the message from ever comparing equal to a fresh
    # instance in TrimEmpty - confirm whether that is intended.
    version_upgrade.type = self._VersionUpgradeType()
    version_upgrade.desiredVersion = self._VersionUpgradeDesiredVersion()
    return self.TrimEmpty(version_upgrade)

  def _VersionUpgradeType(
      self,
  ) -> fleet_messages.VersionUpgrade.TypeValueValuesEnum:
    """Parses --control-plane-upgrade and --config-sync-upgrade.

    Returns:
      TYPE_CONTROL_PLANE or TYPE_CONFIG_SYNC depending on which flag was
      specified, TYPE_UNSPECIFIED if neither was.

    Raises:
      ValueError: If both flags were specified.
    """
    enum_type = fleet_messages.VersionUpgrade.TypeValueValuesEnum
    control_plane = self._IsSpecified('--control-plane-upgrade')
    config_sync = self._IsSpecified('--config-sync-upgrade')
    if control_plane and config_sync:
      raise ValueError(
          '--control-plane-upgrade and --config-sync-upgrade cannot be set at'
          ' the same time.'
      )
    if control_plane:
      return enum_type.TYPE_CONTROL_PLANE
    if config_sync:
      return enum_type.TYPE_CONFIG_SYNC
    return enum_type.TYPE_UNSPECIFIED

  def _VersionUpgradeDesiredVersion(self) -> str:
    """Parses --target-version; returns None if it was not specified."""
    if not self._IsSpecified('--target-version'):
      return None
    return self.args.target_version

  def _FeatureUpdate(self) -> fleet_messages.FeatureUpdate:
    """Constructs message FeatureUpdate.

    Returns:
      The FeatureUpdate message, or None if TrimEmpty considers it empty.
    """
    feature_update = fleet_messages.FeatureUpdate()
    feature_update.securityPostureConfig = self._SecurityPostureConfig()
    feature_update.binaryAuthorizationConfig = self._BinaryAuthorzationConfig()
    return self.TrimEmpty(feature_update)

  def _SecurityPostureConfig(self) -> fleet_messages.SecurityPostureConfig:
    """Builds SecurityPostureConfig; returns None if no field is set."""
    security_posture_config = fleet_messages.SecurityPostureConfig()
    security_posture_config.mode = self._SecurityPostureMode()
    security_posture_config.vulnerabilityMode = (
        self._VulnerabilityModeValueValuesEnum()
    )
    return self.TrimEmpty(security_posture_config)

  def _SecurityPostureMode(
      self,
  ) -> fleet_messages.SecurityPostureConfig.ModeValueValuesEnum:
    """Parses --security-posture.

    Returns:
      The enum value for the chosen mode ('standard' maps to BASIC), or None
      if --security-posture was not specified.
    """
    if not self._IsSpecified('--security-posture'):
      return None

    enum_type = fleet_messages.SecurityPostureConfig.ModeValueValuesEnum
    mapping = {
        'disabled': enum_type.DISABLED,
        'standard': enum_type.BASIC,
    }
    return mapping[self.args.security_posture]

  def _VulnerabilityModeValueValuesEnum(
      self,
  ) -> fleet_messages.SecurityPostureConfig.VulnerabilityModeValueValuesEnum:
    """Parses --workload-vulnerability-scanning.

    Returns:
      The enum value for the chosen mode, or None if the flag was not
      specified.
    """
    if not self._IsSpecified('--workload-vulnerability-scanning'):
      return None

    # The enum class comes from `fleet_messages` so that it always matches the
    # `fleet_messages.SecurityPostureConfig` instance it is assigned to.
    enum_type = (
        fleet_messages.SecurityPostureConfig.VulnerabilityModeValueValuesEnum
    )
    mapping = {
        'disabled': enum_type.VULNERABILITY_DISABLED,
        'standard': enum_type.VULNERABILITY_BASIC,
        'enterprise': enum_type.VULNERABILITY_ENTERPRISE,
    }
    return mapping[self.args.workload_vulnerability_scanning]

  def _BinaryAuthorzationConfig(
      self,
  ) -> fleet_messages.BinaryAuthorizationConfig:
    """Builds BinaryAuthorizationConfig; returns None if no field is set."""
    binary_authorization_config = fleet_messages.BinaryAuthorizationConfig()
    binary_authorization_config.evaluationMode = self._EvaluationMode()
    binary_authorization_config.policyBindings = list(self._PolicyBindings())
    return self.TrimEmpty(binary_authorization_config)

  def _EvaluationMode(
      self,
  ) -> fleet_messages.BinaryAuthorizationConfig.EvaluationModeValueValuesEnum:
    """Parses --binauthz-evaluation-mode.

    Returns:
      The enum value for the chosen mode, or None if the flag was not
      specified.
    """
    if not self._IsSpecified('--binauthz-evaluation-mode'):
      return None

    # The enum class comes from `fleet_messages` so that it always matches the
    # `fleet_messages.BinaryAuthorizationConfig` instance it is assigned to.
    enum_type = (
        fleet_messages.BinaryAuthorizationConfig.EvaluationModeValueValuesEnum
    )
    mapping = {
        'disabled': enum_type.DISABLED,
        'policy-bindings': enum_type.POLICY_BINDINGS,
    }
    return mapping[self.args.binauthz_evaluation_mode]

  def _PolicyBindings(self) -> Iterator[fleet_messages.PolicyBinding]:
    """Parses --binauthz-policy-bindings.

    Returns:
      An iterator over one PolicyBinding per flag occurrence; an empty
      iterator if the flag was not specified.
    """
    if not self._IsSpecified('--binauthz-policy-bindings'):
      # Return an iterator on this path too, matching the annotation.
      return iter(())

    return (
        fleet_messages.PolicyBinding(name=binding['name'])
        for binding in self.args.binauthz_policy_bindings
    )

  def OperationRef(self) -> resources.Resource:
    """Parses resource argument operation."""
    return self.args.CONCEPTS.operation.Parse()

  def Project(self) -> str:
    """Returns the value of the project argument."""
    return self.args.project

  def Location(self) -> str:
    """Returns the value of the location argument."""
    return self.args.location

  def Async(self) -> bool:
    """Parses --async flag.

    The internal representation of --async is set to args.async_, defined in
    calliope/base.py file.

    Returns:
      bool, True if specified, False if unspecified.
    """
    return self.args.async_

  def ScheduleOffset(self) -> str:
    """Parses --schedule-offset.

    Accepts ISO 8601 durations format. To read more,
    https://cloud.google.com/sdk/gcloud/reference/topic/

    Returns:
      str, in standard duration format, in unit of seconds, or None if
      --schedule-offset was not specified.
    """
    if not self._IsSpecified('--schedule-offset'):
      return None
    return '{}s'.format(self.args.schedule_offset)

  def ValidateOnly(self) -> bool:
    """Parses --validate-only flag."""
    return self.args.validate_only