最后更新于 2018年11月21日 (已更新至新的 KSQL Docker 镜像).
去年,我们见证了一个新的开源项目在 Apache Kafka 生态系统中诞生,那就是 KSQL,它是一个构建在 Kafka Streams 之上的流式 SQL 引擎。在这篇文章中,我们将尝试使用 Debezium 从 MySQL 数据库生成的数据更改事件来查询 KSQL。
实体图
首先,让我们看看数据库中的实体以及它们之间的关系。
图 1:示例实体的实体图
上图显示了示例 MySQL 实例中 inventory 数据库的完整 ER 图。我们将重点关注两个实体:
-
customers- 系统中的客户列表 -
orders- 系统中的订单列表
customers 和 orders 之间存在一个 1:n 的关系,由 orders 表中的 purchaser 列建模,该列是 customers 表的外键。
配置
我们将使用一个 Docker Compose 文件来部署环境。该部署包含以下 Docker 镜像:
示例
首先,我们需要启动 Debezium 和 Kafka 基础设施。为此,请克隆 debezium-examples GitHub 存储库,并使用提供的 Compose 文件启动所需组件。
export DEBEZIUM_VERSION=0.8
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/ksql/
docker-compose up 接下来,我们必须注册一个 Debezium MySQL 连接器实例,以监听数据库中的更改。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://:8083/connectors/ -d @- <<-EOF
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184055",
"database.server.name": "dbserver",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
EOF 现在,所有组件都应该已启动并正在运行,并且初始数据更改事件已流式传输到 Kafka 主题。对于我们的用例,有几个属性尤为重要:
-
使用了 UnwrapFromEnvelope SMT。这允许我们将更改记录的
after部分中的字段直接映射到 KSQL 语句。如果没有它,我们就需要为要从消息的after部分提取的每个字段使用EXTRACTJSONFIELD。 -
JSON 转换器的模式被禁用。原因与上面相同。启用模式后,对于 JSON,记录会封装在一个 JSON 结构中,该结构包含
schema(包含模式信息)和payload(包含实际数据本身)字段。我们仍然需要使用EXTRACTJSONFIELD来访问相关字段。Avro 转换器没有这个问题,因此在使用 Avro 时不需要设置此选项。
接下来,我们将启动 KSQL 命令行 shell。我们将使用 CLI 运行一个本地引擎。另请注意 --net 参数。这可以确保 KSQL 容器与 Debezium 容器运行在同一网络中,并允许正确的 DNS 解析。
docker-compose exec ksql-cli ksql http://ksql-server:8088 首先,我们将列出代理中存在的所有 Kafka 主题。
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
connect-status | false | 5 | 1
dbserver | false | 1 | 1
dbserver.inventory.addresses | false | 1 | 1
dbserver.inventory.customers | false | 1 | 1
dbserver.inventory.orders | false | 1 | 1
dbserver.inventory.products | false | 1 | 1
dbserver.inventory.products_on_hand | false | 1 | 1
ksql__commands | true | 1 | 1
my_connect_configs | false | 1 | 1
my_connect_offsets | false | 25 | 1
schema-changes.inventory | false | 1 | 1 我们感兴趣的主题是 dbserver.inventory.orders 和 dbserver.inventory.customers。
KSQL 处理默认从 latest 偏移量开始。我们希望处理主题中已有的事件,因此我们将处理从 earliest 偏移量切换。
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest' 首先,我们需要从包含 Debezium 数据更改事件的主题创建流。在 KSQL 和 Kafka Streams 术语中,流是一个没有状态的无界传入数据集。
ksql> CREATE STREAM orders_from_debezium (order_number integer, order_date string, purchaser integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');
Message
----------------
Stream created
ksql>
ksql> CREATE STREAM customers_from_debezium (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');
Message
----------------
Stream created 分区
我们的部署每个主题只使用一个分区。在生产系统中,每个主题很可能拥有多个分区,我们需要确保属于我们聚合对象的所有事件都进入同一个分区。在我们的情况下,自然的分区是按客户 ID。我们将根据包含客户 ID 的 purchaser 字段对 orders_from_debezium 流进行重新分区。重新分区的数据写入新主题 ORDERS_REPART。
ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debezium PARTITION BY PURCHASER;
Message
----------------------------
Stream created and running
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
ORDERS_REPART | true | 1 | 1
... 我们将对客户执行相同的操作。这有两个原因:
-
当前键是一个包含名为
id的字段(客户 ID)的结构。这与重新分区的订单主题不同,后者仅包含id值作为键,因此分区不会匹配。 -
稍后创建 JOIN 时,有一个限制要求键的值必须与表中的键字段相同。表字段包含一个普通值,但键包含一个结构,因此它们不匹配。有关更多详细信息,请参阅 此 KSQL 问题。
ksql> CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezium PARTITION BY ID;
Message
----------------------------
Stream created and running
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas
------------------------------------------------------------------------------------
...
CUSTOMERS_REPART | true | 1 | 1
... 为了验证记录是否具有新键并因此被重新分区,我们可以发出几个语句来比较结果。
ksql> SELECT * FROM orders_from_debezium LIMIT 1;
1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated
ksql> SELECT * FROM orders LIMIT 1;
1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102
LIMIT reached for the partition.
Query terminated 第二列包含 ROWKEY,这是消息的键。
客户/订单 JOIN
到目前为止,我们仅将流声明为无界无状态数据集。在我们的用例中,order 实际上是一个来来往往的事件。但 customer 是一个可以更新的实体,通常是系统状态的一部分。KSQL 或 Kafka Streams 以表的形式表示这种特性。我们将从包含重新分区客户的主题创建客户表。
ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');
Message
---------------
Table created 现在我们已经准备好一切,可以 JOIN 客户及其订单,并创建一个查询来监视传入的订单并列出它们及其关联的客户字段。
ksql> SELECT order_number,quantity,customers.first_name,customers.last_name FROM orders left join customers on orders.purchaser=customers.id;
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker 让我们对数据库进行一些更改,这将导致 Debezium 发出相应的 CDC 事件。
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> INSERT INTO orders VALUES(default,NOW(), 1003,5,101);
Query OK, 1 row affected, 1 warning (0.02 sec)
mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0 您可能会注意到,只有 orders 表中的更改触发了 JOIN 流中的更改。这是流/表 JOIN 的结果。我们需要流/流 JOIN 才能在输入流之一被修改时触发更改。
因此,在数据库修改后,select 的最终结果是:
10001 | 1 | Sally | Thomas
10002 | 2 | George | Bailey
10003 | 2 | George | Bailey
10004 | 1 | Edward | Walker
10005 | 5 | Edward | Walker
10004 | 20 | Edward | Walker 关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。