我们祝 Debezium 社区 2018 年一切顺利!
在我们忙于 0.7.2 版本的同时,我们想发布另一篇博文,描述一个基于 Debezium 的端到端数据流用例。我们几周前已经展示了如何设置更改数据流到下游数据库。在这篇博文中,我们将采用相同的方法,将数据流式传输到 Elasticsearch 服务器,以利用其出色的全文搜索能力。但为了让事情更有趣一点,我们将把数据流式传输到 PostgreSQL 数据库和 Elasticsearch,这样我们就可以通过 SQL 查询语言以及全文搜索来优化数据访问。
拓扑
这是一个展示数据如何在我们的分布式系统中流动的图表。首先,Debezium MySQL 连接器持续捕获 MySQL 数据库中的变更,并将每个表的变更发送到单独的 Kafka 主题。然后,Confluent JDBC sink 连接器持续读取这些主题并将事件写入 PostgreSQL 数据库。同时,Confluent Elasticsearch 连接器持续读取相同的这些主题并将事件写入 Elasticsearch。
图 1:通用拓扑
我们将把这些组件部署到几个不同的进程中。在本例中,我们将所有三个连接器部署到一个 Kafka Connect 实例中,该实例将代表所有连接器读写 Kafka(在生产环境中,您可能需要将连接器分开以获得更好的性能)。
图 2:简化拓扑
配置
我们将使用这个 Docker Compose 文件来快速部署演示。部署包含以下 Docker 镜像
Debezium 源连接器和 JDBC 和 Elasticsearch 连接器的消息格式不尽相同,因为它们是分开开发的,各自关注稍有不同的目标。Debezium 发出更复杂的事件结构,以便捕获所有可用信息。特别是,变更事件包含已更改记录的旧状态和新状态。另一方面,两个 sink 连接器都期望一个简单的消息,该消息仅表示要写入的记录状态。
Debezium 的 UnwrapFromEnvelope 单条消息转换 (SMT) 将复杂的变更事件结构折叠成两个 sink 连接器所期望的相同的基于行的格式,并有效地充当上述两种格式之间的 消息转换器。
示例
让我们直接进入我们的示例,因为那里变化是可见的。首先,我们需要部署所有组件
export DEBEZIUM_VERSION=0.7
docker-compose up 当所有组件启动后,我们将注册将数据写入 Elasticsearch 实例的 Elasticsearch Sink 连接器。我们希望在源端以及 PostgreSQL 和 Elasticsearch 中使用相同的键(主 ID)。
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" https://:8083/connectors/ \
-d @es-sink.json 我们正在使用这个注册请求
{
{
"name": "elastic-sink",
"config": {
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "http://elastic:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", (1)
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
"transforms.key.field": "id", (2)
"key.ignore": "false", (3)
"type.name": "customer" (4)
}
}
} 该请求配置了以下选项
-
仅从 Debezium 的变更数据消息中提取新行的状态
-
从键 _struct_ 中提取 _id_ 字段,然后源端和两个目标端都使用相同的键。这是为了解决 Elasticsearch 连接器仅支持数字类型和 _string_ 作为键的事实。如果我们不提取 _id_,连接器将因为未知键类型而过滤掉消息。
-
使用事件中的键而不是生成一个合成键
-
在 Elasticsearch 中注册事件的类型
接下来,我们将注册将数据写入 PostgreSQL 数据库的 JDBC Sink 连接器
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" https://:8083/connectors/ \
-d @jdbc-sink.json 最后,必须设置源连接器
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" https://:8083/connectors/ \
-d @source.json 让我们检查数据库和搜索服务器是否同步。在源数据库 (MySQL)、目标数据库 (PostgreSQL) 和 Elasticsearch 中都应该找到 _customers_ 表的所有行。
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+ docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org curl 'https://:9200/customers/_search?pretty'
{
"took" : 42,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 4,
"max_score" : 1.0,
"hits" : [
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar",
"email" : "annek@noanswer.org"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "gbailey@foobar.com"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "ed@walker.com"
}
}
]
}
} 在连接器仍在运行时,我们可以向 MySQL 数据库添加新行,然后检查它是否已复制到 PostgreSQL 数据库和 Elasticsearch 中。
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec) docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
Doe | 1005 | John | john.doe@example.com
(5 rows) curl 'https://:9200/customers/_search?pretty'
...
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "John",
"last_name" : "Doe",
"email" : "john.doe@example.com"
}
}
... 关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。