HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/compute/firewall_policies/flags.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Flags and helpers for the compute organization firewall policies commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.command_lib.compute import completers as compute_completers
from googlecloudsdk.command_lib.compute import flags as compute_flags

# TODO(b/175792396): change displayName to shortName once API is rolled out
DEFAULT_LIST_FORMAT = """\
    table(
      name:label=ID,
      displayName,
      description
    )"""


class FirewallPoliciesCompleter(compute_completers.ListCommandCompleter):

  def __init__(self, **kwargs):
    super(FirewallPoliciesCompleter, self).__init__(
        collection='compute.firewallPolicies',
        list_command='compute org-firewall-policies list --uri',
        **kwargs
    )


def FirewallPolicyRuleListArgument(
    required=False, plural=False, operation=None
):
  return compute_flags.ResourceArgument(
      name='FIREWALL_POLICY',
      resource_name='firewall policy',
      completer=FirewallPoliciesCompleter,
      plural=plural,
      required=required,
      custom_plural='firewall policies',
      short_help='Short name of the firewall policy to {0}.'.format(operation),
      global_collection='compute.firewallPolicies',
  )


def FirewallPolicyArgument(required=False, plural=False, operation=None):
  return compute_flags.ResourceArgument(
      name='FIREWALL_POLICY',
      resource_name='firewall policy',
      completer=FirewallPoliciesCompleter,
      plural=plural,
      required=required,
      custom_plural='firewall policies',
      short_help='Short name or ID of the firewall policy to {0}.'.format(
          operation
      ),
      global_collection='compute.firewallPolicies',
  )


def FirewallPolicyAssociationsArgument(required=False, plural=False):
  return compute_flags.ResourceArgument(
      name='--firewall-policy',
      resource_name='firewall policy',
      plural=plural,
      required=required,
      short_help=(
          'Short name or ID of the firewall policy ID of the association.'
      ),
      global_collection='compute.firewallPolicies',
  )


def FirewallPolicyRuleArgument(
    required=False,
    plural=False,
    operation=None,
):
  return compute_flags.ResourceArgument(
      name='priority',
      resource_name='firewall policy rule',
      completer=FirewallPoliciesCompleter,
      plural=plural,
      required=required,
      global_collection='compute.firewallPolicies',
      short_help='Priority of the firewall policy rule to {}.'.format(
          operation
      ),
  )


def AddArgFirewallPolicyCreation(parser):
  """Adds the argument for firewall policy creaton."""
  parser.add_argument(
      '--short-name',
      required=True,
      help=(
          'A textual name of the firewall policy. The name must be 1-63 '
          'characters long, and comply with RFC 1035.'
      ),
  )

  group = parser.add_group(required=True, mutex=True)

  group.add_argument(
      '--organization',
      help=(
          'Organization in which the organization firewall policy'
          ' is to be created.'
      ),
  )

  group.add_argument(
      '--folder',
      help='Folder in which the organization firewall policy is to be created.',
  )

  parser.add_argument(
      '--description',
      help=(
          'An optional, textual description for the organization security'
          ' policy.'
      ),
  )


def AddArgsCloneRules(parser):
  """Adds the argument for firewall policy clone rules."""
  parser.add_argument(
      '--source-firewall-policy',
      required=True,
      help='The URL of the source firewall policy to copy the rules from.',
  )

  parser.add_argument(
      '--organization',
      help=(
          'Organization in which the organization firewall policy to copy the'
          ' rules to. Must be set if firewall-policy is short name.'
      ),
  )


def AddArgsForceStartProgressiveRollout(parser):
  """Adds the argument for firewall policy force start progressive rollout."""
  parser.add_argument(
      '--organization',
      help=(
          'Organization in which the organization firewall policy to start the'
          ' rollout of resides. Must be set if firewall-policy is short name.'
      ),
  )


def AddArgsListFirewallPolicy(parser):
  """Adds the argument for firewall policy list."""
  group = parser.add_group(required=True, mutex=True)

  group.add_argument(
      '--organization',
      help='Organization in which firewall policies are listed',
  )

  group.add_argument(
      '--folder', help='Folder in which firewall policies are listed'
  )


def AddArgsMove(parser):
  """Adds the argument for firewall policy move."""
  parser.add_argument(
      '--organization',
      help=(
          'Organization in which the organization firewall policy is to be'
          ' moved. Must be set if FIREWALL_POLICY is short name.'
      ),
  )

  parser.add_argument(
      '--folder',
      help='Folder to which the organization firewall policy is to be moved.',
  )


def AddArgsUpdateFirewallPolicy(parser):
  """Adds the argument for firewall policy update."""
  parser.add_argument(
      '--organization',
      help=(
          'Organization in which the organization firewall policy is to be'
          ' updated. Must be set if FIREWALL_POLICY is short name.'
      ),
  )
  parser.add_argument(
      '--description',
      help=(
          'An optional, textual description for the organization security'
          ' policy.'
      ),
  )


def AddPriority(parser, operation, is_plural=False):
  """Adds the priority argument to the argparse."""
  parser.add_argument(
      'name' + ('s' if is_plural else ''),
      metavar='PRIORITY',
      nargs='*' if is_plural else None,
      completer=FirewallPoliciesCompleter,
      help=(
          'Priority of the rule{0} to {1}. Rules are evaluated in order '
          'from highest priority to lowest priority where 0 is the highest '
          'priority and 2147483647 is the lowest priority.'.format(
              's' if is_plural else '', operation
          )
      ),
  )


def AddAction(parser, required=True):
  """Adds the action argument to the argparse."""
  parser.add_argument(
      '--action',
      choices=['allow', 'deny', 'goto_next', 'apply_security_profile_group'],
      type=lambda x: x.lower(),
      required=required,
      help='Action to take if the request matches the match condition.',
  )


def AddPacketMirroringAction(parser, required=True):
  """Adds the action argument to the argparse."""
  parser.add_argument(
      '--action',
      choices=['mirror', 'do_not_mirror', 'goto_next'],
      type=lambda x: x.lower(),
      required=required,
      help='Action to take if the request matches the match condition.',
  )


def AddFirewallPolicyId(parser, required=True, operation=None):
  """Adds the firewall policy ID argument to the argparse."""
  parser.add_argument(
      '--firewall-policy',
      required=required,
      help=(
          'Short name of the firewall policy into which the rule should '
          'be {}.'.format(operation)
      ),
  )


def AddOrganization(parser, required=True):
  parser.add_argument(
      '--organization',
      required=required,
      help=(
          'Organization which the organization firewall policy belongs to. '
          'Must be set if FIREWALL_POLICY is short name.'
      ),
  )


def AddSrcIpRanges(parser, required=False):
  """Adds the source IP ranges."""
  parser.add_argument(
      '--src-ip-ranges',
      type=arg_parsers.ArgList(),
      required=required,
      metavar='SRC_IP_RANGE',
      help='Source IP ranges to match for this rule.',
  )


def AddDestIpRanges(parser, required=False):
  """Adds the destination IP ranges."""
  parser.add_argument(
      '--dest-ip-ranges',
      type=arg_parsers.ArgList(),
      required=required,
      metavar='DEST_IP_RANGE',
      help='Destination IP ranges to match for this rule.',
  )


def AddLayer4Configs(parser, required=False):
  """Adds the layer4 configs."""
  parser.add_argument(
      '--layer4-configs',
      type=arg_parsers.ArgList(),
      required=required,
      metavar='LAYER4_CONFIG',
      help=(
          'A list of destination protocols and ports to which the firewall '
          'rule will apply.'
      ),
  )


def AddDirection(parser, required=False):
  """Adds the direction of the traffic to which the rule is applied."""
  parser.add_argument(
      '--direction',
      required=required,
      choices=['INGRESS', 'EGRESS'],
      help=(
          'Direction of the traffic the rule is applied. The default is to'
          ' apply on incoming traffic.'
      ),
  )


def AddEnableLogging(parser, required=False):
  """Adds the option to enable logging."""
  parser.add_argument(
      '--enable-logging',
      required=required,
      action=arg_parsers.StoreTrueFalseAction,
      help=(
          'Use this flag to enable logging of connections that allowed or '
          'denied by this rule.'
      ),
  )


def AddDisabled(parser, required=False):
  """Adds the option to disable the rule."""
  parser.add_argument(
      '--disabled',
      required=required,
      action=arg_parsers.StoreTrueFalseAction,
      help=(
          'Use this flag to disable the rule. Disabled rules will not affect '
          'traffic.'
      ),
  )


def AddNewPriority(parser, operation=None):
  """Adds the new firewall policy rule priority to the argparse."""
  parser.add_argument(
      '--new-priority',
      help=(
          'New priority for the rule to {}. Valid in [0, 65535]. '.format(
              operation
          )
      ),
  )


def AddTargetResources(parser, required=False):
  """Adds the target resources the rule is applied to."""
  parser.add_argument(
      '--target-resources',
      type=arg_parsers.ArgList(),
      metavar='TARGET_RESOURCES',
      required=required,
      help='List of URLs of target resources to which the rule is applied.',
  )


def AddTargetServiceAccounts(parser, required=False):
  """Adds the target service accounts for the rule."""
  parser.add_argument(
      '--target-service-accounts',
      type=arg_parsers.ArgList(),
      metavar='TARGET_SERVICE_ACCOUNTS',
      required=required,
      help='List of target service accounts for the rule.',
  )


def AddDescription(parser, required=False):
  """Adds the description of this rule."""
  parser.add_argument(
      '--description',
      required=required,
      help='An optional, textual description for the rule.',
  )


def AddArgsCreateAssociation(parser):
  """Adds the arguments of association creation."""
  parser.add_argument(
      '--firewall-policy',
      required=True,
      help='Security policy ID of the association.',
  )
  parser.add_argument(
      '--organization',
      help=(
          'ID of the organization in which the firewall policy is to be'
          ' associated. Must be set if FIREWALL_POLICY is short name.'
      ),
  )

  parser.add_argument(
      '--folder', help='ID of the folder with which the association is created.'
  )

  parser.add_argument(
      '--replace-association-on-target',
      action='store_true',
      default=False,
      required=False,
      help=(
          'By default, if you attempt to insert an association to an '
          'organization or folder resource that is already associated with a '
          'firewall policy the method will fail. If this is set, the existing '
          ' association will be deleted at the same time that the new '
          'association is created.'
      ),
  )

  parser.add_argument(
      '--name',
      help=(
          'Name to identify this association. If unspecified, the '
          'name will be set to "organization-{ORGANIZATION_ID}" '
          'or "folder-{FOLDER_ID}".'
      ),
  )


def AddArgsDeleteAssociation(parser):
  """Adds the arguments of association deletion."""
  parser.add_argument(
      'name',
      metavar='NAME',
      help='Name of the association to delete.',
  )

  parser.add_argument(
      '--organization',
      help=(
          'ID of the organization in which the firewall policy is to be'
          ' detached. Must be set if FIREWALL_POLICY is short name.'
      ),
  )


def AddArgsListAssociation(parser):
  """Adds the arguments of association list."""
  group = parser.add_group(required=True, mutex=True)

  group.add_argument(
      '--organization',
      help='ID of the organization with which the association is listed.',
  )

  group.add_argument(
      '--folder', help='ID of the folder with which the association is listed.'
  )


def AddSrcAddressGroups(parser):
  """Adds a source address group to this rule."""
  parser.add_argument(
      '--src-address-groups',
      type=arg_parsers.ArgList(),
      metavar='SOURCE_ADDRESS_GROUPS',
      required=False,
      help=(
          'Source address groups to match for this rule. '
          'Can only be specified if DIRECTION is ingress.'
      ),
  )


def AddDestAddressGroups(parser):
  """Adds a destination address group to this rule."""
  parser.add_argument(
      '--dest-address-groups',
      type=arg_parsers.ArgList(),
      metavar='DEST_ADDRESS_GROUPS',
      required=False,
      help=(
          'Destination address groups to match for this rule. '
          'Can only be specified if DIRECTION is egress.'
      ),
  )


def AddSrcFqdns(parser):
  """Adds source fqdns to this rule."""
  parser.add_argument(
      '--src-fqdns',
      type=arg_parsers.ArgList(),
      metavar='SOURCE_FQDNS',
      required=False,
      help=(
          'Source FQDNs to match for this rule. '
          'Can only be specified if DIRECTION is `ingress`.'
      ),
  )


def AddDestFqdns(parser):
  """Adds destination fqdns to this rule."""
  parser.add_argument(
      '--dest-fqdns',
      type=arg_parsers.ArgList(),
      metavar='DEST_FQDNS',
      required=False,
      help=(
          'Destination FQDNs to match for this rule. '
          'Can only be specified if DIRECTION is `egress`.'
      ),
  )


def AddSrcRegionCodes(parser, support_network_scopes=False):
  """Adds a source region code to this rule."""
  help_text = (
      'Source Region Code to match for this rule. Can only be specified if'
      ' DIRECTION is `ingress`. '
  )
  if support_network_scopes:
    help_text += (
        'Cannot be specified when the source network'
        ' type is NON_INTERNET, VPC_NETWORK or INTRA_VPC. '
    )
  parser.add_argument(
      '--src-region-codes',
      type=arg_parsers.ArgList(),
      metavar='SOURCE_REGION_CODES',
      required=False,
      help=help_text,
  )


def AddDestRegionCodes(parser, support_network_scopes=False):
  """Adds a destination region code to this rule."""
  help_text = (
      'Destination Region Code to match for this rule. Can only be specified if'
      ' DIRECTION is `egress`. '
  )
  if support_network_scopes:
    help_text += (
        'Cannot be specified when the source network type is NON_INTERNET. '
    )
  parser.add_argument(
      '--dest-region-codes',
      type=arg_parsers.ArgList(),
      metavar='DEST_REGION_CODES',
      required=False,
      help=help_text,
  )


def AddSrcThreatIntelligence(parser, support_network_scopes=False):
  """Adds source threat intelligence list names to this rule."""
  text_help = (
      'Source Threat Intelligence lists to match for this rule. '
      'Can only be specified if DIRECTION is `ingress`. '
      'The available lists can be found here: '
      'https://cloud.google.com/vpc/docs/firewall-policies-rule-details#threat-intelligence-fw-policy.'
  )
  if support_network_scopes:
    text_help = (
        'Source Threat Intelligence lists to match for this rule. '
        'Can only be specified if DIRECTION is `ingress`. Cannot be specified'
        ' when the source network type is NON_INTERNET, VPC_NETWORK or'
        ' INTRA_VPC. '
        'The available lists can be found here: '
        'https://cloud.google.com/vpc/docs/firewall-policies-rule-details#threat-intelligence-fw-policy.'
    )
  parser.add_argument(
      '--src-threat-intelligence',
      type=arg_parsers.ArgList(),
      metavar='SOURCE_THREAT_INTELLIGENCE_LISTS',
      required=False,
      help=text_help,
  )


def AddDestThreatIntelligence(parser, support_network_scopes=False):
  """Adds destination threat intelligence list names to this rule."""
  text_help = (
      'Destination Threat Intelligence lists to match for this rule. '
      'Can only be specified if DIRECTION is `egress`. '
      'The available lists can be found here: '
      'https://cloud.google.com/vpc/docs/firewall-policies-rule-details#threat-intelligence-fw-policy.'
  )
  if support_network_scopes:
    text_help = (
        'Destination Threat Intelligence lists to match for this rule. '
        'Can only be specified if DIRECTION is `egress`. Cannot be specified'
        ' when source network type is NON_INTERNET. '
        'The available lists can be found here: '
        'https://cloud.google.com/vpc/docs/firewall-policies-rule-details#threat-intelligence-fw-policy.'
    )
  parser.add_argument(
      '--dest-threat-intelligence',
      type=arg_parsers.ArgList(),
      metavar='DEST_THREAT_INTELLIGENCE_LISTS',
      required=False,
      help=text_help,
  )


def AddSecurityProfileGroup(parser):
  """Adds security profile group to this rule."""
  parser.add_argument(
      '--security-profile-group',
      metavar='SECURITY_PROFILE_GROUP',
      required=False,
      help=(
          'An org-based security profile group to be used with'
          ' apply_security_profile_group action. Allowed formats are: a)'
          ' http(s)://<namespace>/<api>/organizations/<org_id>/locations/global/securityProfileGroups/<profile>'
          ' b) (//)<namespace>/organizations/<org_id>/locations/global/securityProfileGroups/<profile>'
          ' c) <profile>. In case "c" `gcloud` CLI will create a reference'
          ' matching format "a", but to make it work'
          ' CLOUDSDK_API_ENDPOINT_OVERRIDES_NETWORKSECURITY property must be'
          ' set. In order to set this property, please run the command `gcloud'
          ' config set api_endpoint_overrides/networksecurity'
          ' https://<namespace>/`.'
      ),
  )


def AddMirroringSecurityProfileGroup(parser):
  """Adds security profile group to this rule."""
  parser.add_argument(
      '--security-profile-group',
      metavar='SECURITY_PROFILE_GROUP',
      required=False,
      help=(
          'An org-based security profile group to be used with mirror action.'
          ' Allowed formats are: a)'
          ' http(s)://<namespace>/<api>/organizations/<org_id>/locations/global/securityProfileGroups/<profile>'
          ' b) (//)<namespace>/organizations/<org_id>/locations/global/securityProfileGroups/<profile>'
          ' c) <profile>. In case "c" `gcloud` CLI will create a reference'
          ' matching format "a", but to make it work'
          ' CLOUDSDK_API_ENDPOINT_OVERRIDES_NETWORKSECURITY property must be'
          ' set. In order to set this property, please run the command `gcloud'
          ' config set api_endpoint_overrides/networksecurity'
          ' https://<namespace>/`.'
      ),
  )


def AddTlsInspect(parser):
  """Adds the option to turn on TLS decryption on the rule."""
  parser.add_argument(
      '--tls-inspect',
      required=False,
      action=arg_parsers.StoreTrueFalseAction,
      help=(
          'Use this flag to indicate whether TLS traffic should be inspected '
          'using the TLS inspection policy when the security profile group '
          'is applied. Default: no TLS inspection.'
      ),
  )


def AddSrcNetworkScope(parser, required=False):
  """Adds source network scope to this rule."""
  parser.add_argument(
      '--src-network-scope',
      required=required,
      hidden=True,
      help=(
          'Deprecated. Use --src-network-type instead.'
          ' Use this flag to indicate that the rule should match internet,'
          ' non-internet traffic or traffic coming from the network specified'
          ' by --src-network. It applies to ingress rules. Valid values are'
          ' INTERNET, NON_INTERNET, VPC_NETWORKS and INTRA_VPC. Use empty'
          ' string to clear the field.'
      ),
  )


def AddSrcNetworkType(parser, required=False):
  """Adds source network type to this rule."""
  parser.add_argument(
      '--src-network-type',
      required=required,
      help=(
          'Use this flag to indicate that the rule should match internet,'
          ' non-internet traffic or traffic coming from the network specified'
          ' by --src-network. It applies to ingress rules. Valid values are'
          ' INTERNET, NON_INTERNET, VPC_NETWORKS and INTRA_VPC. Use empty'
          ' string to clear the field.'
      ),
  )


def AddSrcNetworks(parser):
  """Adds source network urls list to this rule."""
  parser.add_argument(
      '--src-networks',
      type=arg_parsers.ArgList(),
      metavar='SRC_NETWORKS',
      required=False,
      help=(
          'The source VPC networks to  match for this rule.  It can only be'
          ' specified when --src-network-type is VPC_NETWORKS. It applies to '
          ' ingress rules. It accepts full or partial URLs.'
      ),
  )


def AddDestNetworkScope(parser, required=False):
  """Adds destination network scope to this rule."""
  parser.add_argument(
      '--dest-network-scope',
      required=required,
      hidden=True,
      help=(
          'Deprecated. Use --dest-network-type instead.'
          ' Use this flag to indicate that the rule should match internet or'
          ' non-internet traffic. It applies to destination traffic for egress'
          ' rules. Valid values are INTERNET and NON_INTERNET. Use'
          ' empty string to clear the field.'
      ),
  )


def AddDestNetworkType(parser, required=False):
  """Adds destination network type to this rule."""
  parser.add_argument(
      '--dest-network-type',
      required=required,
      help=(
          'Use this flag to indicate that the rule should match internet or'
          ' non-internet traffic. It applies to destination traffic for egress'
          ' rules. Valid values are INTERNET and NON_INTERNET. Use'
          ' empty string to clear the field.'
      ),
  )


def AddSrcSecureTags(parser, required=False, support_network_scopes=False):
  """Adds a  source secure tag to this rule."""
  help_text = (
      'A list of instance secure tags indicating the set of instances on the'
      ' network to which the rule applies if all other fields match. Either'
      ' --src-ip-ranges or --src-secure-tags must be specified for ingress'
      ' traffic. If both --src-ip-ranges and --src-secure-tags are specified,'
      ' an inbound connection is allowed if either the range of the source'
      ' matches --src-ip-ranges or the tag of the source matches'
      ' --src-secure-tags. Secure Tags can be assigned to instances during'
      ' instance creation.'
  )
  if support_network_scopes:
    help_text += (
        ' Secure tags cannot be specified if source network type is INTERNET.'
    )

  parser.add_argument(
      '--src-secure-tags',
      type=arg_parsers.ArgList(),
      metavar='SOURCE_SECURE_TAGS',
      required=required,
      help=help_text,
  )


def AddTargetSecureTags(parser, required=False):
  """Adds a target secure tag to this rule."""
  parser.add_argument(
      '--target-secure-tags',
      type=arg_parsers.ArgList(),
      metavar='TARGET_SECURE_TAGS',
      required=required,
      help=(
          'An optional, list of target secure tags with a name of the '
          'format tagValues/ or full namespaced name'
      ),
  )