HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/firestore/connection_util.py
# -*- coding: utf-8 -*- #
# Copyright 2025 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility for validating Firestore Mongo connection strings."""

import dataclasses
import re
import socket
import ssl
import time
from typing import List

from googlecloudsdk.core import log
from googlecloudsdk.core.console import console_attr


@dataclasses.dataclass(frozen=True)
class ValidationResults:
  """Container class for results of validating a connection string."""

  headers: List[str]
  info: List[str]
  warnings: List[str]
  errors: List[str]
  footers: List[str]

  def __str__(self):
    return '\n'.join(
        self.headers + self.info + self.warnings + self.errors + self.footers
    )


def ValidateConnectionString(
    connection_string,
    db_uid=None,
    db_location_id=None,
    database_id=None,
):
  """Validate the specified connection_string for the specified database."""

  headers = [
      '-' * 80,
      f'Evaluating connection string: {connection_string}',
      '-' * 80,
  ]
  info = []
  warnings = []
  errors = []
  footers = ['-' * 80]
  user = None
  password = None

  # Helper method for checking k=v params
  def CheckParam(param_name, expected_value, error_description=''):
    if param_name not in extra_params:
      errors.append(
          f'{error_description}The connection string must specify'
          f' {param_name}={expected_value}.'
      )
    else:
      actual_value = extra_params[param_name]
      del extra_params[param_name]
      if actual_value != expected_value:
        errors.append(
            f'{error_description}The parameter {param_name} is set to '
            f'{actual_value}. The connection string must specify '
            f'{param_name}={expected_value}.'
        )
      else:
        info.append(f'{param_name}={expected_value}.')

  # Scan the connection string left-to-right and emit recommendations.
  while True:
    # Check that the connection string starts with the appropriate prefix
    if not connection_string.startswith('mongodb://'):
      errors.append('The connection string must start with mongodb://')
      break
    # Strip off mongodb:// and continue evaluation
    connection_string = connection_string[len('mongodb://') :]

    # Check for the presense of a user/password (optional)
    match = re.match(r'^([^:]*):([^@]*)@', connection_string)
    if match:
      user = match.group(1)
      password = '*' * len(match.group(2))
      info.append(
          f'The connection string specifies user: {user} '
          f'and password: {password}'
      )
      # Strip off the user+password and continue evaluation
      connection_string = connection_string[len(user) + len(password) + 2 :]

    # Check that the database address begins with a valid UUID
    match = re.match(
        r'^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\.',
        connection_string,
    )
    if not match:
      errors.append((
          'The database address must start with a valid UUID. '
          f'For database {database_id}, use the value {db_uid}'
      ))
      errors.append(
          'NOTE: for password based authentication, the connection string '
          'can also start with mongodb://username:password@UUID.'
      )
      break
    if match.group(1) != db_uid:
      errors.append(
          f'the UUID {match.group(1)} in the connection string does not '
          f'match the UUID {db_uid} for the current database {database_id}.'
      )
    else:
      info.append(f'The UUID {db_uid} is correct.')
    # Strip off the 36 characters of the UUID + . and continue evaluation
    connection_string = connection_string[37:]

    # Check that the UUID is followed by a valid database location
    match = re.match(
        r'^([^\.]+)\.',
        connection_string,
    )
    if not match:
      errors.append(
          'The database address must have the form: '
          'UUID.location.firestore.goog:443'
      )
      break
    if match.group(1) != db_location_id:
      errors.append(
          f'The location {match.group(1)} in the connection string does '
          f'not match the location {db_location_id} for the '
          f'current database {database_id}.'
      )
    else:
      info.append(f'The location {db_location_id} is correct.')
    # Strip off the location + . and continue evaluation
    connection_string = connection_string[len(match.group(1)) + 1 :]

    # Check that the location is followed by the valid domain and port number
    if not connection_string.startswith('firestore.goog:443/'):
      errors.append(
          'The database address must end with firestore.goog:443 as '
          'the domain name and port.'
      )
      break
    # Strip off the rest of the address and continue evaluation
    connection_string = connection_string[len('firestore.goog:443/') :]

    # Check that the string contains a valid database name.
    match = re.match(r'^([^\?]*)\?', connection_string)
    if not match:
      errors.append(
          'The connection string must specify the database id. '
          f'For the current database {database_id} it should have the form '
          f'UUID.location.firestore.goog:443/{database_id}?'
      )
      break
    if match.group(1) != database_id:
      if match.group(1):
        errors.append(
            f'The database name {match.group(1)} in the connection'
            f' string does not match the current database {database_id}.'
        )
      else:
        errors.append(
            'The database name in the connection string is empty. '
            'It is recommended to explicitly specify the database name '
            f'{database_id}, e.g. firestore.goog:443/{database_id}?'
        )
    else:
      info.append(f'The database name {database_id} is correct.')
    # Stip off the rest of database id + '?' and continue evaluation
    connection_string = connection_string[len(match.group(1)) + 1 :]

    # Validate additional parameters, which should come in as k=v pairs.
    extra_params = {}
    entries = connection_string.split('&')
    for entry in entries:
      if not entry:
        continue
      parts = entry.split('=')
      if len(parts) != 2:
        errors.append(
            f'The parameter {entry} appears malformed. Expected'
            ' something in the form key=value.'
        )
      else:
        extra_params[parts[0]] = parts[1]

    # Check for always-required params
    CheckParam('loadBalanced', 'true')
    CheckParam('tls', 'true')
    CheckParam('retryWrites', 'false')

    # Check for params that require extra validation
    if 'authMechanism' in extra_params:
      auth_mechanism = extra_params['authMechanism']
      del extra_params['authMechanism']
      if auth_mechanism == 'PLAIN':
        CheckParam(
            'authSource',
            '$external',
            error_description='Using PLAIN authentication. ',
        )
        if not user:
          errors.append(
              'The username and an access token should be specified in '
              'the connection string when PLAIN authentication is enabled.'
          )
        else:
          info.append(
              'username and access token specified for PLAIN authentication.'
          )
      elif auth_mechanism == 'SCRAM-SHA-256':
        if not user:
          errors.append(
              'The username and password should be specified in the '
              'connection string when SCRAM-SHA-256 is enabled.'
          )
        else:
          info.append('username and password specified for SCRAM-SHA-256.')
      elif auth_mechanism == 'MONGODB-OIDC':
        if user:
          errors.append(
              'The username should not be specified when using the '
              'MONGODB-OIDC authentication mechanism.'
          )
        CheckParam(
            'authMechanismProperties',
            'ENVIRONMENT:gcp,TOKEN_RESOURCE:FIRESTORE',
            error_description='Using MONGODB-OIDC authentication. ',
        )
      else:
        errors.append(f'Unsupported authentication mechanism {auth_mechanism}.')
    else:
      if user:
        errors.append(
            f'Since the connection string specified user: {user} and'
            f' password: {password}, the connection must also be configured'
            ' with an appropriate authentication mechanism, e.g.'
            ' authMechanism=SCRAM-SHA-256'
        )
      else:
        errors.append(
            'No authMechanism specified. The connection string must '
            'specify one of the supported authentication mechanisms.'
        )

    # Check for any unconsumed parameters
    for k, v in extra_params.items():
      # Emit these was warnings. We don't know how they'll affect the client.
      warnings.append(f'Unknown parameter {k}={v}.')

    break

  if not errors:
    footers.append('Did not detect any errors in this connection string.')
  else:
    footers.append(
        "TIP: You can use 'gcloud firestore databases connection-string "
        f"--database={database_id}' to construct valid connection strings."
    )
  return ValidationResults(
      headers=headers,
      info=info,
      warnings=warnings,
      errors=errors,
      footers=footers,
  )


def PrettyPrintValidationResults(validation_results: ValidationResults):
  """Renders the connection string validation results to the console."""

  con = console_attr.GetConsoleAttr()
  for header in validation_results.headers:
    log.status.Print(header)
  for info in validation_results.info:
    log.status.Print(f"{con.Colorize('INFO:', 'green')} {info}")
  for warning in validation_results.warnings:
    log.status.Print(f"{con.Colorize('WARNING:', 'yellow')} {warning}")
  for error in validation_results.errors:
    log.status.Print(f"{con.Colorize('ERROR:', 'red')} {error}")
  for footer in validation_results.footers:
    log.status.Print(footer)

# Byte encoding of the Bson "hello" command document:
# {"hello": 1, "helloOk": True, "loadBalanced": True}
_HELLO_HEX = (
    '340000000100000000000000DD07000000000000001F0000001068656C6C6'
    'F0001000000086C6F616442616C616E636564000100'
)
# Byte encoding of the Bson "ping" command document: {"ping": 1}
_PING_HEX = (
    '240000000000000000000000dd07000000000000000f0000001070696e67000100000000'
)
_MAX_CONNECTION_WAIT_TIME = 20.0
_MAX_PING_WAIT_TIME = 5.0


def Ping(ssock):
  """Sends a Mongo ping message via specified socket."""
  ping_complete = False
  ping_start = time.perf_counter()
  ssock.sendall(bytes.fromhex(_PING_HEX))
  while True:
    data = ssock.recv(1024)
    ping_time = time.perf_counter() - ping_start
    if not data:
      break
    if data.find(b'ok') != -1:
      ping_complete = True
      break
    # Give up after enough time has passed.
    if ping_time > _MAX_PING_WAIT_TIME:
      break
  if ping_complete:
    print(f'{ping_time:.3f}s ', end='')
  else:
    print('N/A    ', end='')
  return ping_time if ping_complete else None


def Hello(ssock):
  """Sends a Mongo hello message via specified socket."""
  handshake_complete = False
  connection_start = time.perf_counter()
  ssock.sendall(bytes.fromhex(_HELLO_HEX))
  while True:
    data = ssock.recv(1024)
    connect_time = time.perf_counter() - connection_start
    if not data:
      break
    if data.find(b'isWritablePrimary') != -1:
      print(f'Connection established in {connect_time:.3f} seconds')
      handshake_complete = True
      break
    # Give up after enough time has passed.
    if connect_time > _MAX_CONNECTION_WAIT_TIME:
      break
  return connect_time if handshake_complete else None


def ConnectAndPing(hostname, num_pings):
  """Opens an SSL connection and sends timed Mongo commands to the server."""
  context = ssl.create_default_context()
  ping_times = []
  with socket.create_connection((hostname, 443)) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
      connect_time = Hello(ssock)
      print(f'Sending {num_pings} pings ...: ', end='')
      for _ in range(num_pings):
        ping_times.append(Ping(ssock))
      print()
      return (connect_time, ping_times)