HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/surface/compute/vpn_tunnels/create.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for creating VPN tunnels."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse
import re

from googlecloudsdk.api_lib.compute import base_classes
from googlecloudsdk.api_lib.compute.vpn_tunnels import vpn_tunnels_utils
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.compute import flags as compute_flags
from googlecloudsdk.command_lib.compute import resource_manager_tags_utils
from googlecloudsdk.command_lib.compute.external_vpn_gateways import flags as external_vpn_gateway_flags
from googlecloudsdk.command_lib.compute.routers import flags as router_flags
from googlecloudsdk.command_lib.compute.target_vpn_gateways import flags as target_vpn_gateway_flags
from googlecloudsdk.command_lib.compute.vpn_gateways import flags as vpn_gateway_flags
from googlecloudsdk.command_lib.compute.vpn_tunnels import flags
import six


_PRINTABLE_CHARS_PATTERN = r'[ -~]+'

_ROUTER_ARG = router_flags.RouterArgumentForVpnTunnel(required=False)
_VPN_TUNNEL_ARG = flags.VpnTunnelArgument()


class DeprecatedArgumentException(exceptions.ToolException):

  def __init__(self, arg, msg):
    super(DeprecatedArgumentException, self).__init__(
        '{0} is deprecated. {1}'.format(arg, msg))


def ValidateSimpleSharedSecret(possible_secret):
  """ValidateSimpleSharedSecret checks its argument is a vpn shared secret.

  ValidateSimpleSharedSecret(v) returns v iff v matches [ -~]+.

  Args:
    possible_secret: str, The data to validate as a shared secret.

  Returns:
    The argument, if valid.

  Raises:
    ArgumentTypeError: The argument is not a valid vpn shared secret.
  """

  if not possible_secret:
    raise argparse.ArgumentTypeError(
        '--shared-secret requires a non-empty argument.')

  if re.match(_PRINTABLE_CHARS_PATTERN, possible_secret):
    return possible_secret

  raise argparse.ArgumentTypeError(
      'The argument to --shared-secret is not valid it contains '
      'non-printable charcters.')


@base.ReleaseTracks(base.ReleaseTrack.GA)
@base.UniverseCompatible
class CreateGA(base.CreateCommand):
  """Create a VPN tunnel.

    *{command}* is used to create a Classic VPN tunnel between a target VPN
  gateway in Google Cloud Platform and a peer address; or create Highly
  Available VPN tunnel between HA VPN gateway and another HA VPN gateway, or
  Highly Available VPN tunnel between HA VPN gateway and an external VPN
  gateway.
  """

  _TARGET_VPN_GATEWAY_ARG = (
      target_vpn_gateway_flags.TargetVpnGatewayArgumentForVpnTunnel(
          required=False))
  _VPN_GATEWAY_ARG = (
      vpn_gateway_flags.GetVpnGatewayArgumentForOtherResource(required=False))

  _EXTERNAL_VPN_GATEWAY_ARG = (
      external_vpn_gateway_flags.ExternalVpnGatewayArgumentForVpnTunnel(
          required=False))

  _PEER_GCP_GATEWAY_ARG = (
      vpn_gateway_flags.GetPeerVpnGatewayArgumentForOtherResource(
          required=False))

  _support_cipher_suite = True
  _support_tagging_at_creation = False

  @classmethod
  def _AddCommonFlags(cls, parser):
    _ROUTER_ARG.AddArgument(parser)

    parser.add_argument(
        '--description',
        help='An optional, textual description for the VPN tunnel.')

    parser.add_argument(
        '--ike-version',
        choices=[1, 2],
        type=int,
        help='Internet Key Exchange protocol version number. Default is 2.')

    parser.add_argument(
        '--shared-secret',
        type=ValidateSimpleSharedSecret,
        required=True,
        help="""\
        Shared secret consisting of printable characters.  Valid
        arguments match the regular expression """ + _PRINTABLE_CHARS_PATTERN)

    parser.add_argument(
        '--ike-networks',
        type=arg_parsers.ArgList(min_length=1),
        hidden=True,
        help='THIS ARGUMENT NEEDS HELP TEXT.')

  @classmethod
  def _AddCipherSuiteFlags(cls, parser):
    parser.add_argument('--phase1-encryption',
                        metavar='ALGORITHMS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 1 encryption algorithms.')
    parser.add_argument('--phase1-integrity',
                        metavar='ALGORITHMS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 1 integrity algorithms.')
    parser.add_argument('--phase1-prf',
                        metavar='PSEUDORANDOM FUNCTIONS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 1 pseudorandom functions.')
    parser.add_argument('--phase1-dh',
                        metavar='GROUPS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 1 Diffie-Hellman groups.')
    parser.add_argument('--phase2-encryption',
                        metavar='ALGORITHMS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 2 encryption algorithms.')
    parser.add_argument('--phase2-integrity',
                        metavar='ALGORITHMS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 2 integrity algorithms.')
    parser.add_argument('--phase2-pfs',
                        metavar='ALGORITHMS',
                        type=arg_parsers.ArgList(min_length=1),
                        help='Phase 2 perfect forward secerecy algorithms.')

  @classmethod
  def Args(cls, parser):
    """Adds arguments to the supplied parser."""
    # TODO(b/129011963): add e2e tests for HA VPN tunnels
    parser.display_info.AddFormat(flags.HA_VPN_LIST_FORMAT)

    _VPN_TUNNEL_ARG.AddArgument(parser, operation_type='create')
    vpn_gateway_group_parser = parser.add_mutually_exclusive_group(
        required=True)
    cls._TARGET_VPN_GATEWAY_ARG.AddArgument(vpn_gateway_group_parser)
    cls._VPN_GATEWAY_ARG.AddArgument(vpn_gateway_group_parser)

    peer_vpn_gateway_group_parser = parser.add_mutually_exclusive_group(
        required=True)
    cls._EXTERNAL_VPN_GATEWAY_ARG.AddArgument(peer_vpn_gateway_group_parser)
    cls._PEER_GCP_GATEWAY_ARG.AddArgument(peer_vpn_gateway_group_parser)
    peer_vpn_gateway_group_parser.add_argument(
        '--peer-address',
        required=False,
        help='Valid IPV4 address representing the remote tunnel endpoint, '
        'the peer address must be specified when creating Classic VPN '
        'tunnels from Classic Target VPN gateway')

    cls._AddCommonFlags(parser)

    parser.add_argument(
        '--local-traffic-selector',
        type=arg_parsers.ArgList(min_length=1),
        metavar='CIDR',
        help=("""\
        Traffic selector is an agreement between IKE peers to permit traffic
        through a tunnel if the traffic matches a specified pair of local and
        remote addresses.

        --local-traffic-selector allows to configure the local addresses that are
        permitted. The value should be a comma separated list of CIDR formatted
        strings. Example: 192.168.0.0/16,10.0.0.0/24.

        Local traffic selector must be specified only for VPN tunnels that
        do not use dynamic routing with a Cloud Router. Omit this flag when
        creating a tunnel using dynamic routing, including a tunnel for a
        Highly Available VPN gateway."""))

    parser.add_argument(
        '--remote-traffic-selector',
        type=arg_parsers.ArgList(min_length=1),
        metavar='CIDR',
        help=("""\
        Traffic selector is an agreement between IKE peers to permit traffic
        through a tunnel if the traffic matches a specified pair of local and
        remote addresses.

        --remote-traffic-selector allows to configure the remote addresses that
        are permitted. The value should be a comma separated list of CIDR
        formatted strings. Example: 192.168.0.0/16,10.0.0.0/24.

        Remote traffic selector must be specified for VPN tunnels that do
        not use dynamic routing with a Cloud Router. Omit this flag when
        creating a tunnel using dynamic routing, including a tunnel for a
        Highly Available VPN gateway."""))

    parser.add_argument(
        '--interface',
        choices=[0, 1],
        type=int,
        required=False,
        help="""\
        Numeric interface ID of the VPN gateway with which this VPN tunnel
        is associated. This flag is required if the tunnel is being attached
        to a Highly Available VPN gateway. This option is only available
        for use with Highly Available VPN gateway and must be omitted if the
        tunnel is going to be connected to a Classic VPN gateway.""")

    parser.add_argument(
        '--peer-external-gateway-interface',
        choices=[0, 1, 2, 3],
        type=int,
        required=False,
        help="""\
        Interface ID of the external VPN gateway to which this VPN tunnel
        is connected to.
        This flag is required if the tunnel is being created from
        a Highly Available VPN gateway to an External Vpn Gateway.""")

    if(cls._support_cipher_suite):
      cls._AddCipherSuiteFlags(parser)

    if cls._support_tagging_at_creation:
      parser.add_argument(
          '--resource-manager-tags',
          type=arg_parsers.ArgDict(),
          metavar='KEY=VALUE',
          help="""\
            A comma-separated list of Resource Manager tags to apply to the VPN tunnel.
        """,
      )

    parser.display_info.AddCacheUpdater(flags.VpnTunnelsCompleter)

  def _ValidateHighAvailabilityVpnArgs(self, args):
    if args.IsSpecified('vpn_gateway'):
      if not args.IsSpecified('interface'):
        raise exceptions.InvalidArgumentException(
            '--interface',
            'When creating Highly Available VPN tunnels, the VPN gateway '
            'interface must be specified using the --interface flag.')
      if not args.IsSpecified('router'):
        raise exceptions.InvalidArgumentException(
            '--router',
            'When creating Highly Available VPN tunnels, a Cloud Router '
            'must be specified using the --router flag.')
      if not args.IsSpecified('peer_gcp_gateway') and not args.IsSpecified(
          'peer_external_gateway'):
        raise exceptions.InvalidArgumentException(
            '--peer-gcp-gateway',
            'When creating Highly Available VPN tunnels, either '
            '--peer-gcp-gateway or --peer-external-gateway must be specified.')
      if args.IsSpecified('peer_external_gateway') and not args.IsSpecified(
          'peer_external_gateway_interface'):
        raise exceptions.InvalidArgumentException(
            '--peer-external-gateway-interface',
            'The flag --peer-external-gateway-interface must be specified along'
            ' with --peer-external-gateway.')
      if args.IsSpecified('local_traffic_selector'):
        raise exceptions.InvalidArgumentException(
            '--local-traffic-selector',
            'Cannot specify local traffic selector with Highly Available '
            'VPN tunnels.')
      if args.IsSpecified('remote_traffic_selector'):
        raise exceptions.InvalidArgumentException(
            '--remote-traffic-selector',
            'Cannot specify remote traffic selector with Highly Available '
            'VPN tunnels.')
      if args.IsSpecified('peer_address'):
        raise exceptions.InvalidArgumentException(
            '--peer-address',
            'Cannot specify the flag peer address with Highly Available '
            'VPN tunnels.')

  def _ValidateClassicVpnArgs(self, args):
    if args.IsSpecified('target_vpn_gateway'):
      if not args.IsSpecified('peer_address'):
        raise exceptions.InvalidArgumentException(
            '--peer-address',
            'When creating Classic VPN tunnels, the peer address '
            'must be specified.')
      if args.IsSpecified('router'):
        raise exceptions.InvalidArgumentException(
            '--router',
            'Cannot specify router with Classic VPN tunnels.',
        )

  def _GetPeerGcpGateway(self, api_resource_registry, args):
    if args.IsSpecified('peer_gcp_gateway'):
      peer_gcp_gateway = self._PEER_GCP_GATEWAY_ARG.ResolveAsResource(
          args, api_resource_registry).SelfLink()
      return peer_gcp_gateway
    return None

  def _GetPeerExternalGateway(self, api_resource_registry, args):
    if args.IsSpecified('peer_external_gateway'):
      peer_external_gateway = self._EXTERNAL_VPN_GATEWAY_ARG.ResolveAsResource(
          args, api_resource_registry).SelfLink()
      return peer_external_gateway
    return None

  def _Run(self, args, is_vpn_gateway_supported):
    holder = base_classes.ComputeApiHolder(self.ReleaseTrack())
    client = holder.client
    helper = vpn_tunnels_utils.VpnTunnelHelper(holder)

    # TODO(b/38253176) Add test coverage
    if args.ike_networks is not None:
      raise DeprecatedArgumentException(
          '--ike-networks',
          'It has been renamed to --local-traffic-selector.')

    vpn_tunnel_ref = _VPN_TUNNEL_ARG.ResolveAsResource(
        args,
        holder.resources,
        scope_lister=compute_flags.GetDefaultScopeLister(client))

    router_link = None
    if args.IsSpecified('router'):
      args.router_region = vpn_tunnel_ref.region
      router_ref = _ROUTER_ARG.ResolveAsResource(args, holder.resources)
      router_link = router_ref.SelfLink()

    target_vpn_gateway = None
    vpn_gateway = None
    vpn_gateway_interface = None
    peer_external_gateway = None
    peer_external_gateway_interface = None
    peer_gcp_gateway = None
    resource_manager_tags = None

    if is_vpn_gateway_supported and args.IsSpecified('vpn_gateway'):
      self._ValidateHighAvailabilityVpnArgs(args)
      args.vpn_gateway_region = vpn_tunnel_ref.region
      vpn_gateway = self._VPN_GATEWAY_ARG.ResolveAsResource(
          args, holder.resources
      ).SelfLink()
      vpn_gateway_interface = args.interface
      peer_external_gateway = self._GetPeerExternalGateway(
          holder.resources, args
      )
      peer_external_gateway_interface = args.peer_external_gateway_interface
      peer_gcp_gateway = self._GetPeerGcpGateway(holder.resources, args)
    else:
      self._ValidateClassicVpnArgs(args)
      args.target_vpn_gateway_region = vpn_tunnel_ref.region
      target_vpn_gateway = self._TARGET_VPN_GATEWAY_ARG.ResolveAsResource(
          args, holder.resources
      ).SelfLink()

    if self._support_tagging_at_creation:
      if args.resource_manager_tags is not None:
        resource_manager_tags = self._CreateVpnTunnelParams(
            client.messages, args.resource_manager_tags
        )

    if target_vpn_gateway:
      if self._support_cipher_suite:
        phase1_algo = helper.GetVpnTunnelPhase1Algorithms(
            phase1_encryption=args.phase1_encryption,
            phase1_integrity=args.phase1_integrity,
            phase1_dh=args.phase1_dh,
            phase1_prf=args.phase1_prf,
        )
        phase2_algo = helper.GetVpnTunnelPhase2Algorithms(
            phase2_encryption=args.phase2_encryption,
            phase2_integrity=args.phase2_integrity,
            phase2_pfs=args.phase2_pfs,
        )
        cipher_suite = client.messages.VpnTunnelCipherSuite()
        if phase1_algo:
          cipher_suite.phase1 = phase1_algo
        if phase2_algo:
          cipher_suite.phase2 = phase2_algo
        if not cipher_suite.phase1 and not cipher_suite.phase2:
          cipher_suite = None
        vpn_tunnel_to_insert = (
            helper.GetClassicVpnTunnelForInsertWithCipherSuite(
                name=vpn_tunnel_ref.Name(),
                description=args.description,
                ike_version=args.ike_version,
                peer_ip=args.peer_address,
                shared_secret=args.shared_secret,
                target_vpn_gateway=target_vpn_gateway,
                local_traffic_selector=args.local_traffic_selector,
                remote_traffic_selector=args.remote_traffic_selector,
                cipher_suite=cipher_suite,
                params=resource_manager_tags,
                support_tagging_at_creation=self._support_tagging_at_creation,
            )
        )
      else:
        vpn_tunnel_to_insert = helper.GetClassicVpnTunnelForInsert(
            name=vpn_tunnel_ref.Name(),
            description=args.description,
            ike_version=args.ike_version,
            peer_ip=args.peer_address,
            shared_secret=args.shared_secret,
            target_vpn_gateway=target_vpn_gateway,
            local_traffic_selector=args.local_traffic_selector,
            remote_traffic_selector=args.remote_traffic_selector,
            params=resource_manager_tags,
            support_tagging_at_creation=self._support_tagging_at_creation,
        )
    else:
      if(self._support_cipher_suite):
        phase1_algo = helper.GetVpnTunnelPhase1Algorithms(
            phase1_encryption=args.phase1_encryption,
            phase1_integrity=args.phase1_integrity,
            phase1_dh=args.phase1_dh,
            phase1_prf=args.phase1_prf,
        )
        phase2_algo = helper.GetVpnTunnelPhase2Algorithms(
            phase2_encryption=args.phase2_encryption,
            phase2_integrity=args.phase2_integrity,
            phase2_pfs=args.phase2_pfs,
        )
        cipher_suite = client.messages.VpnTunnelCipherSuite()
        if phase1_algo:
          cipher_suite.phase1 = phase1_algo
        if phase2_algo:
          cipher_suite.phase2 = phase2_algo
        if not cipher_suite.phase1 and not cipher_suite.phase2:
          cipher_suite = None
        vpn_tunnel_to_insert = (
            helper.GetHighAvailabilityVpnTunnelForInsertWithCipherSuite(
                name=vpn_tunnel_ref.Name(),
                description=args.description,
                ike_version=args.ike_version,
                peer_ip=args.peer_address,
                shared_secret=args.shared_secret,
                vpn_gateway=vpn_gateway,
                vpn_gateway_interface=vpn_gateway_interface,
                router=router_link,
                peer_external_gateway=peer_external_gateway,
                peer_external_gateway_interface=peer_external_gateway_interface,
                peer_gcp_gateway=peer_gcp_gateway,
                cipher_suite=cipher_suite,
                params=resource_manager_tags,
                support_tagging_at_creation=self._support_tagging_at_creation,
            )
        )
      else:
        vpn_tunnel_to_insert = helper.GetHighAvailabilityVpnTunnelForInsert(
            name=vpn_tunnel_ref.Name(),
            description=args.description,
            ike_version=args.ike_version,
            # TODO(b/127839209): remove peer_ip for HA
            # tunnels once peer gateway
            # feature is enabled in Arcus.
            peer_ip=args.peer_address,
            shared_secret=args.shared_secret,
            vpn_gateway=vpn_gateway,
            vpn_gateway_interface=vpn_gateway_interface,
            router=router_link,
            peer_external_gateway=peer_external_gateway,
            peer_external_gateway_interface=peer_external_gateway_interface,
            peer_gcp_gateway=peer_gcp_gateway,
            params=resource_manager_tags,
            support_tagging_at_creation=self._support_tagging_at_creation,
        )

    operation_ref = helper.Create(vpn_tunnel_ref, vpn_tunnel_to_insert)
    return helper.WaitForOperation(vpn_tunnel_ref, operation_ref,
                                   'Creating VPN tunnel')

  def Run(self, args):
    """Issues API requests to construct VPN Tunnels."""
    return self._Run(args, is_vpn_gateway_supported=True)

  def _CreateVpnTunnelParams(self, messages, resource_manager_tags):
    resource_manager_tags_map = (
        resource_manager_tags_utils.GetResourceManagerTags(
            resource_manager_tags
        )
    )
    params = messages.VpnTunnelParams
    additional_properties = [
        params.ResourceManagerTagsValue.AdditionalProperty(key=key, value=value)
        for key, value in sorted(six.iteritems(resource_manager_tags_map))
    ]
    return params(
        resourceManagerTags=params.ResourceManagerTagsValue(
            additionalProperties=additional_properties
        )
    )


@base.ReleaseTracks(base.ReleaseTrack.BETA)
class CreateBeta(CreateGA):
  """Create a VPN tunnel.

    *{command}* is used to create a Classic VPN tunnel between a target VPN
  gateway in Google Cloud Platform and a peer address; or create Highly
  Available VPN tunnel between HA VPN gateway and another HA VPN gateway, or
  Highly Available VPN tunnel between HA VPN gateway and an external VPN
  gateway.
  """

  _support_cipher_suite = True
  _support_tagging_at_creation = False


@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
class CreateAlpha(CreateBeta):
  """Create a VPN tunnel.

    *{command}* is used to create a Classic VPN tunnel between a target VPN
  gateway in Google Cloud Platform and a peer address; or create Highly
  Available VPN tunnel between HA VPN gateway and another HA VPN gateway, or
  Highly Available VPN tunnel between HA VPN gateway and an external VPN
  gateway.
  """
  _support_cipher_suite = True
  _support_tagging_at_creation = True