我们祝 Debezium 社区 2018 年一切顺利!

在我们忙于 0.7.2 版本的同时,我们想发布另一篇博文,描述一个基于 Debezium 的端到端数据流用例。我们几周前已经展示了如何设置更改数据流到下游数据库。在这篇博文中,我们将采用相同的方法,将数据流式传输到 Elasticsearch 服务器,以利用其出色的全文搜索能力。但为了让事情更有趣一点,我们将把数据流式传输到 PostgreSQL 数据库和 Elasticsearch,这样我们就可以通过 SQL 查询语言以及全文搜索来优化数据访问。

拓扑

这是一个展示数据如何在我们的分布式系统中流动的图表。首先,Debezium MySQL 连接器持续捕获 MySQL 数据库中的变更,并将每个表的变更发送到单独的 Kafka 主题。然后,Confluent JDBC sink 连接器持续读取这些主题并将事件写入 PostgreSQL 数据库。同时,Confluent Elasticsearch 连接器持续读取相同的这些主题并将事件写入 Elasticsearch。

 

图 1:通用拓扑

 

我们将把这些组件部署到几个不同的进程中。在本例中,我们将所有三个连接器部署到一个 Kafka Connect 实例中,该实例将代表所有连接器读写 Kafka(在生产环境中,您可能需要将连接器分开以获得更好的性能)。

 

图 2:简化拓扑

配置

我们将使用这个 Docker Compose 文件来快速部署演示。部署包含以下 Docker 镜像

  • Apache ZooKeeper

  • Apache Kafka

  • 一个经过 增强的 Kafka Connect / Debezium 镜像,并做了一些修改

    • PostgreSQL JDBC 驱动程序放置在 _/kafka/libs_ 目录中

    • Confluent JDBC 连接器放置在 _/kafka/connect/kafka-connect-jdbc_ 目录中

  • 预先填充的 MySQL,如我们的 教程中所使用的

  • 空的 PostgreSQL

  • 空的 Elasticsearch

Debezium 源连接器和 JDBC 和 Elasticsearch 连接器的消息格式不尽相同,因为它们是分开开发的,各自关注稍有不同的目标。Debezium 发出更复杂的事件结构,以便捕获所有可用信息。特别是,变更事件包含已更改记录的旧状态和新状态。另一方面,两个 sink 连接器都期望一个简单的消息,该消息仅表示要写入的记录状态。

Debezium 的 UnwrapFromEnvelope 单条消息转换 (SMT) 将复杂的变更事件结构折叠成两个 sink 连接器所期望的相同的基于行的格式,并有效地充当上述两种格式之间的 消息转换器

示例

让我们直接进入我们的示例,因为那里变化是可见的。首先,我们需要部署所有组件

export DEBEZIUM_VERSION=0.7
docker-compose up

当所有组件启动后,我们将注册将数据写入 Elasticsearch 实例的 Elasticsearch Sink 连接器。我们希望在源端以及 PostgreSQL 和 Elasticsearch 中使用相同的键(主 ID)。

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" https://:8083/connectors/ \
    -d @es-sink.json

我们正在使用这个注册请求

{
  {
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
  }
}

该请求配置了以下选项

  1. 仅从 Debezium 的变更数据消息中提取新行的状态

  2. 从键 _struct_ 中提取 _id_ 字段,然后源端和两个目标端都使用相同的键。这是为了解决 Elasticsearch 连接器仅支持数字类型和 _string_ 作为键的事实。如果我们不提取 _id_,连接器将因为未知键类型而过滤掉消息。

  3. 使用事件中的键而不是生成一个合成键

  4. 在 Elasticsearch 中注册事件的类型

接下来,我们将注册将数据写入 PostgreSQL 数据库的 JDBC Sink 连接器

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" https://:8083/connectors/ \
    -d @jdbc-sink.json

最后,必须设置源连接器

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" https://:8083/connectors/ \
    -d @source.json

让我们检查数据库和搜索服务器是否同步。在源数据库 (MySQL)、目标数据库 (PostgreSQL) 和 Elasticsearch 中都应该找到 _customers_ 表的所有行。

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
curl 'https://:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

在连接器仍在运行时,我们可以向 MySQL 数据库添加新行,然后检查它是否已复制到 PostgreSQL 数据库和 Elasticsearch 中。

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)
curl 'https://:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

总结

我们设置了一个复杂的数据流管道,用于将 MySQL 数据库与另一个数据库以及 Elasticsearch 实例同步。我们设法在所有系统中保持相同的标识符,这使得我们能够关联整个系统中的记录。

将数据变更从主数据库近乎实时地传播到 Elasticsearch 等搜索引擎,可以实现许多有趣的应用场景。除了全文搜索的不同应用外,例如,还可以考虑使用 Kibana 创建仪表板和各种可视化图表,以更深入地了解数据。

如果您想自己尝试这个设置,只需从我们的 示例存储库 克隆该项目。如果您需要帮助、有功能请求或想分享您对这个管道的体验,请在下面的评论中告诉我们。

Jiri Pechanec

Jiri 是 Red Hat 的一名软件开发人员(也是前质量工程师)。他的职业生涯大部分时间都投入在 Java 和系统集成项目和任务中。他居住在捷克共和国布尔诺附近。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×