HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/spanner/ddl_parser.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides ddl preprocessing for the Spanner ddl."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import logging

from googlecloudsdk.core import exceptions


class DDLSyntaxError(exceptions.Error):
  pass


class DDLParser:
  """Parser for splitting ddl statements preserving GoogleSQL strings literals.

  DDLParse has a list of modes. If any mode is selected, control is given to the
  mode. If no mode is selected, the parser trys to enter the first mode that
  could it could enter. The parser handles splitting statements upon ';'.

  During parsing, a DDL has the following parts:
    * parts that has been processed: emitted or skipped.
    * followed by a buffer that has been matched by the current mode, which
      could be emitted or skipped by a mode. The start index of which is
      mode_start_index_.
    * followed by the next character indexed by next_index_, which could direct
      the parser to enter or exit a mode.
    * followed by the unprocessed character.

  DDLParser:
    * acts as a default mode.
    * provides utilities uesd by ParserMode to drive the parsing.
  """

  def __init__(self, ddl):
    self.ddl_ = ddl
    # Index of the current character to process
    self.next_index_ = 0
    # Mode the parser is in now.
    self.mode_ = None
    # Start index of the buffer that has been matched by a mode or the parser.
    self.mode_start_index_ = 0
    # List of modes. The first one that the parser could enter wins in case of
    # conflict.
    self.modes_ = [
        self.SkippingMode('--', ['\n', '\r']),
        # For all the string modes below, we need to escape \\. If we don't, \\"
        # will trigger mode exiting.
        # Triple double quote.
        # We need escape \", or \""" will be treated trigger mode exiting.
        self.PreservingMode('"""', ['"""'], ['\\"', '\\\\']),
        # Triple single quote.
        # We need escape \', or \''' will be treated trigger mode exiting.
        self.PreservingMode("'''", ["'''"], ["\\'", '\\\\']),
        # Single double quote.
        self.PreservingMode('"', ['"'], ['\\"', '\\\\']),
        # Single single quote.
        self.PreservingMode("'", ["'"], ["\\'", '\\\\']),
        # Single back quote.
        self.PreservingMode('`', ['`'], ['\\`', '\\\\']),
    ]
    # A list of statements. A statement is a list of ddl fragments.
    self.statements_ = []
    self.StartNewStatement()
    self.logger_ = logging.getLogger('SpannerDDLParser')

  def SkippingMode(self, enter_seq, exit_seqs):
    return DDLParserMode(self, enter_seq, exit_seqs, None, True)

  def PreservingMode(self, enter_seq, exit_seqs, escape_sequences):
    return DDLParserMode(self, enter_seq, exit_seqs, escape_sequences, False)

  def IsEof(self):
    return self.next_index_ == len(self.ddl_)

  def Advance(self, l):
    self.next_index_ += l

  def StartNewStatement(self):
    self.ddl_parts_ = []
    self.statements_.append(self.ddl_parts_)

  def EmitBuffer(self):
    if self.mode_start_index_ >= self.next_index_:
      # Buffer is empty.
      return
    self.ddl_parts_.append(
        self.ddl_[self.mode_start_index_:self.next_index_])
    self.SkipBuffer()
    self.logger_.debug('emitted: %s', self.ddl_parts_[-1])

  def SkipBuffer(self):
    self.mode_start_index_ = self.next_index_

  def EnterMode(self, mode):
    self.logger_.debug('enter mode: %s at index: %d',
                       mode.enter_seq_, self.next_index_)
    self.mode_ = mode

  def ExitMode(self):
    self.logger_.debug('exit mode: %s at index: %d',
                       self.mode_.enter_seq_, self.next_index_)
    self.mode_ = None

  def StartsWith(self, s):
    return self.ddl_[self.next_index_:].startswith(s)

  def Process(self):
    """Process the DDL."""
    while not self.IsEof():
      # Delegate to active mode if we have any.
      if self.mode_:
        self.mode_.Process()
        continue
      # Check statement break.
      if self.ddl_[self.next_index_] == ';':
        self.EmitBuffer()
        self.StartNewStatement()
        self.mode_start_index_ += 1
        self.Advance(1)
        continue
      # If we could enter any mode.
      for m in self.modes_:
        if m.TryEnter():
          self.EnterMode(m)
          break
      # No mode is found, consume the character.
      if not self.mode_:
        self.Advance(1)

    # At the end of parsing, we close the unclosed mode.
    if self.mode_ is not None:
      m = self.mode_
      if not m.is_to_skip_:
        raise DDLSyntaxError(
            'Unclosed %s start at index: %d, %s' %
            (m.enter_seq_, self.mode_start_index_, self.ddl_))
      self.mode_.Exit()
    else:
      self.EmitBuffer()
    self.logger_.debug('ddls: %s', self.statements_)
    res = [''.join(frags) for frags in self.statements_ if frags]
    # See https://stackoverflow.com/q/67857941
    if res and res[-1].isspace():
      return res[:-1]
    return res


class DDLParserMode:
  """A mode in DDLParser.

  A mode has one entering sequence, a list of exit sequences and one escape
  sequence. A mode could be:
    * skipping (e.x. comments), which skips the matched text.
    * non-skpping, (e.x. strings), which emits the matched text.
  """

  def __init__(self, parser, enter_seq, exit_seqs, escape_sequences,
               is_to_skip):
    self.parser_ = parser
    self.enter_seq_ = enter_seq
    self.exit_seqs_ = exit_seqs
    self.escape_sequences_ = escape_sequences
    self.is_to_skip_ = is_to_skip

  def TryEnter(self):
    """Trys to enter into the mode."""
    res = self.parser_.StartsWith(self.enter_seq_)
    if res:
      self.parser_.EmitBuffer()
      self.parser_.Advance(len(self.enter_seq_))
    return res

  def Exit(self):
    if self.is_to_skip_:
      self.parser_.SkipBuffer()
    else:
      self.parser_.EmitBuffer()
    self.parser_.ExitMode()

  def FindExitSeqence(self):
    """Finds a matching exit sequence."""
    for s in self.exit_seqs_:
      if self.parser_.StartsWith(s):
        return s
    return None

  def Process(self):
    """Process the ddl at the current parser index."""
    # Put escape sequence into buffer
    if self.escape_sequences_:
      for seq in self.escape_sequences_:
        if self.parser_.StartsWith(seq):
          self.parser_.Advance(len(self.escape_sequences_))
          return
    # Check if we should exit the current mode
    exit_seq = self.FindExitSeqence()
    if not exit_seq:
      self.parser_.Advance(1)
      return

    # Before exit, put exit_seq into buffer for non skipping mode
    if not self.is_to_skip_:
      self.parser_.Advance(len(exit_seq))
    self.Exit()


def PreprocessDDLWithParser(ddl_text):
  return DDLParser(ddl_text).Process()