Kafka Streams 是一个用于开发基于 Apache Kafka 的流处理应用程序的库。引用其文档,“Kafka Streams 应用程序通过拓扑实时处理记录流,以逐条记录的方式持续、并发地处理数据”。Kafka Streams DSL 提供了一系列流处理操作,例如 map、filter、join 和 aggregate。
Kafka Streams 中的非键连接
Debezium 的 CDC 源连接器可以轻松地捕获数据库中的数据变更,并近乎实时地将其推送到 Elasticsearch 等接收系统。默认情况下,这会在源数据库的表、相应的 Kafka 主题以及接收端的数据表示(例如 Elasticsearch 中的搜索索引)之间产生一对一的关系。
在 1:n 关系的情况下,例如客户表和地址表之间,消费者通常对一种数据视图感兴趣,该视图是单个、嵌套的数据结构,例如表示客户及其所有地址的单个 Elasticsearch 文档。
这就是 KIP-213(“Kafka 改进提案”)及其外键连接能力的作用所在:它是在 Apache Kafka 2.4 中引入的,“以弥合 Streams 中的 KTables 与关系数据库中的表之间的语义差距”。在 KIP-213 之前,为了连接两个 Debezium 变更事件主题的消息,您通常需要手动重新键入至少一个主题,以确保连接的双方使用相同的键。
得益于 KIP-213,这不再需要了,因为它允许在从 Kafka 消息值中提取的字段上连接两个 Kafka 主题,以完全透明的方式自动处理所需的重新键入。与之前的方法相比,这大大减少了从 Debezium 的 CDC 事件创建聚合事件的工作量。
非键连接,或者更准确地说,外键连接,与 SQL 中的连接类似,例如以下示例:
SELECT * FROM CUSTOMER JOIN ADDRESS ON CUSTOMER.ID = ADDRESS.CUSTOMER_ID 在 Kafka Streams 的术语中,这种连接的输出是一个新的 KTable,其中包含连接结果。
数据库概述
继续我们之前关于客户和地址的示例,让我们考虑一个具有以下数据模型的应用程序:
这两个实体,客户和地址,从地址到客户共享一个外键关系,也就是说,一个客户可以有多个地址。如上所述,默认情况下 Debezium 会在不同的主题上为每个表发出事件。使用 Kafka Streams,这两个表的更改事件主题将被加载到两个 KTable 中,它们通过客户 ID 进行连接。Kafka Streams 应用程序将处理来自这两个 Kafka 主题的数据。每当其中任何一个主题上出现新的 CDC 事件(由记录的插入、更新或删除触发)时,连接就会重新执行。
作为 Kafka Streams 应用程序的运行时,我们将使用 Quarkus,这是一个用于构建云原生微服务的堆栈,它(除其他外)还提供了 Kafka Streams 的扩展。虽然可以通过一个普通的 main() 方法来运行 Kafka Streams 拓扑,但使用 Quarkus 和此扩展作为基础具有许多优势:
-
拓扑管理(例如,等待所有输入主题创建完成)
-
通过环境变量、系统属性等进行可配置性
-
公开运行状况检查
-
公开指标
-
开发模式,一种在代码更改后自动热重载流拓扑的工作方式
-
支持通过 GraalVM 将 Kafka Streams 管道执行为原生二进制文件,从而显著减少内存消耗和启动时间
这张图展示了我们解决方案的概览。
使用 Quarkus Kafka Streams 扩展创建应用程序
要创建带有 Kafka Streams 扩展的新 Quarkus 项目,请运行以下命令:
mvn io.quarkus:quarkus-maven-plugin:1.12.2.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=customer-addresses-aggregator \
-Dextensions="kafka-streams"
cd customer-addresses-aggregator 理解流处理拓扑
我们有一个聚合器应用程序,它将读取两个 Kafka 主题并在流处理管道中进行处理。
-
两个主题通过客户 ID 进行连接。
-
每个客户都将与其地址进行丰富。
-
此聚合数据将被写入第三个主题
customersWithAddressesTopic。
当使用 Quarkus Kafka Streams 扩展时,我们所需要做的就是声明一个 CDI producer 方法,它返回我们流处理应用程序的拓扑。此方法必须用 @Produces 注释,并且必须返回一个 Topology 实例。Quarkus 扩展负责配置、启动和停止 Kafka Streams 引擎。现在让我们来看看实际的流查询实现本身。
@ApplicationScoped
public class TopologyProducer {
@ConfigProperty(name = "customers.topic") (1)
String customersTopic;
@ConfigProperty(name = "addresses.topic")
String addressesTopic;
@ConfigProperty(name = "customers.with.addresses.topic")
String customersWithAddressesTopic;
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder(); (2)
Serde<Long> adressKeySerde = DebeziumSerdes.payloadJson(Long.class);
adressKeySerde.configure(Collections.emptyMap(), true);
Serde<Address> addressSerde = DebeziumSerdes.payloadJson(Address.class);
addressSerde.configure(Collections.singletonMap("from.field", "after"), false);
Serde<Integer> customersKeySerde = DebeziumSerdes.payloadJson(Integer.class);
customersKeySerde.configure(Collections.emptyMap(), true);
Serde<Customer> customersSerde = DebeziumSerdes.payloadJson(Customer.class);
customersSerde.configure(Collections.singletonMap("from.field", "after"), false);
JsonbSerde<AddressAndCustomer> addressAndCustomerSerde =
new JsonbSerde<>(AddressAndCustomer.class); (3)
JsonbSerde<CustomerWithAddresses> customerWithAddressesSerde =
new JsonbSerde<>(CustomerWithAddresses.class);
KTable<Long, Address> addresses = builder.table( (4)
addressesTopic,
Consumed.with(adressKeySerde, addressSerde)
);
KTable<Integer, Customer> customers = builder.table(
customersTopic,
Consumed.with(customersKeySerde, customersSerde)
);
KTable<Integer, CustomerWithAddresses> customersWithAddresses = addresses.join( (5)
customers,
address -> address.customer_id,
AddressAndCustomer::new,
Materialized.with(Serdes.Long(), addressAndCustomerSerde)
)
.groupBy( (6)
(addressId, addressAndCustomer) -> KeyValue.pair(
addressAndCustomer.customer.id, addressAndCustomer),
Grouped.with(Serdes.Integer(), addressAndCustomerSerde)
)
.aggregate( (7)
CustomerWithAddresses::new,
(customerId, addressAndCustomer, aggregate) -> aggregate.addAddress(
addressAndCustomer),
(customerId, addressAndCustomer, aggregate) -> aggregate.removeAddress(
addressAndCustomer),
Materialized.with(Serdes.Integer(), customerWithAddressesSerde)
);
customersWithAddresses.toStream() (8)
.to(
customersWithAddressesTopic,
Produced.with(Serdes.Integer(), customerWithAddressesSerde)
);
return builder.build();
}
} | 1 | 主题名称使用 MicroProfile Config API 进行注入,值在 Quarkus 的 application.properties 配置文件中提供(例如,它们可以通过环境变量覆盖)。 |
| 2 | 创建 StreamsBuilder 实例,它帮助我们构建拓扑。 |
| 3 | 为了将流处理管道中使用的 Java 类型序列化/反序列化为/从 JSON,Quarkus 提供了 class io.quarkus.kafka.client.serialization.JsonbSerde;基于的 Serde 实现基于 JSON-B。 |
| 4 | 使用 KTable-KTable 外键连接功能来提取 customer#id 并执行连接;StreamsBuilder#table() 用于分别将两个 Kafka 主题读入 KTable addresses 和 customers。 |
| 5 | 来自 addresses 主题的消息与相应的 customers 主题进行连接;连接结果包含客户及其一个地址的数据。 |
| 6 | groupBy() 操作将根据 customer#id 对记录进行分组。 |
| 7 | 为了生成一个客户及其所有地址的嵌套结构,对每个记录组(客户-地址元组)应用 aggregate() 操作,为每个客户更新一个 CustomerWithAddresses。 |
| 8 | 管道的结果被写入 customersWithAddressesTopic 主题。 |
CustomerWithAddresses 类在事件在流处理管道中处理时跟踪聚合值。
public class CustomerWithAddresses {
public Customer customer;
public List<Address> addresses = new ArrayList<>();
public CustomerWithAddresses addAddress(AddressAndCustomer addressAndCustomer) {
customer = addressAndCustomer.customer;
addresses.add(addressAndCustomer.address);
return this;
}
public CustomerWithAddresses removeAddress(AddressAndCustomer addressAndCustomer) {
Iterator<Address> it = addresses.iterator();
while (it.hasNext()) {
Address a = it.next();
if (a.id == addressAndCustomer.address.id) {
it.remove();
break;
}
}
return this;
}
} Kafka Streams 扩展通过 Quarkus 配置文件 application.properties 进行配置。除了主题名称之外,此文件还包含有关 Kafka 引导服务器和多个 streams 选项的信息。
customers.topic=dbserver1.inventory.customers
addresses.topic=dbserver1.inventory.addresses
customers.with.addresses.topic=customers-with-addresses
quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-id=kstreams-fkjoin-aggregator
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=${customers.topic},${addresses.topic}
# streams options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
kafka-streams.consumer.session.timeout.ms=150
kafka-streams.consumer.heartbeat.interval.ms=100 构建和运行应用程序
您现在可以像这样构建应用程序:
mvn clean package
为了运行应用程序及其所有相关组件(Kafka、Kafka Connect with Debezium、Postgres 数据库),我们创建了一个 Docker Compose 文件,您可以在 debezium-examples 仓库中找到它。要启动所有容器,还需要构建聚合器容器镜像,请运行以下命令:
export DEBEZIUM_VERSION=1.4 docker-compose up --build
要将 Debezium Connector 注册到 Kafka Connect,您需要指定配置属性,例如连接器名称、数据库主机名、用户、密码、端口、数据库名称等。创建一个名为 register-postgres.json 的文件,其中包含以下内容:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory",
"decimal.handling.mode" : "string",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
} 配置 Debezium Connector
http PUT https://:8083/connectors/inventory-connector/config < register-postgres.json
现在运行一个 debezium/tooling 容器镜像实例。
docker run --tty --rm \
--network kstreams-fk-join-network \
debezium/tooling:1.1 \ 此镜像提供了几个有用的工具,例如 kafkacat。在 tooling 容器内,运行 kafkacat 以检查流处理管道的结果。
kafkacat -b kafka:9092 -C -o beginning -q \
-t customers-with-addresses | jq . 您应该会看到类似以下内容的记录,每条记录都包含一个客户及其所有地址的数据。
{
"addresses": [
{
"city": "Hamburg",
"country": "Canada",
"customer_id": 1001,
"id": 100001,
"street": "42 Main Street",
"zipcode": "90210"
},
{
"city": "Berlin",
"country": "Canada",
"customer_id": 1001,
"id": 100002,
"street": "11 Post Dr.",
"zipcode": "90211"
}
],
"customer": {
"email": "sally.thomas@acme.com",
"first_name": "Sally",
"id": 1001,
"last_name": "Thomas"
}
} 获取数据库的 shell,插入、更新或删除一些记录,连接将自动重新处理。
$ docker run --tty --rm -i \
--network kstreams-fk-join-network \
debezium/tooling:1.1 \
bash -c 'pgcli postgresql://postgres:postgres@postgres:5432/postgres'
# in pgcli, e.g. to update a customer record:
> update inventory.customers set first_name = 'Sarah' where id = 1001; 原生运行
Kafka Streams 应用程序可以轻松扩展,即负载将在应用程序的多个实例之间共享,每个实例处理输入主题的分区子集。当 Quarkus 应用程序通过 GraalVM 编译为原生代码时,它占用的内存会大大减少,并且启动时间非常快。在无需担心内存管理的情况下,您可以并行启动 Kafka Streams 管道的多个实例。
如果您想以 native 模式运行此应用程序,请将 QUARKUS_MODE 设置为 native 并运行以下命令(请确保已安装所需的 GraalVM 工具):
mvn clean package -Pnative
要了解有关将 Kafka Streams 应用程序作为原生二进制文件运行的更多信息,请参阅参考指南。
关于 Kafka Streams 扩展的更多深入信息
Quarkus 扩展还可以帮助您满足构建流处理微服务时的一些常见需求。例如,要在线上运行 Kafka Streams 应用程序,您可以轻松地为数据管道添加运行状况检查和指标。
Micrometer Metrics 提供有关您的 Quarkus 应用程序的丰富指标,即您的应用程序内部发生了什么,以及它的性能特征。Quarkus 允许您通过 HTTP 以 JSON 格式或 OpenMetrics 格式公开这些指标。从那里,它们可以被 Prometheus 等工具抓取并存储以供分析和可视化。
应用程序启动后,指标将在 q/metrics 下公开,默认以 OpenMetrics 格式返回数据。
# HELP kafka_producer_node_request_total The total number of requests sent
# TYPE kafka_producer_node_request_total counter
kafka_producer_node_request_total{client_id="kstreams-fkjoin-aggregator-b4ac1384-0e0a-4f19-8d52-8cc1ee4c6dfe-StreamThread-1-producer",kafka_version="2.5.0",node_id="node--1",status="up",} 83.0
# HELP kafka_producer_record_send_rate The average number of records sent per second.
# TYPE kafka_producer_record_send_rate gauge
kafka_producer_record_send_rate{client_id="kstreams-fkjoin-aggregator-b4ac1384-0e0a-4f19-8d52-8cc1ee4c6dfe-StreamThread-1-producer",kafka_version="2.5.0",status="up",} 0.0
# HELP jvm_gc_memory_allocated_bytes_total Incremented for an increase in the size of the (young) heap memory pool after one GC to before the next
# TYPE jvm_gc_memory_allocated_bytes_total counter
jvm_gc_memory_allocated_bytes_total 1.1534336E8
# ...
# HELP http_requests_total
# TYPE http_requests_total counter
http_requests_total{status="up",uri="/api/customers",} 0.0
# ... 如果您不使用 Prometheus,您还有其他一些选项,如 Datadog、Stackdriver 等。有关详细指南,请查看 Quarkiverse Extensions。
另一方面,我们有 MicroProfile Health 规范,它提供了有关应用程序的活性信息,即指示您的应用程序是否正在运行以及它是否能够处理请求。要监控现有 Quarkus 应用程序的运行状况,您可以添加 smallrye-health 扩展。
mvn quarkus:add-extension -Dextensions="smallrye-health"
Quarkus 将通过 HTTP 在 q/health 下公开所有运行状况检查,在我们的例子中,它显示了管道的状态以及任何丢失的主题。
{
"status": "DOWN",
"checks": [
{
"name": "Kafka Streams topics health check",
"status": "DOWN",
"data": {
"missing_topics": "dbserver1.inventory.customers,dbserver1.inventory.addresses"
}
}
]
} 总结
Quarkus Kafka Streams 扩展提供了在 JVM 和原生模式下运行流处理管道所需的一切,以及执行运行状况检查、指标等附加功能。例如,您可以轻松地使用 Quarkus REST 支持公开用于交互式查询的 REST API,可能使用 MicroProfile REST client API 从扩展的 Kafka Streams 应用程序的其他实例检索数据。
在本文中,我们讨论了 Kafka Streams 中外键连接的流处理拓扑,以及如何使用 Quarkus Kafka Streams 扩展来运行和构建 JVM 模式下的应用程序。您可以在 Debezium 示例仓库中找到该实现的完整源代码。如果您有任何问题或反馈,请在下方评论中告诉我们。我们期待您的建议!
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。