最后更新于 2018年11月21日 (已更新至新的 KSQL Docker 镜像).

去年,我们见证了一个新的开源项目在 Apache Kafka 生态系统中诞生,那就是 KSQL,它是一个构建在 Kafka Streams 之上的流式 SQL 引擎。在这篇文章中,我们将尝试使用 Debezium 从 MySQL 数据库生成的数据更改事件来查询 KSQL。

我们将使用 教程 中的数据库和设置作为数据源。这次练习的结果应该与最近关于将事件聚合为 领域驱动聚合文章 类似。

实体图

首先,让我们看看数据库中的实体以及它们之间的关系。

图 1:示例实体的实体图

 

上图显示了示例 MySQL 实例中 inventory 数据库的完整 ER 图。我们将重点关注两个实体:

  • customers - 系统中的客户列表

  • orders - 系统中的订单列表

customersorders 之间存在一个 1:n 的关系,由 orders 表中的 purchaser 列建模,该列是 customers 表的外键。

配置

我们将使用一个 Docker Compose 文件来部署环境。该部署包含以下 Docker 镜像:

示例

首先,我们需要启动 Debezium 和 Kafka 基础设施。为此,请克隆 debezium-examples GitHub 存储库,并使用提供的 Compose 文件启动所需组件。

export DEBEZIUM_VERSION=0.8
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/ksql/
docker-compose up

接下来,我们必须注册一个 Debezium MySQL 连接器实例,以监听数据库中的更改。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" https://:8083/connectors/ -d @- <<-EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184055",
        "database.server.name": "dbserver",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
EOF

现在,所有组件都应该已启动并正在运行,并且初始数据更改事件已流式传输到 Kafka 主题。对于我们的用例,有几个属性尤为重要:

  • 使用了 UnwrapFromEnvelope SMT。这允许我们将更改记录的 after 部分中的字段直接映射到 KSQL 语句。如果没有它,我们就需要为要从消息的 after 部分提取的每个字段使用 EXTRACTJSONFIELD

  • JSON 转换器的模式被禁用。原因与上面相同。启用模式后,对于 JSON,记录会封装在一个 JSON 结构中,该结构包含 schema(包含模式信息)和 payload(包含实际数据本身)字段。我们仍然需要使用 EXTRACTJSONFIELD 来访问相关字段。Avro 转换器没有这个问题,因此在使用 Avro 时不需要设置此选项。

接下来,我们将启动 KSQL 命令行 shell。我们将使用 CLI 运行一个本地引擎。另请注意 --net 参数。这可以确保 KSQL 容器与 Debezium 容器运行在同一网络中,并允许正确的 DNS 解析。

docker-compose exec ksql-cli ksql http://ksql-server:8088

首先,我们将列出代理中存在的所有 Kafka 主题。

ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
 connect-status                      | false      | 5          | 1
 dbserver                            | false      | 1          | 1
 dbserver.inventory.addresses        | false      | 1          | 1
 dbserver.inventory.customers        | false      | 1          | 1
 dbserver.inventory.orders           | false      | 1          | 1
 dbserver.inventory.products         | false      | 1          | 1
 dbserver.inventory.products_on_hand | false      | 1          | 1
 ksql__commands                      | true       | 1          | 1
 my_connect_configs                  | false      | 1          | 1
 my_connect_offsets                  | false      | 25         | 1
 schema-changes.inventory            | false      | 1          | 1

我们感兴趣的主题是 dbserver.inventory.ordersdbserver.inventory.customers

KSQL 处理默认从 latest 偏移量开始。我们希望处理主题中已有的事件,因此我们将处理从 earliest 偏移量切换。

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

首先,我们需要从包含 Debezium 数据更改事件的主题创建流。在 KSQL 和 Kafka Streams 术语中,是一个没有状态的无界传入数据集。

ksql> CREATE STREAM orders_from_debezium (order_number integer, order_date string, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');

 Message
----------------
 Stream created
ksql>
ksql> CREATE STREAM customers_from_debezium (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');

 Message
----------------
 Stream created

分区

我们的部署每个主题只使用一个分区。在生产系统中,每个主题很可能拥有多个分区,我们需要确保属于我们聚合对象的所有事件都进入同一个分区。在我们的情况下,自然的分区是按客户 ID。我们将根据包含客户 ID 的 purchaser 字段对 orders_from_debezium 流进行重新分区。重新分区的数据写入新主题 ORDERS_REPART

ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debezium PARTITION BY PURCHASER;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 ORDERS_REPART                       | true       | 1          | 1
...

我们将对客户执行相同的操作。这有两个原因:

  • 当前键是一个包含名为 id 的字段(客户 ID)的结构。这与重新分区的订单主题不同,后者仅包含 id 值作为键,因此分区不会匹配。

  • 稍后创建 JOIN 时,有一个限制要求键的值必须与表中的键字段相同。表字段包含一个普通值,但键包含一个结构,因此它们不匹配。有关更多详细信息,请参阅 此 KSQL 问题

ksql> CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezium PARTITION BY ID;

 Message
----------------------------
 Stream created and running
ksql> LIST TOPICS;

 Kafka Topic                         | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
 CUSTOMERS_REPART                    | true       | 1          | 1
...

为了验证记录是否具有新键并因此被重新分区,我们可以发出几个语句来比较结果。

ksql> SELECT * FROM orders_from_debezium LIMIT 1;
1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
ksql> SELECT * FROM orders LIMIT 1;
1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated

第二列包含 ROWKEY,这是消息的键。

客户/订单 JOIN

到目前为止,我们仅将流声明为无界无状态数据集。在我们的用例中,order 实际上是一个来来往往的事件。但 customer 是一个可以更新的实体,通常是系统状态的一部分。KSQL 或 Kafka Streams 以表的形式表示这种特性。我们将从包含重新分区客户的主题创建客户表。

ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');

 Message
---------------
 Table created

现在我们已经准备好一切,可以 JOIN 客户及其订单,并创建一个查询来监视传入的订单并列出它们及其关联的客户字段。

ksql> SELECT order_number,quantity,customers.first_name,customers.last_name FROM orders left join customers on orders.purchaser=customers.id;
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker

让我们对数据库进行一些更改,这将导致 Debezium 发出相应的 CDC 事件。

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> INSERT INTO orders VALUES(default,NOW(), 1003,5,101);
Query OK, 1 row affected, 1 warning (0.02 sec)

mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

您可能会注意到,只有 orders 表中的更改触发了 JOIN 流中的更改。这是流/表 JOIN 的结果。我们需要流/流 JOIN 才能在输入流之一被修改时触发更改。

因此,在数据库修改后,select 的最终结果是:

10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
10005 | 5 | Edward | Walker
10004 | 20 | Edward | Walker

总结

我们已成功启动 KSQL 实例。我们已将 KSQL 流映射到由 Debezium 填充的 Debezium 主题,并在它们之间进行了 JOIN。我们还讨论了流应用程序中重新分区的问题。

如果您想尝试使用 Avro 编码和模式注册表来运行此示例,可以使用我们的 Avro 示例。有关更多详细信息和更高级的用法,请参阅 KSQL 语法参考

如果您需要帮助、有功能请求或想分享您对这个示例的经验,请在下面的评论中告知我们。

Jiri Pechanec

Jiri 是 Red Hat 的一名软件开发人员(也是前质量工程师)。他的职业生涯大部分时间都投入在 Java 和系统集成项目和任务中。他居住在捷克共和国布尔诺附近。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×