Debezium 的一个典型用例是使用变更数据捕获将一个遗留系统与组织中的其他系统集成。有多种方法可以实现此目标
-
使用 Debezium 将数据写入 Kafka,然后通过 Kafka Streams 流水线和 Kafka Connect 连接器的组合将变更传递到其他系统
-
在 Java 独立应用程序中使用Debezium Embedded engine,并使用纯 Java 编写集成代码;这通常用于将变更事件发送到其他消息基础设施,例如 Amazon Kinesis、Google Pub/Sub 等。
-
使用现有的集成框架或服务总线来表达流水线逻辑
本文重点介绍第三种选择——专用的集成框架。
Apache Camel
这种组合允许开发人员轻松连接到目标系统,并使用声明式 DSL 来表达集成管道。
Camel 和 Debezium
Camel 3 已于 2019 年底发布,除了主要的架构重构外,还增加了新的 Debezium 组件。它还使 Camel 能够用作 Kafka Connect 运行时中的 连接器。
本文仅关注 Debezium 组件的使用,后者选项将在未来的文章中介绍。
如您所见,每个*非孵化*的 Debezium 连接器都由其专用的组件表示。此解决方案的优势在于依赖关系的完全隔离以及连接器实例的类型安全配置。
内部,该组件公开了一个 Debezium 端点,其中包含一个事件驱动的 Camel 消费者,该消费者封装了 Debezium 嵌入式引擎 的实例。
一个例子
作为示例,我们构建了一个简单的问答 (Q&A) 应用程序,该应用程序 loosely 受到 StackOverflow 等的启发。REST API 允许发布新问题以及现有问题的答案,这些问题存储在数据库中。
应用程序生成的任何数据更改(例如,如果创建了新问题或答案)都通过 Debezium 捕获并传递到 Camel 管道,该管道通过 SMTP 服务器发送电子邮件,并在提供的 Twitter 帐户上发布相应的推文。
您可以在 GitHub 上找到该示例的完整源代码。
拓扑
解决方案拓扑中有多个组件
图 1. 部署拓扑
-
问答应用程序是使用 Quarkus 堆栈实现的,并公开了一个 REST API 来创建问题和答案。
-
该应用程序将其数据存储在 PostgreSQL 数据库中。
-
Camel 路由作为纯 Java 应用程序运行,使用嵌入式 Infinispan 存储来持久化其状态(用于构建链接问题及其答案的聚合对象),并通过电子邮件将有关已回答问题的消息发送到关联的 Twitter 帐户。
-
一个在容器中运行的 MailHog SMTP 服务器,用于发送电子邮件。
问答应用程序
源应用程序是基于 Quarkus 的简单 REST 服务。它管理两个实体 `Question` 和 `Answer`,它们之间存在存储在 PostgreSQL 数据库中的 `1:n` 关系。
图 2. 问答后端服务实体关系图
这些实体是通过 REST API 创建的,并且它们之间的关联会自动建立。
Camel 管道
Camel 管道是对以下业务规则的表达:
-
每当创建或更新一个问题时,就向问题的创建者发送一封电子邮件。
-
每当创建或更新一个答案时,就向问题和答案的创建者都发送一封电子邮件。
-
当一个问题有三个答案时,就在指定的 Twitter 帐户上发布一条关于它的推文。
业务需求被转化为这个 EIP 图表中描述的管道。
图 4. Camel 管道
代码详解
要使用 Debezium Camel 组件,我们需要在 `pom.xml` 文件中至少添加以下依赖项。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-bom</artifactId>
<version>${version.camel}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Use required Debezium version -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-postgres</artifactId>
</dependency>
</dependencies> 管道逻辑本身定义在 QaDatabaseUserNotifier 类中。它的主路由如下:
public class QaDatabaseUserNotifier extends RouteBuilder {
@Override
public void configure() throws Exception {
from("debezium-postgres:localhost?"
+ "databaseHostname={{database.hostname}}"
+ "&databasePort={{database.port}}"
+ "&databaseUser={{database.user}}"
+ "&databasePassword={{database.password}}"
+ "&databaseDbname=postgres"
+ "&databaseServerName=qa"
+ "&schemaWhitelist={{database.schema}}"
+ "&tableWhitelist={{database.schema}}.question,{{database.schema}}.answer"
+ "&offsetStorage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
.routeId(QaDatabaseUserNotifier.class.getName() + ".DatabaseReader") (1)
.log(LoggingLevel.DEBUG, "Incoming message ${body} with headers ${headers}")
.choice() (2)
.when(isQuestionEvent)
.filter(isCreateOrUpdateEvent) (3)
.convertBodyTo(Question.class) (4)
.log(LoggingLevel.TRACE, "Converted to logical class ${body}")
.bean(store, "readFromStoreAndUpdateIfNeeded") (5)
.to(ROUTE_MAIL_QUESTION_CREATE) (6)
.endChoice()
.when(isAnswerEvent)
.filter(isCreateOrUpdateEvent)
.convertBodyTo(Answer.class)
.log(LoggingLevel.TRACE, "Converted to logical class ${body}")
.bean(store, "readFromStoreAndAddAnswer")
.to(ROUTE_MAIL_ANSWER_CHANGE)
.filter(hasManyAnswers) (7)
.setBody().simple("Question '${exchangeProperty[aggregate].text}' has " +
"many answers (generated at " + Instant.now() + ")")
.to(TWITTER_SERVER)
.end()
.endChoice()
.otherwise()
.log(LoggingLevel.WARN, "Unknown type ${headers[" +
DebeziumConstants.HEADER_IDENTIFIER + "]}")
.endParent();
from(ROUTE_MAIL_QUESTION_CREATE) (6)
.routeId(QaDatabaseUserNotifier.class.getName() + ".QuestionNotifier")
.setHeader("To").simple("${body.email}")
.setHeader("Subject").simple("Question created/edited")
.setBody().simple("Question '${body.text}' was created or edited")
.to(SMTP_SERVER);
}
@Converter
public static class Converters {
@Converter
public static Question questionFromStruct(Struct struct) { (4)
return new Question(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"));
}
@Converter
public static Answer answerFromStruct(Struct struct) { (4)
return new Answer(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"), struct.getInt64("question_id"));
}
}
} | 1 | `from` 是 Debezium 源端点。URI 部分直接映射到连接器配置选项。 |
| 2 | 管道逻辑根据更改事件类型进行拆分。识别基于 `CamelDebeziumIdentifier` 头,其中包含源表的标识符(` |
| 3 | 管道现在只能处理更新和删除。识别基于 `CamelDebeziumOperation` 头,该头包含消息 `Envelope` 的 `op` 字段。 |
| 4 | Kafka Connect 的 `Struct` 类型被转换为管道中使用的逻辑类型。转换是由自定义 Camel 转换器执行的。可以使用开箱即用的 `DebeziumTypeConverter` 将 `Struct` 转换为 `Map`,但这会将管道逻辑与表结构紧密耦合。 |
| 5 | 调用一个补充路由,该路由与基于 Infinispan 缓存的消息存储进行通信,以构建消息聚合。消息存储会检查它是否已存储问题。如果没有,则创建一个新的聚合并存储,否则使用新数据更新已存储的聚合。 |
| 6 | 调用一个补充路由,该路由格式化邮件消息并通过 SMTP 端点将其传递给问题的创建者。 |
| 7 | 与答案消息类型相关的路由部分非常相似(答案被添加到问题聚合中)。主要区别在于当聚合包含三个答案时发布 Twitter 消息。 |
顺便说一句,为了简单起见,该示例目前使用易失性内存来存储 Debezium 偏移量。对于持久化存储,您可以选择使用基于文件的偏移量存储,或者基于 Infinispan 创建自定义偏移量存储实现,将偏移量的存储委托给底层缓存。
演示
要运行演示,您需要一个拥有相应 API 密钥和密码的 Twitter 开发者帐户。
转到应用程序目录并构建所有组件。
$ mvn clean install 启动服务(提供您自己的 Twitter API 凭据)。
$ env TWITTER_CONSUMER_KEY=<...> TWITTER_CONSUMER_SECRET=<...> TWITTER_ACCESS_TOKEN=<...> TWITTER_ACCESS_TOKEN_SECRET=<...> docker-compose up 在另一个终端中创建一个问题并向其添加三个答案。
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/ -d @src/test/resources/messages/create-question.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer1.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer2.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer3.json Twitter 帐户应包含一条新推文,文本类似“问题‘狗有几条腿?’有很多答案(生成于 2020-02-17T08:02:33.744Z)”。此外,MailHog 服务器 UI 应显示类似以下的消息。
图 4. MailHog 消息
结论
Apache Camel 是实现系统集成场景的一个非常有趣的选择。
无需任何外部消息传递基础设施,即可轻松部署带有 Debezium 组件的独立 Camel 路由,从而能够捕获数据更改并对其执行复杂的路由和转换操作。Camel 为开发人员提供了完整的企业集成模式实现工具集,以及一百多个用于不同系统的连接器,这些连接器可以包含在复杂服务编排中。
完整示例的源代码可在 GitHub 上找到。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。