HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/compute/routers/nats/nats_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code that's shared between multiple NAT subcommands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import exceptions as calliope_exceptions
from googlecloudsdk.command_lib.compute import flags as compute_flags
from googlecloudsdk.command_lib.compute import scope as compute_scope
from googlecloudsdk.command_lib.compute.networks.subnets import flags as subnet_flags
from googlecloudsdk.command_lib.compute.routers.nats import flags as nat_flags
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import files
import six


class NatNotFoundError(core_exceptions.Error):
  """Raised when a NAT is not found."""

  def __init__(self, name):
    self.name = name
    msg = 'NAT `{0}` not found'.format(name)
    super(NatNotFoundError, self).__init__(msg)


class IpAllocateOptionShouldNotBeSpecifiedError(core_exceptions.Error):
  """Raised when IP Allocation option is specified for private NAT."""

  def __init__(self):
    msg = ('--nat-external-ip-pool and --auto-allocate-nat-external-ips '
           'cannot be specified for Private NAT.')
    super(IpAllocateOptionShouldNotBeSpecifiedError, self).__init__(msg)


class IpAllocationUnspecifiedError(core_exceptions.Error):
  """Raised when IP Allocation option is not specified for public NAT."""

  def __init__(self):
    msg = ('Either --nat-external-ip-pool or --auto-allocate-nat-external-ips '
           'must be specified for Public NAT.')
    super(IpAllocationUnspecifiedError, self).__init__(msg)


class SubnetOptionOrSubnet64OptionShouldBeSpecified(core_exceptions.Error):
  """Raised when not ipv4 nor ipv6 subnet option is specified."""

  def __init__(self):
    msg = (
        'At least one of: --nat-all-subnet-ip-ranges,'
        ' --nat-primary-subnet-ip-ranges, --nat-custom-subnet-ip-ranges,'
        ' --nat64-all-v6-subnet-ip-ranges, --nat64-custom-v6-subnet-ip-ranges'
        ' should be specified.'
    )
    super(SubnetOptionOrSubnet64OptionShouldBeSpecified, self).__init__(msg)


def FindNatOrRaise(router, nat_name):
  """Returns the nat with the given name in the given router."""
  for nat in router.nats:
    if nat_name == nat.name:
      return nat
  raise NatNotFoundError(nat_name)


def CreateNatMessage(args, compute_holder):
  """Creates a NAT message from the specified arguments."""
  params = {'name': args.name}

  _AddSubnetOptionsToParams(args, compute_holder, params)

  if args.type is not None:
    params['type'] = (
        compute_holder.client.messages.RouterNat.TypeValueValuesEnum(args.type))

  is_private = args.type == 'PRIVATE'
  is_ip_allocation_specified = (
      args.auto_allocate_nat_external_ips or args.nat_external_ip_pool)
  if is_private:
    if is_ip_allocation_specified:
      raise IpAllocateOptionShouldNotBeSpecifiedError()
  else:
    if not is_ip_allocation_specified:
      raise IpAllocationUnspecifiedError()
    option, nat_ips = _ParseNatIpFields(args, compute_holder)
    params['natIpAllocateOption'] = option
    params['natIps'] = nat_ips

  if args.auto_network_tier is not None:
    params['autoNetworkTier'] = (
        compute_holder.client.messages.RouterNat.AutoNetworkTierValueValuesEnum(
            args.auto_network_tier))

  if args.endpoint_types is not None:
    params['endpointTypes'] = [
        compute_holder.client.messages.RouterNat.EndpointTypesValueListEntryValuesEnum(
            endpoint_type
        )
        for endpoint_type in args.endpoint_types
    ]

  params['udpIdleTimeoutSec'] = args.udp_idle_timeout
  params['icmpIdleTimeoutSec'] = args.icmp_idle_timeout
  params['tcpEstablishedIdleTimeoutSec'] = args.tcp_established_idle_timeout
  params['tcpTransitoryIdleTimeoutSec'] = args.tcp_transitory_idle_timeout
  params['tcpTimeWaitTimeoutSec'] = args.tcp_time_wait_timeout
  params['minPortsPerVm'] = args.min_ports_per_vm
  params['maxPortsPerVm'] = args.max_ports_per_vm
  params['enableDynamicPortAllocation'] = args.enable_dynamic_port_allocation

  if args.enable_logging is not None or args.log_filter is not None:
    log_config = compute_holder.client.messages.RouterNatLogConfig()

    log_config.enable = args.enable_logging
    if args.log_filter is not None:
      log_config.filter = _TranslateLogFilter(args.log_filter, compute_holder)

    params['logConfig'] = log_config

  if args.enable_endpoint_independent_mapping is not None:
    params['enableEndpointIndependentMapping'] = (
        args.enable_endpoint_independent_mapping)

  if args.rules:
    params['rules'] = _ParseRulesFromYamlFile(args.rules, compute_holder)

  return compute_holder.client.messages.RouterNat(**params)


def _AddSubnetOptionsToParams(args, compute_holder, params):
  """Adds subnet options to the params dict."""
  source_ipv4_subnets_to_nat, ipv4_subnets = _ParseIpv4SubnetFields(
      args, compute_holder
  )
  source_ipv6_subnets_to_nat, ipv6_subnets = _ParseIpv6SubnetFields(
      args, compute_holder
  )

  if not source_ipv4_subnets_to_nat and not source_ipv6_subnets_to_nat:
    raise SubnetOptionOrSubnet64OptionShouldBeSpecified()

  if source_ipv4_subnets_to_nat:
    params['sourceSubnetworkIpRangesToNat'] = source_ipv4_subnets_to_nat
  if ipv4_subnets:
    params['subnetworks'] = ipv4_subnets
  if source_ipv6_subnets_to_nat:
    params['sourceSubnetworkIpRangesToNat64'] = source_ipv6_subnets_to_nat
  if ipv6_subnets:
    params['nat64Subnetworks'] = ipv6_subnets


def UpdateNatMessage(nat, args, compute_holder):
  """Updates a NAT message with the specified arguments."""
  if (
      args.subnet_option
      in [
          nat_flags.SubnetOption.ALL_RANGES,
          nat_flags.SubnetOption.PRIMARY_RANGES,
      ]
      or args.nat_custom_subnet_ip_ranges
  ):
    ranges_to_nat, subnetworks = _ParseSubnetFields(args, compute_holder)
    nat.sourceSubnetworkIpRangesToNat = ranges_to_nat
    nat.subnetworks = subnetworks

  if (
      args.subnet_ipv6_option is nat_flags.SubnetIpv6Option.ALL_IPV6_SUBNETS
      or args.nat64_custom_v6_subnet_ip_ranges
  ):
    ranges_to_nat, subnetworks = _ParseIpv6SubnetFields(args, compute_holder)
    nat.sourceSubnetworkIpRangesToNat64 = ranges_to_nat
    nat.nat64Subnetworks = subnetworks

  if args.clear_nat_subnet_ip_ranges:
    nat.sourceSubnetworkIpRangesToNat = None
    nat.subnetworks = []

  if args.clear_nat64_subnet_ip_ranges:
    nat.sourceSubnetworkIpRangesToNat64 = None
    nat.nat64Subnetworks = []

  if args.nat_external_drain_ip_pool:
    drain_nat_ips = nat_flags.DRAIN_NAT_IP_ADDRESSES_ARG.ResolveAsResource(
        args, compute_holder.resources)
    nat.drainNatIps = [six.text_type(ip) for ip in drain_nat_ips]

    # Remove a IP from nat_ips if it is going to be drained.
    if not args.nat_external_ip_pool:
      nat.natIps = [
          ip for ip in nat.natIps if not _ContainIp(drain_nat_ips, ip)
      ]

  if args.clear_nat_external_drain_ip_pool:
    nat.drainNatIps = []

  if args.auto_allocate_nat_external_ips or args.nat_external_ip_pool:
    option, nat_ips = _ParseNatIpFields(args, compute_holder)
    nat.natIpAllocateOption = option
    nat.natIps = nat_ips

  if args.auto_network_tier is not None:
    nat.autoNetworkTier = (
        compute_holder.client.messages.RouterNat.AutoNetworkTierValueValuesEnum(
            args.auto_network_tier))

  if args.udp_idle_timeout is not None:
    nat.udpIdleTimeoutSec = args.udp_idle_timeout
  elif args.clear_udp_idle_timeout:
    nat.udpIdleTimeoutSec = None

  if args.icmp_idle_timeout is not None:
    nat.icmpIdleTimeoutSec = args.icmp_idle_timeout
  elif args.clear_icmp_idle_timeout:
    nat.icmpIdleTimeoutSec = None

  if args.tcp_established_idle_timeout is not None:
    nat.tcpEstablishedIdleTimeoutSec = args.tcp_established_idle_timeout
  elif args.clear_tcp_established_idle_timeout:
    nat.tcpEstablishedIdleTimeoutSec = None

  if args.tcp_transitory_idle_timeout is not None:
    nat.tcpTransitoryIdleTimeoutSec = args.tcp_transitory_idle_timeout
  elif args.clear_tcp_transitory_idle_timeout:
    nat.tcpTransitoryIdleTimeoutSec = None

  if args.tcp_time_wait_timeout is not None:
    nat.tcpTimeWaitTimeoutSec = args.tcp_time_wait_timeout
  elif args.clear_tcp_time_wait_timeout:
    nat.tcpTimeWaitTimeoutSec = None

  if args.min_ports_per_vm is not None:
    nat.minPortsPerVm = args.min_ports_per_vm
  elif args.clear_min_ports_per_vm:
    nat.minPortsPerVm = None

  if args.max_ports_per_vm is not None:
    nat.maxPortsPerVm = args.max_ports_per_vm
  elif args.clear_max_ports_per_vm:
    nat.maxPortsPerVm = None

  if args.enable_dynamic_port_allocation is not None:
    nat.enableDynamicPortAllocation = args.enable_dynamic_port_allocation

  if args.enable_logging is not None or args.log_filter is not None:
    nat.logConfig = (
        nat.logConfig or compute_holder.client.messages.RouterNatLogConfig())
  if args.enable_logging is not None:
    nat.logConfig.enable = args.enable_logging
  if args.log_filter is not None:
    nat.logConfig.filter = _TranslateLogFilter(args.log_filter, compute_holder)

  if args.enable_endpoint_independent_mapping is not None:
    nat.enableEndpointIndependentMapping = (
        args.enable_endpoint_independent_mapping)

  if args.rules:
    nat.rules = _ParseRulesFromYamlFile(args.rules, compute_holder)

  return nat


class SubnetUsage(object):
  """Helper object to store what ranges of a subnetwork are used for NAT."""

  def __init__(self):
    self.using_primary = False
    self.using_all = False
    self.secondary_ranges = list()


def _ParseSubnetFields(args, compute_holder):
  """Parses arguments related to subnets to use for NAT."""
  subnetworks = list()
  messages = compute_holder.client.messages
  if args.subnet_option == nat_flags.SubnetOption.ALL_RANGES:
    ranges_to_nat = (
        messages.RouterNat.SourceSubnetworkIpRangesToNatValueValuesEnum
        .ALL_SUBNETWORKS_ALL_IP_RANGES)
  elif args.subnet_option == nat_flags.SubnetOption.PRIMARY_RANGES:
    ranges_to_nat = (
        messages.RouterNat.SourceSubnetworkIpRangesToNatValueValuesEnum
        .ALL_SUBNETWORKS_ALL_PRIMARY_IP_RANGES)
  else:
    ranges_to_nat = (
        messages.RouterNat.SourceSubnetworkIpRangesToNatValueValuesEnum
        .LIST_OF_SUBNETWORKS)

    # Mapping of subnet names to SubnetUsage.
    subnet_usages = dict()

    for custom_subnet_arg in args.nat_custom_subnet_ip_ranges:
      colons = custom_subnet_arg.count(':')
      range_option = None
      if colons > 1:
        raise calliope_exceptions.InvalidArgumentException(
            '--nat-custom-subnet-ip-ranges',
            ('Each specified subnet must be of the form SUBNETWORK '
             'or SUBNETWORK:RANGE_NAME'))
      elif colons == 1:
        subnet_name, range_option = custom_subnet_arg.split(':')
      else:
        subnet_name = custom_subnet_arg

      if subnet_name not in subnet_usages:
        subnet_usages[subnet_name] = SubnetUsage()

      if range_option is not None:
        if range_option == 'ALL':
          subnet_usages[subnet_name].using_all = True
        else:
          subnet_usages[subnet_name].secondary_ranges.append(range_option)
      else:
        subnet_usages[subnet_name].using_primary = True

    for subnet_name in subnet_usages:
      subnet_ref = subnet_flags.SubnetworkResolver().ResolveResources(
          [subnet_name],
          compute_scope.ScopeEnum.REGION,
          args.region,
          compute_holder.resources,
          scope_lister=compute_flags.GetDefaultScopeLister(
              compute_holder.client))

      subnet_usage = subnet_usages[subnet_name]

      options = []
      if subnet_usage.using_all:
        options.append(
            messages.RouterNatSubnetworkToNat
            .SourceIpRangesToNatValueListEntryValuesEnum.ALL_IP_RANGES)
      if subnet_usage.using_primary:
        options.append(
            messages.RouterNatSubnetworkToNat
            .SourceIpRangesToNatValueListEntryValuesEnum.PRIMARY_IP_RANGE)
      if subnet_usage.secondary_ranges:
        options.append(messages.RouterNatSubnetworkToNat
                       .SourceIpRangesToNatValueListEntryValuesEnum
                       .LIST_OF_SECONDARY_IP_RANGES)

      subnetworks.append({
          'name': six.text_type(subnet_ref[0]),
          'sourceIpRangesToNat': options,
          'secondaryIpRangeNames': subnet_usage.secondary_ranges
      })
  # Sorted for test stability.
  return (ranges_to_nat, sorted(subnetworks, key=lambda subnet: subnet['name']))


def _ParseIpv4SubnetFields(args, compute_holder):
  """Parses arguments related to ipv4 subnets to use for NAT."""
  if (
      args.subnet_option is nat_flags.SubnetOption.CUSTOM_RANGES
      and not args.nat_custom_subnet_ip_ranges
  ):
    return None, []
  return _ParseSubnetFields(args, compute_holder)


def _ParseIpv6SubnetFields(args, compute_holder):
  """Parses arguments related to ipv6 subnets to use for NAT."""
  if (
      args.subnet_ipv6_option is nat_flags.SubnetIpv6Option.LIST_OF_IPV6_SUBNETS
      and not args.nat64_custom_v6_subnet_ip_ranges
  ):
    return None, []

  subnets = []
  messages = compute_holder.client.messages
  if args.subnet_ipv6_option is nat_flags.SubnetIpv6Option.ALL_IPV6_SUBNETS:
    return (
        messages.RouterNat.SourceSubnetworkIpRangesToNat64ValueValuesEnum.ALL_IPV6_SUBNETWORKS,
        [],
    )

  for subnet_name in args.nat64_custom_v6_subnet_ip_ranges:
    subnet_ref = subnet_flags.SubnetworkResolver().ResolveResources(
        [subnet_name],
        compute_scope.ScopeEnum.REGION,
        args.region,
        compute_holder.resources,
        scope_lister=compute_flags.GetDefaultScopeLister(compute_holder.client),
    )
    subnets.append({'name': six.text_type(subnet_ref[0])})

  return (
      messages.RouterNat.SourceSubnetworkIpRangesToNat64ValueValuesEnum.LIST_OF_IPV6_SUBNETWORKS,
      sorted(subnets, key=lambda subnet: subnet['name']),
  )


def _ParseNatIpFields(args, compute_holder):
  messages = compute_holder.client.messages
  if args.auto_allocate_nat_external_ips:
    return (messages.RouterNat.NatIpAllocateOptionValueValuesEnum.AUTO_ONLY,
            list())
  return (messages.RouterNat.NatIpAllocateOptionValueValuesEnum.MANUAL_ONLY, [
      six.text_type(address)
      for address in nat_flags.IP_ADDRESSES_ARG.ResolveAsResource(
          args, compute_holder.resources)
  ])


def _TranslateLogFilter(filter_str, compute_holder):
  """Translates the specified log filter to the enum value."""
  if filter_str == 'ALL':
    return (compute_holder.client.messages.RouterNatLogConfig
            .FilterValueValuesEnum.ALL)
  if filter_str == 'TRANSLATIONS_ONLY':
    return (compute_holder.client.messages.RouterNatLogConfig
            .FilterValueValuesEnum.TRANSLATIONS_ONLY)
  if filter_str == 'ERRORS_ONLY':
    return (compute_holder.client.messages.RouterNatLogConfig
            .FilterValueValuesEnum.ERRORS_ONLY)

  raise calliope_exceptions.InvalidArgumentException(
      '--log-filter', ('--log-filter must be ALL, TRANSLATIONS_ONLY '
                       'or ERRORS_ONLY'))


def _ContainIp(ip_list, target_ip):
  """Returns true if target ip is in the list."""
  for ip in ip_list:
    if ip.RelativeName() in target_ip:
      return True
  return False


def _ParseRulesFromYamlFile(file_path, compute_holder):
  """Parses NAT Rules from the given YAML file."""
  with files.FileReader(file_path) as import_file:
    rules_yaml = yaml.load(import_file)
    if 'rules' not in rules_yaml:
      raise calliope_exceptions.InvalidArgumentException(
          '--rules', 'The YAML file must contain the \'rules\' attribute')
    return [
        _CreateRule(rule_yaml, compute_holder)
        for rule_yaml in rules_yaml['rules']
    ]


def _CreateRule(rule_yaml, compute_holder):
  """Creates a Rule object from the given parsed YAML."""
  rule = compute_holder.client.messages.RouterNatRule()
  if 'ruleNumber' in rule_yaml:
    rule.ruleNumber = rule_yaml['ruleNumber']
  if 'match' in rule_yaml:
    rule.match = rule_yaml['match']
  if 'action' in rule_yaml:
    action_yaml = rule_yaml['action']
    rule.action = compute_holder.client.messages.RouterNatRuleAction()
    if 'sourceNatActiveIps' in action_yaml:
      rule.action.sourceNatActiveIps = action_yaml['sourceNatActiveIps']
    if 'sourceNatDrainIps' in action_yaml:
      rule.action.sourceNatDrainIps = action_yaml['sourceNatDrainIps']
    if 'sourceNatActiveRanges' in action_yaml:
      rule.action.sourceNatActiveRanges = action_yaml['sourceNatActiveRanges']
    if 'sourceNatDrainRanges' in action_yaml:
      rule.action.sourceNatDrainRanges = action_yaml['sourceNatDrainRanges']

  return rule