HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/compute/firewall_rules/update.py
# -*- coding: utf-8 -*- #
# Copyright 2014 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for updating firewall rules."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import encoding

from googlecloudsdk.api_lib.compute import base_classes
from googlecloudsdk.api_lib.compute import firewalls_utils
from googlecloudsdk.api_lib.compute import utils as compute_api
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions as calliope_exceptions
from googlecloudsdk.command_lib.compute import exceptions
from googlecloudsdk.command_lib.compute.firewall_rules import flags


@base.ReleaseTracks(base.ReleaseTrack.GA)
class UpdateFirewall(base.UpdateCommand):
  """Update a firewall rule."""

  with_egress_firewall = True
  with_service_account = True

  FIREWALL_RULE_ARG = None

  @classmethod
  def Args(cls, parser):
    messages = apis.GetMessagesModule('compute',
                                      compute_api.COMPUTE_GA_API_VERSION)
    cls.FIREWALL_RULE_ARG = flags.FirewallRuleArgument()
    cls.FIREWALL_RULE_ARG.AddArgument(parser, operation_type='update')
    firewalls_utils.AddCommonArgs(
        parser,
        for_update=True,
        with_egress_support=cls.with_egress_firewall,
        with_service_account=cls.with_service_account)
    firewalls_utils.AddArgsForServiceAccount(parser, for_update=True)
    flags.AddEnableLogging(parser)
    flags.AddLoggingMetadata(parser, messages)

  def ValidateArgument(self, messages, args):
    self.new_allowed = firewalls_utils.ParseRules(
        args.allow, messages, firewalls_utils.ActionType.ALLOW)

    args_unset = all(
        x is None
        for x in (args.allow, args.description, args.source_ranges,
                  args.source_tags, args.target_tags))
    if self.with_egress_firewall:
      args_unset = args_unset and all(
          x is None
          for x in (args.destination_ranges, args.priority, args.rules))
    if self.with_service_account:
      args_unset = args_unset and all(
          x is None
          for x in (args.source_service_accounts, args.target_service_accounts))
    args_unset = args_unset and args.disabled is None
    args_unset = (args_unset and args.enable_logging is None)
    args_unset = args_unset and not args.logging_metadata
    if args_unset:
      raise exceptions.UpdatePropertyError(
          'At least one property must be modified.')

    if args.rules and args.allow:
      raise firewalls_utils.ArgumentValidationError(
          'Can NOT specify --rules and --allow in the same request.')

  def Run(self, args):
    """Issues requests necessary to update the Firewall rules."""
    holder = base_classes.ComputeApiHolder(self.ReleaseTrack())
    client = holder.client

    self.ValidateArgument(client.messages, args)
    # Set the resource reference which is used in composing resource-get
    # request.
    resource_reference = self.FIREWALL_RULE_ARG.ResolveAsResource(
        args, holder.resources)
    get_request = self._GetGetRequest(client, resource_reference)
    cleared_fields = []
    objects = client.MakeRequests([get_request])

    new_object = self.Modify(client, args, objects[0], cleared_fields)

    # If existing object is equal to the proposed object or if
    # Modify() returns None, then there is no work to be done, so we
    # print the resource and exit.
    if not new_object or objects[0] == new_object:
      return objects[0]

    with client.apitools_client.IncludeFields(cleared_fields):
      resource_list = client.MakeRequests(
          [self._GetSetRequest(client, resource_reference, new_object)])

    return resource_list

  def _GetGetRequest(self, client, resource_reference):
    """Returns the request for the existing Firewall resource."""
    return (client.apitools_client.firewalls, 'Get',
            client.messages.ComputeFirewallsGetRequest(
                firewall=resource_reference.Name(),
                project=resource_reference.project))

  def _GetSetRequest(self, client, resource_reference, replacement):
    request = client.messages.ComputeFirewallsPatchRequest(
        firewall=replacement.name,
        firewallResource=replacement,
        project=resource_reference.project)
    return (client.apitools_client.firewalls, 'Patch', request)

  def _AddLoggingMetadata(self, messages, args, log_config):
    if args.IsSpecified('logging_metadata'):
      if log_config is None or not log_config.enable:
        raise calliope_exceptions.InvalidArgumentException(
            '--logging-metadata',
            'cannot toggle logging metadata if logging is not enabled.')

      log_config.metadata = flags.GetLoggingMetadataArg(
          messages).GetEnumForChoice(args.logging_metadata)

  def Modify(self, client, args, existing, cleared_fields):
    """Returns a modified Firewall message and included fields."""
    if args.allow:
      allowed = self.new_allowed
    elif args.allow is None:
      allowed = existing.allowed
    else:
      cleared_fields.append('allowed')
      allowed = []

    if args.description:
      description = args.description
    elif args.description is None:
      description = existing.description
    else:
      cleared_fields.append('description')
      description = None

    if args.source_ranges:
      source_ranges = args.source_ranges
    elif args.source_ranges is None:
      source_ranges = existing.sourceRanges
    else:
      cleared_fields.append('sourceRanges')
      source_ranges = []

    if args.source_tags:
      source_tags = args.source_tags
    elif args.source_tags is None:
      source_tags = existing.sourceTags
    else:
      cleared_fields.append('sourceTags')
      source_tags = []

    if args.target_tags:
      target_tags = args.target_tags
    elif args.target_tags is None:
      target_tags = existing.targetTags
    else:
      cleared_fields.append('targetTags')
      target_tags = []

    denied = []
    if args.rules:
      if existing.allowed:
        allowed = firewalls_utils.ParseRules(args.rules, client.messages,
                                             firewalls_utils.ActionType.ALLOW)
      else:
        denied = firewalls_utils.ParseRules(args.rules, client.messages,
                                            firewalls_utils.ActionType.DENY)
    elif args.rules is not None:
      if existing.allowed:
        cleared_fields.append('allowed')
        allowed = []
      else:
        cleared_fields.append('denied')
        denied = []

    direction = existing.direction

    if args.priority is None:
      priority = existing.priority
    else:
      priority = args.priority

    destination_ranges = []
    if args.destination_ranges:
      destination_ranges = args.destination_ranges
    elif args.destination_ranges is None:
      destination_ranges = existing.destinationRanges
    else:
      cleared_fields.append('destinationRanges')

    source_service_accounts = []
    if args.source_service_accounts:
      source_service_accounts = args.source_service_accounts
    elif args.source_service_accounts is None:
      source_service_accounts = existing.sourceServiceAccounts
    else:
      cleared_fields.append('sourceServiceAccounts')

    target_service_accounts = []
    if args.target_service_accounts:
      target_service_accounts = args.target_service_accounts
    elif args.target_service_accounts is None:
      target_service_accounts = existing.targetServiceAccounts
    else:
      cleared_fields.append('targetServiceAccounts')

    if args.IsSpecified('enable_logging'):
      log_config = client.messages.FirewallLogConfig(enable=args.enable_logging)
      self._AddLoggingMetadata(client.messages, args, log_config)
    else:
      log_config = (
          encoding.CopyProtoMessage(existing.logConfig)
          if existing.logConfig is not None else None)
      self._AddLoggingMetadata(client.messages, args, log_config)

    new_firewall = client.messages.Firewall(
        name=existing.name,
        direction=direction,
        priority=priority,
        allowed=allowed,
        denied=denied,
        description=description,
        network=existing.network,
        sourceRanges=source_ranges,
        sourceTags=source_tags,
        destinationRanges=destination_ranges,
        targetTags=target_tags,
        sourceServiceAccounts=source_service_accounts,
        targetServiceAccounts=target_service_accounts,
        logConfig=log_config)

    if args.disabled is not None:
      new_firewall.disabled = args.disabled

    return new_firewall


@base.ReleaseTracks(base.ReleaseTrack.BETA)
class BetaUpdateFirewall(UpdateFirewall):
  """Update a firewall rule."""

  @classmethod
  def Args(cls, parser):
    messages = apis.GetMessagesModule('compute',
                                      compute_api.COMPUTE_BETA_API_VERSION)
    cls.FIREWALL_RULE_ARG = flags.FirewallRuleArgument()
    cls.FIREWALL_RULE_ARG.AddArgument(parser, operation_type='update')
    firewalls_utils.AddCommonArgs(
        parser,
        for_update=True,
        with_egress_support=cls.with_egress_firewall,
        with_service_account=cls.with_service_account)
    firewalls_utils.AddArgsForServiceAccount(parser, for_update=True)
    flags.AddEnableLogging(parser)
    flags.AddLoggingMetadata(parser, messages)


@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
class AlphaUpdateFirewall(BetaUpdateFirewall):
  """Update a firewall rule."""

  @classmethod
  def Args(cls, parser):
    messages = apis.GetMessagesModule('compute',
                                      compute_api.COMPUTE_ALPHA_API_VERSION)
    cls.FIREWALL_RULE_ARG = flags.FirewallRuleArgument()
    cls.FIREWALL_RULE_ARG.AddArgument(parser, operation_type='update')
    firewalls_utils.AddCommonArgs(
        parser,
        for_update=True,
        with_egress_support=cls.with_egress_firewall,
        with_service_account=cls.with_service_account)
    firewalls_utils.AddArgsForServiceAccount(parser, for_update=True)
    flags.AddEnableLogging(parser)
    flags.AddLoggingMetadata(parser, messages)


UpdateFirewall.detailed_help = {
    'brief':
        'Update a firewall rule.',
    'DESCRIPTION':
        """\
        *{command}* is used to update firewall rules that allow/deny
        incoming/outgoing traffic. The firewall rule will only be updated for
        arguments that are specifically passed. Other attributes will remain
        unaffected. The `action` flag (whether to allow or deny matching
        traffic) cannot be defined when updating a firewall rule; use
        `gcloud compute firewall-rules delete` to remove the rule instead.
        """,
    'EXAMPLES': """\
        To update the firewall rule ``RULE'' to enable logging, run:

          $ {command} RULE --enable-logging
        """,
}