HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/compute/vpc_troubleshooter.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Troubleshoot VPC setting for ssh connection."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.compute import ssh_troubleshooter
from googlecloudsdk.core import log

# API name and version passed to apis.GetClientInstance / GetMessagesModule
# when VPCTroubleshooter builds its Compute client and message module.
_API_COMPUTE_CLIENT_NAME = 'compute'
_API_CLIENT_VERSION_V1 = 'v1'

# User-facing guidance recorded under the 'iap' issue key when no effective
# firewall rule passes the IAP tunneling check. The text is printed verbatim
# through log.status, so it must not be reformatted.
IAP_MESSAGE = (
    'There is an issue with your IAP configuration\n'
    '\n'
    'Check the following items:\n'
    ' - The IAP firewall rule is valid.\n'
    ' - IAP tunneling is enabled.\n'
    ' - You are connecting using an IAP token.\n'
    ' - You have the IAM role of Project Owner, IAP-Secured Tunnel User, or '
    'iap.tunnelInstances.accessViaIAP (preferred)\n'
    ' - Your organization hasn\'t blocked access to external IP addresses. '
    'IAP changes the source traffic to 35.235.240.0/20 and the tunnel to '
    'https://tunnel.cloudproxy.app.\n'
    ' - If your organization blocks access to public IP addresses, try '
    'connecting through a bastion server.\n'
    '\n'
    'Help for IAP port forwarding: '
    'https://cloud.google.com/iap/docs/using-tcp-forwarding\n'
    'https://cloud.google.com/iap/docs/faq#what_domain_does_for_tcp_use'
    '\n'
    'Help for bastion server: '
    'https://cloud.google.com/compute/docs/instances/connecting-advanced#bastion_host\n')   # pylint: disable=line-too-long


# User-facing guidance recorded under the 'default_ssh_port' issue key when no
# effective firewall rule is found that allows TCP port 22. The text is printed
# verbatim through log.status, so it must not be reformatted.
DEFAULT_SSH_PORT_MESSAGE = (
    'No ingress firewall rule allowing SSH found.\n'
    '\n'
    'If the project uses the default ingress firewall rule for SSH, connections'
    ' to all VMs are allowed on TCP port 22.\n'
    'If the VPC network that the VM\'s network interface is in has a custom '
    'firewall rule, make sure that custom rule allows ingress traffic on the '
    'VM\'s SSH TCP port (usually, this is TCP port 22).\n'
    'Help for default firewall rule: '
    'https://cloud.google.com/vpc/docs/vpc#default-network\n'
    'Help for custom firewall rule: '
    'https://cloud.google.com/network-connectivity/docs/vpn/how-to/configuring-firewall-rules\n'  # pylint: disable=line-too-long
    '\n'
    "If you need to investigate further, enable the VM's serial console. "
    "Then connect through the VM serial port, find the SSH server's listen "
    "port, and make sure the port number in the VM's firewall rules matches"
    " the SSH server's listen port.\n"
    'Help for serial console: https://cloud.google.com/compute/docs/instances/interacting-with-serial-console\n'  # pylint: disable=line-too-long
    'Help for serial port: https://cloud.google.com/compute/docs/instances/interacting-with-serial-console\n'  # pylint: disable=line-too-long
    'Help for firewall rules: https://cloud.google.com/vpc/docs/using-firewalls\n')  # pylint: disable=line-too-long


class VPCTroubleshooter(ssh_troubleshooter.SshTroubleshooter):
  """Check the VPC firewall settings that can block an SSH connection.

  The troubleshooter fetches the effective firewall rules of the instance's
  first network interface ('nic0') and reports an issue when:
    - no ingress rule allows TCP traffic to the SSH port, or
    - IAP tunneling is requested but no ingress rule allows SSH traffic from
      the IAP source range.

  Attributes:
    project: Project resource; its `name` is used in API requests.
    zone: Zone name of the instance.
    instance: Instance resource; its `name` is used in API requests.
    iap_tunnel_args: IAP tunnel arguments; the IAP check runs only when this
      value is truthy.
    compute_client: Compute v1 API client.
    compute_message: Compute v1 messages module.
    issues: Dict mapping an issue key to the message shown to the user.
  """

  project = None
  zone = None
  instance = None
  iap_tunnel_args = None

  # TCP port that the firewall checks look for.
  _SSH_PORT = 22
  # IPProtocol values of an allow entry that cover TCP traffic: the protocol
  # name, its IP protocol number, and the 'all' wildcard.
  _TCP_PROTOCOLS = ('tcp', '6', 'all')
  # Source range that IAP TCP forwarding connects from.
  _IAP_SOURCE_RANGE = '35.235.240.0/20'

  def __init__(self, project, zone, instance, iap_tunnel_args):
    """Stores the target instance and builds the Compute API client.

    Args:
      project: Project resource that owns the instance.
      zone: Zone name of the instance.
      instance: Instance resource to troubleshoot.
      iap_tunnel_args: IAP tunnel arguments, or a falsy value when IAP
        tunneling is not used.
    """
    self.project = project
    self.zone = zone
    self.instance = instance
    self.iap_tunnel_args = iap_tunnel_args
    self.compute_client = apis.GetClientInstance(_API_COMPUTE_CLIENT_NAME,
                                                 _API_CLIENT_VERSION_V1)
    self.compute_message = apis.GetMessagesModule(_API_COMPUTE_CLIENT_NAME,
                                                  _API_CLIENT_VERSION_V1)
    self.issues = {}

  def check_prerequisite(self):
    """No prerequisite is needed for the VPC checks."""
    return

  def cleanup_resources(self):
    """No resource is created by the VPC checks, so nothing is cleaned up."""
    return

  def troubleshoot(self):
    """Runs the firewall checks and prints every issue that was found."""
    log.status.Print('---- Checking VPC settings ----')
    self._CheckDefaultSSHPort()
    if self.iap_tunnel_args:
      self._CheckIAPTunneling()
    log.status.Print('VPC settings: {0} issue(s) found.\n'.format(
        len(self.issues)))
    for message in self.issues.values():
      log.status.Print(message)

  def _CheckIAPTunneling(self):
    """Records the 'iap' issue unless a rule admits SSH from the IAP range."""
    for firewall in self._ListInstanceEffectiveFirewall():
      if self._HasValidateIAPTunnelingRule(firewall):
        return
    self.issues['iap'] = IAP_MESSAGE

  def _CheckDefaultSSHPort(self):
    """Records the 'default_ssh_port' issue unless an ingress rule allows SSH.

    Egress rules are skipped: a rule that allows outbound traffic to port 22
    does not let an SSH connection reach the VM.
    """
    for firewall in self._ListInstanceEffectiveFirewall():
      if (self._IsIngressRule(firewall) and
          self._HasSSHProtocalAndPort(firewall)):
        return
    self.issues['default_ssh_port'] = DEFAULT_SSH_PORT_MESSAGE

  def _ListInstanceEffectiveFirewall(self):
    """Returns the effective firewall rules of the instance's 'nic0'."""
    req = self.compute_message.ComputeInstancesGetEffectiveFirewallsRequest(
        instance=self.instance.name,
        networkInterface='nic0',
        project=self.project.name,
        zone=self.zone)
    return self.compute_client.instances.GetEffectiveFirewalls(req).firewalls

  def _IsIngressRule(self, firewall):
    """Returns True unless the rule is explicitly an egress rule.

    A rule without a direction is treated as ingress, which is the default
    direction documented for the Compute firewall resource.

    Args:
      firewall: Firewall message to inspect.
    """
    direction_enum = self.compute_message.Firewall.DirectionValueValuesEnum
    return firewall.direction != direction_enum.EGRESS

  def _HasValidateIAPTunnelingRule(self, firewall):
    """Returns True if the rule admits SSH traffic from the IAP source range.

    Args:
      firewall: Firewall message to inspect.

    Returns:
      True when the rule is an ingress rule, lists the IAP source range among
      its source ranges and allows TCP traffic to the SSH port.
    """
    if not self._IsIngressRule(firewall):
      return False
    if self._IAP_SOURCE_RANGE not in firewall.sourceRanges:
      return False
    return self._HasSSHProtocalAndPort(firewall)

  def _HasSSHProtocalAndPort(self, firewall):
    """Returns True if an allow entry of the rule covers TCP on the SSH port.

    Args:
      firewall: Firewall message to inspect.

    Returns:
      True when at least one allow entry applies to TCP traffic and either
      lists no ports (the entry then applies to every port) or lists a port or
      port range that contains the SSH port.
    """
    for allow_rule in firewall.allowed:
      if allow_rule.IPProtocol not in self._TCP_PROTOCOLS:
        continue
      # An allow entry without ports applies to connections through any port.
      if not allow_rule.ports:
        return True
      if any(self._PortSpecIncludesSSH(port) for port in allow_rule.ports):
        return True
    return False

  def _PortSpecIncludesSSH(self, port_spec):
    """Returns True if a port entry ('22' or '20-25') contains the SSH port.

    Args:
      port_spec: Port entry of an allow rule, either a single port number or
        an inclusive 'low-high' range.

    Returns:
      True when the SSH port falls inside the entry; False when it does not or
      when the entry cannot be parsed.
    """
    low_text, _, high_text = port_spec.partition('-')
    try:
      low = int(low_text)
      high = int(high_text) if high_text else low
    except ValueError:
      # An unparsable entry cannot be shown to allow SSH.
      return False
    return low <= self._SSH_PORT <= high