当我们的MySQL连接器读取MySQL服务器或集群的binlog时,它会解析日志中的DDL语句,并随着时间的推移构建一个内存中的表模式模型。这个过程很重要,因为连接器会使用事件发生时表的定义为每个表生成事件。我们无法使用数据库的当前模式,因为它可能自连接器读取的时间点(或日志中的位置)以来已经发生了变化。

解析MySQL或任何其他主要关系数据库的DDL可能是一项艰巨的任务。通常,每个DBMS都有高度定制的SQL语法,尽管数据操作语言(DML)语句通常非常接近标准,但数据定义语言(DDL)语句通常不那么标准,并且涉及更多DBMS特定的功能。

那么,考虑到这一点,为什么我们自己编写了MySQL的DDL解析器呢?让我们首先看看Debezium需要DDL解析器做什么。

Debezium MySQL 连接器中解析 DDL

MySQL 二进制日志包含各种事件。例如,当一行插入到表中时,二进制日志事件包含对表的间接引用以及表中每列的值,但没有关于构成表的列的信息。二进制日志中引用表结构的唯一内容是 MySQL 在处理用户提供的 DDL 语句时生成的 SQL DDL 语句。

连接器还使用 Kafka Connect Schema 生成消息,这些 Schema 是简单的_数据结构_,用于定义每个字段的各种名称和类型以及字段的组织方式。因此,当我们为表插入生成事件消息时,我们首先必须有一个包含所有适当字段的 Kafka Connect Schema 对象,然后我们必须使用表插入事件中的字段和单个列值,将有序的列值数组转换为 Kafka Connect Struct 对象。

幸运的是,当我们遇到 DDL 语句时,我们可以更新内存中的模型,然后使用该模型生成 Schema 对象。同时,我们可以创建一个组件,该组件将使用此 Schema 对象从事件中出现的有序列值数组创建 Struct 对象。所有这些都可以一次性完成,并用于该表上的所有行事件,直到我们遇到另一条更改表模式的 DDL 语句,届时我们将再次更新模型。

因此,所有这些都需要解析所有 DDL 语句,尽管为了我们的目的,我们只需要_理解_ DDL 语法的一小部分。然后,我们必须使用该语句子集来更新我们的表内存模型。由于我们的内存表模型不是 MySQL 特定的,因此生成 Schema 对象以及将值数组转换为消息中使用的 Struct 对象的组件的其余功能都是通用的。

现有的 DDL 库

不幸的是,用于解析 MySQL、PostgreSQL 或其他流行 RDBMS 的 DDL 语句的第三方开源库实际上并不多。JSqlParser 经常被引用,但它有一个_单一的语法_,该语法是多个 DBMS 语法的组合,因此对于任何特定的 DBMS 都不是严格的解析器。通过更新复合语法来添加对其他 DBMS 的支持可能会很困难。

其他库,例如 PrestoDB,定义了自己的 SQL 语法,无法处理 MySQL DDL 语法_的复杂性和细微差别_。Antlr 解析器生成器项目有一个_MySQL 5.6 的语法_,但它仅限于一小部分 DML,并且不支持 DDL 或较新的 5.7 功能。存在_Antlr 3 的旧 SQL 相关语法_,但这些语法通常非常庞大,存在错误,并且仅限于特定的 DBMS。Teiid 项目是一个_数据虚拟化引擎_,它位于各种 DBMS 和数据源之上,并且它的工具具有_一系列 DDL 解析器_,用于在特殊存储库中构建 AST(作者实际上帮助开发了这些)。还有 Ruby 库,例如 Square 的 MySQL Parser 库。还有一个_专有商业产品_。

我们的 DDL 解析器框架

由于找不到有用的第三方开源库,我们选择创建自己的、满足我们需求的 DDL 解析器框架。

  • 解析 DDL 语句并更新我们的内存模型。

  • 专注于_消耗_那些必要的语句(例如,创建、修改和删除表和视图),同时_完全忽略_其他语句而无需解析它们。

  • 将解析器代码的结构_与 MySQL DDL 语法文档_类似,并使用与语法中的规则相对应的_方法名称_。这将使其_易于维护_。

  • _允许创建_ PostgreSQL、Oracle、SQLServer 和其他 DBMS 的解析器_按需_。

  • 支持通过_子类化_进行自定义:_能够轻松覆盖_狭窄 portions 的逻辑,而_无需复制大量代码_。

  • _易于开发、调试和测试_解析器。

生成的框架包括一个_分词器_,它将字符串中的一个或多个 DDL 语句转换为_可重置_的 token 序列,其中每个 token 代表标点符号、带引号的字符串、不区分大小写的单词和符号、数字、关键字、注释和终止字符(例如 MySQL 的 ;)。然后,DDL 解析器_遍历 token 流_,使用简单易读的_流畅 API_寻找模式,通过调用自身的方法来处理各种 token 集合。该解析器还使用内部的_数据类型解析器_来处理 SQL 数据类型表达式,例如 INTVARCHAR(64)NUMERIC(32,3)TIMESTAMP(8) WITH TIME ZONE

MySqlDdlParser 类_扩展了_一个_基类_,并提供了所有 MySQL 特定的解析逻辑。例如,DDL 语句

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

可以轻松地通过以下方式解析

String ddlStatements = ...
DdlParser parser = new MySqlDdlParser();
Tables tables = new Tables();
parser.parse(ddl, tables);

在这里,Tables 对象是我们命名表定义的内存表示。解析器处理 DDL 语句,并将每个语句应用于 Tables 对象中的相应表定义。

工作原理

每个 DdlParser 实现都有以下公共方法,该方法将解析所提供字符串中的语句。

    public final void parse(String ddlContent, Tables databaseTables) {
        Tokenizer tokenizer = new DdlTokenizer(!skipComments(), this::determineTokenType);
        TokenStream stream = new TokenStream(ddlContent, tokenizer, false);
        stream.start();
        parse(stream, databaseTables);
    }

在这里,该方法使用 DdlTokenizer 从内容中创建一个新的 TokenStream,该分词器知道如何将字符串中的字符_分隔_为各种类型的 token 对象。然后,它调用另一个 parse 方法来执行大部分工作。

    public final void parse(TokenStream ddlContent, Tables databaseTables)
                           throws ParsingException, IllegalStateException {
        this.tokens = ddlContent;
        this.databaseTables = databaseTables;
        Marker marker = ddlContent.mark();
        try {
            while (ddlContent.hasNext()) {
                parseNextStatement(ddlContent.mark());
                // Consume the statement terminator if it is still there ...
                tokens.canConsume(DdlTokenizer.STATEMENT_TERMINATOR);
            }
        } catch (ParsingException e) {
            ddlContent.rewind(marker);
            throw e;
        } catch (Throwable t) {
            parsingFailed(ddlContent.nextPosition(),
                          "Unexpected exception (" + t.getMessage() + ") parsing", t);
        }
    }

这会设置一些本地状态,标记当前开始点,并尝试解析 DDL 语句,直到找不到更多语句为止。如果解析逻辑未能找到匹配项,它将生成一个 ParsingException,其中包含_有问题的行和列_以及_指示发现内容和预期内容的消息_。在这种情况下,此方法会_重置 token 流_(以防调用者希望尝试_另一种不同的解析器_)。

每次调用 parseNextStatement 方法时,都会将该语句的_起始位置_传递给该方法,从而_为该语句提供了起始位置_。我们的 MySqlDdlParser 子类_覆盖了 parseNextStatement 方法_,以使用语句中的第一个 token 来确定 MySQL DDL 语法中允许的语句类型。

    @Override
    protected void parseNextStatement(Marker marker) {
        if (tokens.matches(DdlTokenizer.COMMENT)) {
            parseComment(marker);
        } else if (tokens.matches("CREATE")) {
            parseCreate(marker);
        } else if (tokens.matches("ALTER")) {
            parseAlter(marker);
        } else if (tokens.matches("DROP")) {
            parseDrop(marker);
        } else if (tokens.matches("RENAME")) {
            parseRename(marker);
        } else {
            parseUnknownStatement(marker);
        }
    }

当找到匹配的 token 时,该方法会调用_相应的方法_。例如,如果语句以 CREATE TABLE …​ 开头,则会调用 parseCreate 方法,并_传入相同的标记_,该标记标识语句的起始位置。

    @Override
    protected void parseCreate(Marker marker) {
        tokens.consume("CREATE");
        if (tokens.matches("TABLE") || tokens.matches("TEMPORARY", "TABLE")) {
            parseCreateTable(marker);
        } else if (tokens.matches("VIEW")) {
            parseCreateView(marker);
        } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("EVENT")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("FUNCTION", "PROCEDURE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("UNIQUE", "FULLTEXT", "SPATIAL", "INDEX")) {
            parseCreateIndex(marker);
        } else if (tokens.matchesAnyOf("SERVER")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TABLESPACE")) {
            parseCreateUnknown(marker);
        } else if (tokens.matchesAnyOf("TRIGGER")) {
            parseCreateUnknown(marker);
        } else {
            // It could be several possible things (including more
            // elaborate forms of those matches tried above),
            sequentially(this::parseCreateView,
                         this::parseCreateUnknown);
        }
    }

在这里,该方法首先消耗 CREATE 字面量 token,然后尝试_匹配_ token 与各种 token 字面量模式。如果找到匹配项,此方法会将控制权_委托给_其他更_具体的解析方法_。请注意,框架的流畅 API_使得匹配模式非常易于理解_。

我们再往前进一步。假设我们的 DDL 语句以 CREATE TABLE products ( 开头,那么解析器将调用 parseCreateTable 方法,同样_使用相同的标记_来_表示语句的开始_。

    protected void parseCreateTable(Marker start) {
        tokens.canConsume("TEMPORARY");
        tokens.consume("TABLE");
        boolean onlyIfNotExists = tokens.canConsume("IF", "NOT", "EXISTS");
        TableId tableId = parseQualifiedTableName(start);
        if ( tokens.canConsume("LIKE")) {
            TableId originalId = parseQualifiedTableName(start);
            Table original = databaseTables.forTable(originalId);
            if ( original != null ) {
                databaseTables.overwriteTable(tableId, original.columns(),
                                              original.primaryKeyColumnNames());
            }
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        if (onlyIfNotExists && databaseTables.forTable(tableId) != null) {
            // The table does exist, so we should do nothing ...
            consumeRemainingStatement(start);
            debugParsed(start);
            return;
        }
        TableEditor table = databaseTables.editOrCreateTable(tableId);

        // create_definition ...
        if (tokens.matches('(')) parseCreateDefinitionList(start, table);
        // table_options ...
        parseTableOptions(start, table);
        // partition_options ...
        if (tokens.matches("PARTITION")) {
            parsePartitionOptions(start, table);
        }
        // select_statement
        if (tokens.canConsume("AS") || tokens.canConsume("IGNORE", "AS")
            || tokens.canConsume("REPLACE", "AS")) {
            parseAsSelectStatement(start, table);
        }

        // Update the table definition ...
        databaseTables.overwriteTable(table.create());
        debugParsed(start);
    }

此方法试图_模仿 MySQL CREATE TABLE 语法规则_,这些规则以以下内容开始:

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    (create_definition,...)
    [table_options]
    [partition_options]

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    [(create_definition,...)]
    [table_options]
    [partition_options]
    select_statement

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tbl_name
    { LIKE old_tbl_name | (LIKE old_tbl_name) }

create_definition:
    ...

在我们的 parseCreateTable 开始之前,CREATE 字面量已经被消耗掉了,因此它首先尝试消耗 TEMPORARY 字面量(如果存在)、TABLE 字面量、IF NOT EXISTS 片段(如果存在),然后消耗并解析表的_限定名称_。如果语句包含 LIKE otherTable,它将使用 databaseTables(它是对我们 Tables 对象的引用)来_用_引用表的定义_覆盖_命名表的定义。否则,它会_获取一个新表的编辑器_,然后(像语法规则一样)解析_create_definition 片段列表_,后面跟着 _table_options_、_partition_options_,以及可能一个 _select_statement_。

请查看完整的 MySqlDdlParser 类以了解更多详细信息。

总结

这篇博文详细介绍了 MySQL 连接器_为什么使用二进制日志中的 DDL 语句_,尽管我们_仅粗略地介绍了_连接器_如何使用其框架解析 DDL_,以及_如何在将来的其他 DBMS 方言的解析器中重用它_。

尝试我们的教程,了解 MySQL 连接器如何工作,并_请继续关注_更多连接器、版本和新闻。

Randall Hauch

Randall 是 Red Hat 的开源软件开发者,近 20 年来一直从事数据集成工作。他是 Debezium 的创始人,并参与过其他几个开源项目。他住在伊利诺伊州爱德华兹维尔,靠近圣路易斯。

     


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×