HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/compute/iap_tunnel_websocket_helper.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""WebSocket helper class for tunneling with Cloud IAP."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import logging
import ssl
import sys
import threading
import traceback

from googlecloudsdk.api_lib.compute import iap_tunnel_lightweight_websocket as iap_websocket
from googlecloudsdk.api_lib.compute import iap_tunnel_websocket_utils as utils
from googlecloudsdk.core import context_aware
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core.util import encoding
import six
import websocket

TUNNEL_CLOUDPROXY_ORIGIN = 'bot:iap-tunneler'


class WebSocketConnectionClosed(exceptions.Error):
  pass


class WebSocketInvalidOpcodeError(exceptions.Error):
  pass


class WebSocketSendError(exceptions.Error):
  pass


class IapTunnelWebSocketHelper(object):
  """Helper class for common operations on websocket and related metadata."""

  def __init__(self, url, headers, ignore_certs, proxy_info, on_data, on_close,
               should_use_new_websocket, conn_id=0):
    self._on_data = on_data
    self._on_close = on_close
    self._proxy_info = proxy_info
    self._conn_id = conn_id
    self._receiving_thread = None

    ca_certs = utils.CheckCACertsFile(ignore_certs)
    self._sslopt = {'cert_reqs': ssl.CERT_REQUIRED,
                    'ca_certs': ca_certs}
    if ignore_certs:
      self._sslopt['cert_reqs'] = ssl.CERT_NONE
      self._sslopt['check_hostname'] = False

    caa_config = context_aware.Config()

    if caa_config:
      if (caa_config.config_type is
          context_aware.ConfigType.ENTERPRISE_CERTIFICATE):
        log.debug('Using enterprise certificate for mTLS')
      else:  # context_aware.ConfigType.ON_DISK_CERTIFICATE
        cert_path = caa_config.encrypted_client_cert_path
        log.debug('Using client certificate %s', cert_path)
        self._sslopt['certfile'] = cert_path
        self._sslopt['password'] = caa_config.encrypted_client_cert_password

    # Disable most of random logging in websocket library itself except in DEBUG
    if log.GetVerbosity() != logging.DEBUG:
      logging.getLogger('websocket').setLevel(logging.CRITICAL)

    self._is_closed = False
    self._error_msg = ''
    self._should_use_new_websocket = should_use_new_websocket
    if self._should_use_new_websocket:
      self._websocket = iap_websocket.IapLightWeightWebsocket(
          url,
          header=headers,
          on_close=self._OnClose,
          on_data=self._OnData,
          on_error=self._OnError,
          subprotocols=[utils.SUBPROTOCOL_NAME])
    else:
      self._websocket = websocket.WebSocketApp(
          url,
          header=headers,
          on_close=self._OnClose,
          on_data=self._OnData,
          on_error=self._OnError,
          subprotocols=[utils.SUBPROTOCOL_NAME])

  def __del__(self):
    self.Close()

  def Close(self, msg=''):
    """Close the WebSocket."""
    if not self._is_closed:
      try:
        self._websocket.close()
      except:  # pylint: disable=bare-except
        pass
      if not self._error_msg:
        self._error_msg = msg
      self._is_closed = True

  def IsClosed(self):
    """Check to see if WebSocket has closed."""
    return (self._is_closed or
            (self._receiving_thread and not self._receiving_thread.is_alive()))

  def ErrorMsg(self):
    return self._error_msg

  def Send(self, send_data):
    """Send data on WebSocket connection."""
    try:
      # Needed since the gcloud logging methods will log to file regardless
      # of the verbosity level set by the user.
      if log.GetVerbosity() == logging.DEBUG:
        log.debug('[%d] SEND data_len [%d] send_data[:20] %r',
                  self._conn_id, len(send_data), send_data[:20],)
      self._websocket.send(send_data, opcode=websocket.ABNF.OPCODE_BINARY)
    except EnvironmentError:
      self.Close()
      raise
    except websocket.WebSocketConnectionClosedException:
      self.Close()
      raise WebSocketConnectionClosed()
    except Exception as e:  # pylint: disable=broad-except
      log.debug('[%d] Error during WebSocket send of Data message.',
                self._conn_id, exc_info=True)
      # Convert websocket library errors and any others into one based on
      # exceptions.Error
      tb = sys.exc_info()[2]
      self.Close()
      exceptions.reraise(
          WebSocketSendError(traceback.format_exception_only(type(e), e),
                             tb=tb))

  def SendClose(self):
    """Send WebSocket Close message if possible."""
    # Save self._websocket.sock, because some other thread could set it to None
    # while this function is executing.
    if self._should_use_new_websocket:
      sock = self._websocket
    else:
      sock = self._websocket.sock

    if sock:
      log.debug('[%d] CLOSE', self._conn_id)
      try:
        sock.send_close()
      except (EnvironmentError,
              websocket.WebSocketConnectionClosedException) as e:
        log.info('[%d] Unable to send WebSocket Close message [%s].',
                 self._conn_id, six.text_type(e))
        self.Close()
      except:  # pylint: disable=bare-except
        log.info('[%d] Error during WebSocket send of Close message.',
                 self._conn_id, exc_info=True)
        self.Close()

  def StartReceivingThread(self):
    if not self._is_closed:
      self._receiving_thread = threading.Thread(
          target=self._ReceiveFromWebSocket)
      self._receiving_thread.daemon = True
      self._receiving_thread.start()

  def _OnClose(self, close_code, close_reason):
    """Callback for WebSocket Close messages."""

    if close_code is None and close_reason is None:
      # This indicates a local close event and not an actual Close message, call
      # Close() but skip the rest of the processing.
      self.Close()
      return

    close_msg = '%r: %r' % (close_code, close_reason)
    log.info('[%d] Received WebSocket Close message [%s].',
             self._conn_id, close_msg)
    self.Close(msg=close_msg)

    if close_code == 4004:
      # This is a resumable error indicating that reauthentication is required.
      # Call self.Close() so that the class that owns us knows we're no longer
      # active, and can create a brand new IapTunnelWebSocketHelper for a
      # reconnect attempt. But avoid calling self._on_close() because that
      # indicates that the entire session is dead and a reconnect shouldn't be
      # attempted.
      return

    try:
      self._on_close()
    except (EnvironmentError, exceptions.Error):
      log.info('[%d] Error while processing Close message',
               self._conn_id, exc_info=True)
      raise

  def _OnData(self, binary_data, opcode, unused_finished=0):
    """Callback for WebSocket Data messages."""
    # Needed since the gcloud logging methods will log to file regardless
    # of the verbosity level set by the user.
    if log.GetVerbosity() == logging.DEBUG:
      log.debug('[%d] RECV opcode [%r] data_len [%d] binary_data[:20] [%r]',
                self._conn_id, opcode, len(binary_data), binary_data[:20])
    try:
      # Even though we will only be processing BINARY messages, a bug in the
      # underlying websocket library will report the last opcode in a
      # multi-frame message instead of the first opcode - so CONT instead of
      # BINARY.
      if opcode not in (websocket.ABNF.OPCODE_CONT,
                        websocket.ABNF.OPCODE_BINARY):
        raise WebSocketInvalidOpcodeError('Unexpected WebSocket opcode [%r].' %
                                          opcode)
      self._on_data(binary_data)
    except EnvironmentError as e:
      log.info('[%d] Error [%s] while sending to client.', self._conn_id,
               six.text_type(e))
      self.Close()
      raise
    except:  # pylint: disable=bare-except
      log.info('[%d] Error while processing Data message.', self._conn_id,
               exc_info=True)
      self.Close()
      raise

  def _OnError(self, exception_obj):
    # Do not call Close() from here as it may generate callbacks in some error
    # conditions that can create a feedback loop with this function.
    if not self._is_closed:
      log.debug('[%d] Error during WebSocket processing.',
                self._conn_id, exc_info=True)
      log.info('[%d] Error during WebSocket processing:\n' +
               ''.join(traceback.format_exception_only(type(exception_obj),
                                                       exception_obj)),
               self._conn_id)
      self._error_msg = six.text_type(exception_obj)

  def _ReceiveFromWebSocket(self):
    """Receive data from WebSocket connection."""
    try:
      if self._proxy_info:
        http_proxy_auth = None
        if self._proxy_info.proxy_user or self._proxy_info.proxy_pass:
          # The websocket library ultimately expects the proxy username and
          # password to be strings, unlike httplib2's ProxyInfo which encodes
          # these as bytes. So we need to ensure they're decoded here before
          # calling run_forever.
          http_proxy_auth = (encoding.Decode(self._proxy_info.proxy_user),
                             encoding.Decode(self._proxy_info.proxy_pass))
        self._websocket.run_forever(
            origin=TUNNEL_CLOUDPROXY_ORIGIN, sslopt=self._sslopt,
            http_proxy_host=self._proxy_info.proxy_host,
            http_proxy_port=self._proxy_info.proxy_port,
            http_proxy_auth=http_proxy_auth)
      else:
        self._websocket.run_forever(origin=TUNNEL_CLOUDPROXY_ORIGIN,
                                    sslopt=self._sslopt)
    except:  # pylint: disable=bare-except
      try:
        log.info('[%d] Error while receiving from WebSocket.',
                 self._conn_id, exc_info=True)
      except:
        # This is a daemon thread, so it could be running while the interpreter
        # is exiting, so logging could fail. At that point the only thing to do
        # is ignore the exception. Ideally we would make this a non-daemon
        # thread.
        pass
    try:
      self.Close()
    except:  # pylint: disable=bare-except
      try:
        log.info('[%d] Error while closing in receiving thread.',
                 self._conn_id, exc_info=True)
      except:
        pass