Debezium 服务器

如果您在使用此功能时遇到任何问题,请告知我们。如果您有特定接收器需要 Debezium Server 支持,或者甚至有兴趣贡献所需的实现,也请联系我们。

Debezium 提供了一个即用型应用程序,可将数据库的变更事件流式传输到消息传递基础架构,如 Amazon Kinesis、Google Cloud Pub/Sub、Apache Pulsar、Redis (Stream) 或 NATS JetStream。要将变更事件流式传输到 Apache Kafka,建议通过 Kafka Connect 部署 Debezium 连接器。

安装

要安装服务器,请下载并解压服务器发行版存档

将创建一个名为 debezium-server 的目录,其中包含这些内容

debezium-server/
|-- CHANGELOG.md
|-- config
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-3.3.1.Final-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh

服务器使用 run.sh 脚本启动,依赖项存储在 lib 目录中,config 目录包含配置文件。

如果您使用 Oracle 连接器,您必须将 ORACLE JDBC 驱动程序添加到 lib 目录(如果使用 XStream,也包括 XStream API 文件),如下所述:获取 Oracle JDBC 驱动程序和 XStream API 文件

配置

Debezium Server 使用 MicroProfile 配置 进行配置。这意味着应用程序可以从不同的源进行配置,如配置文件、环境变量、系统属性等。

主配置文件是 config/application.properties。有多个配置部分

  • debezium.source 用于源连接器配置;每个 Debezium Server 实例只运行一个连接器

  • debezium.sink 用于接收器系统配置

  • debezium.format 用于输出序列化格式配置

  • debezium.transforms 用于消息转换的配置

  • debezium.predicates 用于消息转换谓词的配置

配置文件示例可能如下所示

debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

在此配置文件示例中

  • 接收器配置为 AWS Kinesis,区域为 eu-central-1

  • 源连接器配置为 PostgreSQL,使用默认的 Debezium decoderbufs 插件。如果使用 PostgreSQL 的内置 pgoutput 插件,请将 debezium.source.plugin.name=pgoutput 设置为 pgoutput

  • 源连接器设置为捕获名为 inventory 的模式中的事件。如果要捕获数据库中的所有更改,请删除此行。否则,请将此行更新为与您偏好的模式或表相对应。

  • 源偏移量将存储在 data 目录中名为 offsets.dat 的文件中。请注意,您可能需要创建此目录以防止启动时出现错误。

服务器启动时,会生成一系列日志消息,如下所示

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,757 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = false

2020-05-15 11:33:12,763 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
	access.control.allow.methods =
	access.control.allow.origin =
	admin.listeners = null
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = default
	config.providers = []
	connector.client.config.override.policy = None
	header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
	key.converter = class org.apache.kafka.connect.json.JsonConverter
	listeners = null
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	offset.flush.interval.ms = 0
	offset.flush.timeout.ms = 5000
	offset.storage.file.filename = data/offsets.dat
	offset.storage.partitions = null
	offset.storage.replication.factor = null
	offset.storage.topic =
	plugin.path = null
	rest.advertised.host.name = null
	rest.advertised.listener = null
	rest.advertised.port = null
	rest.extension.classes = []
	rest.host.name = null
	rest.port = 8083
	ssl.client.auth = none
	task.shutdown.graceful.timeout.ms = 5000
	topic.tracking.allow.reset = true
	topic.tracking.enable = true
	value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = key
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
	converter.type = value
	decimal.format = BASE64
	schemas.cache.size = 1000
	schemas.enable = true

2020-05-15 11:33:12,767 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = localhost
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    topic.prefix = tutorial
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 5432
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.include.list = inventory
2020-05-15 11:33:12,908 INFO  [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-health]

源配置

源配置使用与特定连接器文档页面上描述的相同的配置属性(仅带 debezium.source 前缀),以及一些必要的其他特定属性,用于在 Kafka Connect 外部运行。

属性 Default (默认值) 描述

实现源连接器的 Java 类名称。

org.apache.kafka.connect.storage.FileOffsetBackingStore

用于存储和检索非 Kafka 部署偏移量的类。可用选项

  • org.apache.kafka.connect.storage.FileOffsetBackingStore 用于非 Kafka 部署

  • org.apache.kafka.connect.storage.MemoryOffsetBackingStore 用于测试环境的易失性存储

  • io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore 用于使用 JDBC 的数据库

  • io.debezium.storage.redis.offset.RedisOffsetBackingStore 用于 Redis 部署

如果使用文件偏移量存储(默认),则为非 Kafka 部署存储连接器偏移量的文件。

定义将偏移量刷新到文件的频率。

(可选) 如果使用 Redis 存储偏移量,则为提供 Redis 目标流的地址,格式为 host:port。如果未提供,将尝试读取 debezium.sink.redis.address

(可选) 如果使用 Redis 存储偏移量,则为用于与 Redis 通信的用户名。如果未提供 redis.address 配置,并且 redis.address 从 Redis 接收器获取,则将尝试从 debezium.sink.redis.user 加载该值。

(可选) 如果使用 Redis 存储偏移量,则为用于与 Redis 通信的密码(相应用户的)。如果设置了用户,则必须设置密码。如果未提供 redis.address 配置,并且 redis.address 从 Redis 接收器获取,则将尝试从 debezium.sink.redis.password 加载该值。

(可选) 如果使用 Redis 存储偏移量,则指定是否使用 SSL 与 Redis 通信。如果未提供 redis.address 配置,并且 redis.address 从 Redis 接收器获取,则将尝试从 debezium.sink.redis.ssl.enabled 加载该值。默认为 'false'。

(可选) 如果使用 Redis 存储偏移量,则指定是否启用与 Redis 的主机名验证。如果未提供 redis.address 配置,并且 redis.address 从 Redis 接收器获取,则将尝试从 debezium.sink.redis.ssl.hostname.verification.enabled 加载该值。默认为 'false'。

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为信任存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为信任存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为信任存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为密钥存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为密钥存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用启用了 SSL 的 Redis 存储偏移量,则为密钥存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用 Redis 存储偏移量,则定义 redis 中的哈希键。如果未提供 redis.key 配置,则默认值为 metadata:debezium:offsets

false

如果使用 Redis 存储偏移量,则启用等待副本。如果 Redis 配置了副本分片,这允许验证数据是否已写入副本。有关更多信息,请参阅 Redis WAIT 命令。

1000

如果使用 Redis 存储偏移量,则定义等待副本的超时时间(以毫秒为单位)。必须为正值。

false

如果使用 Redis 存储偏移量,则启用等待副本失败时的重试。

1000

如果使用 Redis 存储偏移量,则定义等待副本失败时重试的延迟。

io.debezium.storage.kafka.history.KafkaSchemaHistory

一些连接器(例如 MySQL、SQL Server、Db2、Oracle)会跟踪数据库模式的演变,并将此数据存储在数据库模式历史记录中。默认情况下,这是基于 Kafka 的。还有其他选项可用

  • io.debezium.storage.file.history.FileSchemaHistory 用于非 Kafka 部署

  • io.debezium.relational.history.MemorySchemaHistory 用于测试环境的易失性存储

  • io.debezium.storage.redis.history.RedisSchemaHistory 用于 Redis 部署

  • io.debezium.storage.rocketmq.history.RocketMqSchemaHistory 用于 RocketMQ 部署

  • io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory 用于 Azure Blob Storage 部署

FileSchemaHistory 持久化其数据的文件的名称和位置。

如果使用 RedisSchemaHistory,则连接到 Redis 的主机:端口。

如果使用 RedisSchemaHistory,则使用的 Redis 用户。

如果使用 RedisSchemaHistory,则使用的 Redis 密码。

如果使用 RedisSchemaHistory,则使用 SSL 连接。

如果使用 RedisSchemaHistory,则启用主机名验证。

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为信任存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为信任存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为信任存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为密钥存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为密钥存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用启用了 SSL 的 Redis 存储模式历史记录,则为密钥存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

如果使用 RedisSchemaHistory,则用于存储的 Redis 键。默认:metadata:debezium:schema_history

如果使用 RedisSchemaHistory,在尝试重连 Redis 时的初始延迟(毫秒)。默认:300(毫秒)

如果使用 RedisSchemaHistory,在尝试重连 Redis 时的最大延迟(毫秒)。默认:10000(毫秒)

连接 Redis 的最大尝试次数。默认:10

如果使用 RedisSchemaHistory,Redis 客户端的连接超时(毫秒)。默认:2000(毫秒)

如果使用 RedisSchemaHistory,Redis 客户端的套接字超时(毫秒)。默认:2000(毫秒)

false

如果使用 Redis 存储模式历史记录,则启用等待副本。如果 Redis 配置了副本分片,这允许验证数据是否已写入副本。有关更多信息,请参阅 Redis WAIT 命令。

1000

如果使用 Redis 存储模式历史记录,则定义等待副本的超时时间(以毫秒为单位)。必须为正值。

false

如果使用 Redis 存储模式历史记录,则启用等待副本失败时的重试。

1000

如果使用 Redis 存储模式历史记录,则定义等待副本失败时重试的延迟。

数据库模式历史记录的 RocketMQ 主题名称。

localhost:9876

RocketMQ 服务发现 NameServer 地址配置。

false

RocketMQ 访问控制启用配置,默认为 'false'。

RocketMQ 访问密钥。如果 debezium.source.schema.history.internal.rocketmq.acl.enabled 为 true,则该值不能为空。

RocketMQ 密钥。如果 debezium.source.schema.history.internal.rocketmq.acl.enabled 为 true,则该值不能为空。

60

恢复数据库模式历史记录的最大尝试次数。

1000

恢复期间轮询持久化数据时等待的毫秒数。

60000

发送消息到 RocketMQ 的超时时间。

Azure Blob Storage 账户连接字符串。

Azure Blob Storage 账户名称。仅当 debezium.source.schema.history.internal.azure.storage.account.connectionstring 为空时才应设置此项,否则将使用 Azure Active Directory 身份验证。

Azure Blob Storage 账户容器名称。

持久化模式历史记录数据的 Azure Blob Storage Blob 名称。

格式配置

消息输出格式可以分别配置键和值。默认情况下,输出是 JSON 格式,但可以使用 Kafka Connect 的任意 Converter 实现。

属性 Default (默认值) 描述

json

键的输出格式名称,可以是 json/jsonbytearray/avro/protobuf/simplestring/binary 之一。

传递给键转换器的配置属性。

json

值的输出格式名称,可以是 json/jsonbytearray/avro/protobuf/cloudevents/simplestring/binary 之一。

传递给值转换器的配置属性。

json

值的输出格式名称,可以是 json/jsonbytearray 之一。

传递给头部转换器的配置属性。

转换配置

在消息传递到接收器之前,它们可以经过一系列转换。服务器支持 Kafka Connect 定义的单消息转换。配置将需要包含转换列表、每个转换的实现类以及每个转换的配置选项。

属性 Default (默认值) Description [id="debezium-transforms"]

debezium.transforms

逗号分隔的转换符号名称列表。

debezium.transforms.<name>.type

实现名称为 <name> 的转换的 Java 类名称。

debezium.transforms.<name>.*

传递给名称为 <name> 的转换的配置属性。

debezium.transforms.<name>.predicate

要应用于名称为 <name> 的转换的谓词的名称。

debezium.transforms.<name>.negate

false

确定是否将名称为 <name> 的转换的谓词结果取反。

谓词配置

可以将谓词与转换关联起来,以使转换成为可选的。服务器支持 Kafka Connect 定义的过滤器和条件 SMT。配置将需要包含谓词列表、每个谓词的实现类以及每个谓词的配置选项。

属性 Default (默认值) Description [id="debezium-predicates"]

debezium.predicates

逗号分隔的谓词符号名称列表。

debezium.predicates.<name>.type

实现名称为 <name> 的谓词的 Java 类名称。

debezium.predicates.<name>.*

传递给名称为 <name> 的谓词的配置属性。

异步引擎属性

默认情况下,Debezium 服务器使用异步嵌入式引擎 (AsyncEmbeddedEngine) 作为其处理引擎。您可以为异步嵌入式引擎配置以下选项

属性

Default (默认值)

描述

按需分配线程,基于工作负载和可用 CPU 核心数。

可用于处理更改事件记录的线程数。如果未指定值(默认值),则引擎使用 Java 的 ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定机器上的 CPU 核心数。如果指定了值,则引擎使用 Java 的 固定线程池 方法创建一个具有指定线程数的线程池。要使用给定机器上的所有可用核心,请将占位符值设置为 AVAILABLE_CORES

1000

调用任务关闭后,引擎允许处理待定记录的最大时间(以毫秒为单位)。

180,000 (3 分钟)

引擎等待任务的生命周期管理操作(启动和停止)完成的毫秒数。

其他配置

Debezium Server 构建在 Quarkus 框架之上。Quarkus 提供的所有配置选项在 Debezium Server 中也可用。最常用的有

属性 Default (默认值) Description [id="debezium-quarkus-http-port"]

quarkus.http.port

8080

Debezium 暴露 Microprofile Health 端点和其他已公开状态信息的端口。健康检查可以通过 http://host:8080/q/health 访问。

quarkus.log.level

INFO

每个日志类别的默认日志级别。

quarkus.log.console.json

true

确定是否启用 JSON 控制台格式化扩展,该扩展会禁用“正常”的控制台格式化。

可以通过在 config/application.properties 文件中设置 quarkus.log.console.json=false 来禁用 JSON 日志记录,如 config/application.properties.example 文件所示。

启用消息过滤

Debezium Server 提供过滤器 SMT(单消息转换)功能。有关更多详细信息,请参阅消息过滤。但是,出于安全原因,默认情况下不启用它,并且必须在 Debezium Server 启动时明确启用。要启用它,请将环境变量 ENABLE_DEBEZIUM_SCRIPTING 设置为 true。这将把 debezium-scripting jar 文件和 JSR 223 实现(目前是 Groovy 和 graalvm.js)jar 文件添加到服务器类路径中。这些 jar 文件包含在 Debezium Server 发行版的 opt_lib 目录中。

接收器配置

接收器配置特定于每种接收器类型。

通过配置属性 debezium.sink.type 选择接收器。

Amazon Kinesis

Amazon Kinesis 是一个数据流系统实现,支持流分片和其他高可扩展性技术。Kinesis 暴露了一组 REST API,并提供了一个(不仅仅是)Java SDK,用于实现接收器。

属性 Default (默认值) 描述

必须设置为 kinesis

提供 Kinesis 目标流的区域名称。

由 aws sdk 确定的端点

(可选) 提供 Kinesis 目标流的端点 URL。

(可选) 用于通过默认凭据配置文件与 Amazon API 通信的凭据配置文件名称。如果不存在,将使用默认凭据提供商链。它将按以下顺序查找凭据:环境变量、Java 系统属性、Web 身份令牌凭据、默认凭据配置文件、Amazon ECS 容器凭据和实例配置文件凭据。

default

Kinesis 不支持无键消息的概念。因此,此字符串将用作没有主键的表的消息键。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Kinesis 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

用于将消息发送到目标流的自定义配置的 KinesisClient 实例。

自定义实现将计划的目标(主题)名称映射到物理 Kinesis 流名称。默认情况下,使用相同的名称。

Google Cloud Pub/Sub

Google Cloud Pub/Sub 是一个消息/事件系统实现,专为可扩展的批处理和流处理应用程序而设计。Pub/Sub 暴露了一组 REST API,并提供了一个(不仅仅是)Java SDK,用于实现接收器。

属性 Default (默认值) 描述

必须设置为 pubsub

系统范围的默认项目 ID

创建目标主题的项目名称。

true

Pub/Sub 可选择使用消息键来保证具有相同顺序键的消息按相同顺序传递。此功能可以禁用。

default

没有主键的表发送的消息的键为 null。Pub/Sub 不支持此功能,因此必须使用代理键。

100

在将待处理消息发布到 Pub/Sub 之前,达到元素计数或请求字节阈值的最大等待时间。

100L

一旦队列中有这么多消息,就一次性发送所有消息,即使延迟阈值尚未达到。

10000000L

一旦批处理请求中的字节数达到此阈值,就一次性发送所有消息,即使延迟或消息计数阈值都未超过。

false

启用时,将使用流量控制来配置发布者客户端,以限制发布请求的速率。

Long.MAX_VALUE

(可选) 如果启用了流量控制,则在消息被阻止发布之前允许的最大消息数。

Long.MAX_VALUE

(可选) 如果启用了流量控制,则在消息被阻止发布之前允许的最大字节数。

60000

发布(包括重试)到 Pub/Sub 的总超时时间。

5

重试请求之前等待的初始时间。

2.0

之前的等待时间乘以此乘数以得出下次等待时间,直到达到最大值。

Long.MAX_VALUE

重试之前的最大等待时间。即达到此值后,等待时间将不再通过乘数增加。

10000

控制初始远程过程调用的超时时间。

2.0

之前的 RPC 超时时间乘以此乘数以得出下一个 RPC 超时值,直到达到最大值。

10000

到 Cloud Pub/Sub 的单个发布请求的最大超时时间。

30000

从 Cloud Pub/Sub 获取发布请求结果的最大等待时间。

0

客户端库用于发布消息的线程数。设置为 0 时禁用。

-1

消息在传输前被压缩的字节数阈值。设置为 -1 时禁用。

pubsub 模拟器的地址。仅用于开发或测试环境配合pubsub 模拟器使用。除非设置了此值,否则 debezium-server 将连接到运行在 gcp 项目中的 cloud pubsub 实例,这是生产环境中的期望行为。

要连接的 Google Cloud 区域(例如 us-central1asia-northeast1)。指定时,Debezium 将使用 Pub/Sub 的区域性端点,格式为 {region}-pubsub.googleapis.com:443。这允许连接到区域性端点而不是全局端点。请注意,如果指定了 debezium.sink.pubsub.address,则此参数将被忽略。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Pub/Sub 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

提供自定义配置的 Publisher 实例,用于将消息发送到专用主题的类。

自定义实现将计划的主题名称映射到物理 Pub/Sub 主题名称。默认情况下,使用相同的名称。

Pub/Sub Lite

Google Cloud Pub/Sub Lite 是 Google Cloud Pub/Sub 的经济高效的替代方案。Pub/Sub 暴露了一组 REST API,并提供了一个(不仅仅是)Java SDK,用于实现接收器。

属性 Default (默认值) 描述

必须设置为 pubsublite

系统范围的默认项目 ID

创建目标主题的项目名称或项目 ID。

创建主题的区域。示例 us-east1-b

true

Pub/Sub Lite 可选择使用消息键来保证具有相同键的消息发送到相同分区。此功能可以禁用。

default

没有主键的表发送的消息的键为 null。Pub/Sub Lite 不支持此功能,因此必须使用代理键。

30000

从 Cloud Pub/Sub 获取发布请求结果的最大等待时间。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Pub/Sub Lite 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

提供自定义配置的 Publisher 实例,用于将消息发送到专用主题的类。

自定义实现将计划的主题名称映射到物理 Pub/Sub Lite 主题名称。默认情况下,使用相同的名称。

HTTP 客户端

HTTP 客户端会将更改流式传输到任何 HTTP 服务器以进行进一步处理,其原始设计目标是让 Debezium 作为Knative 事件源。HTTP 客户端接收器支持可选的JSON Web Token (JWT) 身份验证

属性 Default (默认值) 描述

必须设置为 http

将事件流式传输到的 HTTP 服务器 URL。这也可以通过定义 K_SINK 环境变量来设置,Knative 源框架会使用该变量。

60000

在超时之前等待服务器响应的秒数。(默认为 60 秒)

5

抛出异常之前的重试次数(默认为 5 次)。

1000

故障后重试发送记录的毫秒等待时间(默认为 1 秒)。

X-DEBEZIUM-

将以此值作为前缀的标头(默认为 X-DEBEZIUM-)。

true

标头值将被 base64 编码(默认为 true)。

指定 HTTP 客户端接收器连接到 HTTP 服务器时使用的身份验证类型。支持以下选项之一:

jwt

JSON Web Token (JWT) 身份验证。

standard-webhooks

标准 Webhooks.

如果省略此属性,HTTP 客户端接收器将不使用身份验证标头进行连接。

指定 JWT 身份验证的用户名。

指定 JWT 身份验证的密码。

指定 JWT 身份验证的基础 URL(例如,http://myserver:8000/)。路径 auth/authenticateauth/refreshToken 将用于 JWT 初始和身份验证 REST 请求。

请求的持续时间(以分钟为单位),在此之前身份验证令牌将过期。

请求的持续时间(以分钟为单位),在此之前刷新令牌将过期。

Debezium 用于为 webhook 请求生成 HMAC-SHA256 签名的 webhook 签名密钥。密钥必须是 Base64 编码的,大小在 24 字节到 64 字节(192-512 位)之间。可选地,您可以为密钥添加 whsec_ 前缀,以帮助将其与其他类型的密钥或令牌区分开。有关实现或验证 webhook 签名的更多信息,请参阅标准 Webhooks 规范

Apache Pulsar

Apache Pulsar 是一个高性能、低延迟的服务器到服务器消息传递服务器。Pulsar 暴露了 REST API,并且原生端点提供了一个(不仅仅是)Java 客户端,用于实现接收器。

属性 Default (默认值) 描述

必须设置为 pulsar

0

配置发送消息批次到 Pulsar 并等待生产者刷新和持久化所有消息的超时时间(毫秒)。默认设置为 0,表示无超时。确保生产者上的 maxPendingMessagesblockIfQueueFull 已正确配置。

Pulsar 模块支持传递配置。客户端 配置属性在去掉前缀后传递给客户端。至少需要提供 serviceUrl

Pulsar 模块支持传递配置。消息生产者 配置属性在去掉前缀后传递给生产者。topic 由 Debezium 设置。

DEFAULT

指定生产者的批处理构建器。生产者使用批处理构建器创建批消息容器。此设置仅在启用批处理时适用。有效选项是 DEFAULTKEY_BASED,后者用于KeyShared 订阅

default

没有主键的表发送的消息的键为 null。Pulsar 不支持此功能,因此必须使用代理键。

public

用于传递消息的目标租户。

default

用于传递消息的目标命名空间。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Pulsar 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

自定义实现将计划的主题名称映射到物理 Pulsar 主题名称。默认情况下,使用相同的名称。

Azure Event Hubs

Azure Event Hubs 是一个大数据流平台和事件摄取服务,每秒可接收和处理数百万个事件。发送到事件中心的数据可以通过任何实时分析提供商或批处理/存储适配器进行转换和存储。

属性 Default (默认值) 描述

必须设置为 eventhubs

与 Event Hubs 通信所需的连接字符串。格式为:Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>

事件中心名称

(可选) 事件将被发送到的事件中心分区的标识符。如果您希望 Debezium 收到的所有更改事件都发送到 Event Hubs 的特定分区,请使用此项。如果已指定 debezium.sink.eventhubs.partitionkey,请勿使用此项。

(可选) 分区键将用于哈希事件。如果您希望 Debezium 收到的所有更改事件都发送到 Event Hubs 的特定分区,请使用此项。如果已指定 debezium.sink.eventhubs.partitionid,请勿使用此项。

设置事件批次的最大大小(以字节为单位)。

无默认值

(可选) 指定 Debezium 用于加密 Azure Event Hubs 消息键的哈希函数。

指定以下值之一:

  • java

  • md5

  • sha1

  • sha256

在 EventHubs 中使用分区

默认情况下,当未定义可选的 debezium.sink.eventhubs.partitioniddebezium.sink.eventhubs.partitionkey 属性时,EventHubs 接收器将以循环方式将事件发送到所有可用分区。

您可以通过设置 debezium.sink.eventhubs.partitionid 属性来强制所有消息发送到一个固定的分区。或者,您可以使用 debezium.sink.eventhubs.partitionkey 属性指定一个固定的分区键,EventHubs 将使用该键将所有事件路由到特定分区。

如果您有更具体的路由要求,可以使用分区路由转换器。确保转换器中的 partition.topic.num 设置指定的分区数等于或小于您的 EventHubs 命名空间中可用的分区数,这样事件就不会路由到不存在的分区 ID。例如,要根据源模式名称将所有事件路由到 5 个分区,您可以在 application.properties 中设置以下内容:

# Uses a hash of `source.db` to calculate which partition to send the event to. Ensures all events from the same source schema are sent to the same partition.
debezium.transforms=PartitionRouter
debezium.transforms.PartitionRouter.type=io.debezium.transforms.partitions.PartitionRouting
debezium.transforms.PartitionRouter.partition.payload.fields=source.db
debezium.transforms.PartitionRouter.partition.topic.num=5
注入点

可以通过自定义逻辑提供特定功能替代实现的默认接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

用于发送消息的自定义配置的 EventHubProducerClient 实例。

Redis (Stream)

Redis 是一个开源(BSD 许可)的内存数据结构存储,用作数据库、缓存和消息代理。Stream 是一种数据类型,它以更抽象的方式模拟了*日志数据结构*。它实现了强大的操作来克服日志文件的限制。

属性 Default (默认值) 描述

必须设置为 redis

提供 Redis 目标流的地址,格式为 host:port

0

0..15 范围内的数字,用于选择要操作的数据库。默认为数据库 0。此功能仅适用于独立的 Redis 连接;Redis 群集仅使用数据库 0。

(可选) 用于与 Redis 通信的用户名。

(可选) 用于与 Redis 通信的密码(相应用户的)。如果设置了用户,则必须设置密码。

false

(可选) 一个布尔值,指定与 Redis 的连接是否需要 SSL。

false

(可选) 一个布尔值,指定与 Redis 的连接是否应验证服务器的主机名。

(可选) 如果使用带 SSL 的 Redis 接收器,则为信任存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用带 SSL 的 Redis 接收器,则为信任存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用带 SSL 的 Redis 接收器,则为信任存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用带 SSL 的 Redis 接收器,则为密钥存储文件路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

(可选) 如果使用带 SSL 的 Redis 接收器,则为密钥存储文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

(可选) 如果使用带 SSL 的 Redis 接收器,则为密钥存储文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

default

Redis 不支持无键数据。因此,此字符串将用作没有主键的记录的键。

default

Redis 不支持空载荷的概念,如墓碑事件一样。因此,此字符串将用作没有载荷的记录的值。

500

在单个批写(管道事务)中插入的更改记录数。

300

在遇到 Redis 连接或 OOM 问题时的初始重试延迟。此值将在每次重试时加倍,但不会超过 debezium.sink.redis.retry.max.delay.ms

10000

遇到 Redis 连接或 OOM 问题时的最大延迟。

2000

Redis 客户端的连接超时(毫秒)。

2000

Redis 客户端的套接字超时(毫秒)。

false

启用等待副本。如果 Redis 配置了副本分片,这允许验证数据是否已写入副本。有关更多信息,请参阅 Redis WAIT 命令。

1000

等待副本的超时时间(毫秒)。必须为正值。

false

启用等待副本失败时的重试。

1000

等待副本失败时重试的延迟(毫秒)。

compact

发送到 Redis 流的消息的格式。可能的值为 extended(较新格式)和 compact(至今为止的旧格式)。下方阅读有关消息格式的更多信息。

85

如果 used_memory 百分比(相对于 Redis 配置的 maxmemory)高于或等于此阈值,则接收器将停止消耗记录。如果配置值为 0,则禁用此阈值。

0

如果 Redis maxmemory 不可用或为 0,则 debezium.sink.redis.memory.threshold.percentage 将应用于此值(如果此值为正)。默认值为 0(禁用)。

true

确定是否跳过 Debezium 连接器的心跳消息(不存储在 Redis 中)。当设置为 true(默认)时,心跳消息将被标记为已处理但不会存储在 Redis 流中。当设置为 false 时,心跳消息将与常规 CDC 事件一起存储在 Redis 流中。

消息格式

我们上面已经看到了 debezium.sink.redis.message.format 属性,它以两种方式配置消息格式,在 Redis 中看起来像这样:

  • extended 格式,使用两对 {1), 2)}={"key", "message key"} 和 {3), 4)}={"value", "message value"}

1) 1) "1639304527499-0"
   2) 1) "key"
      2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
      3) "value"
      4) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"
  • compact 格式,只使用一对 {1), 2)}={"message key", "message value"}

1) 1) "1639304527499-0"
   2) 1) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
      2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"

您可以在此处阅读有关 Redis Streams 的更多信息。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Redis 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

自定义实现将计划的目标(流)名称映射到物理 Redis 流名称。默认情况下,使用相同的名称。

NATS Streaming

NATS Streaming 是一个由 NATS 提供支持的数据流系统,用 Go 编程语言编写。

属性 Default (默认值) 描述

必须设置为 nats-streaming

集群节点或节点的 URL(或逗号分隔的 URL 列表),格式为 nats://host:port

NATS Streaming 集群 ID。

NATS Streaming 客户端 ID。

注入点

可以通过自定义逻辑提供特定功能替代实现的 NATS Streaming 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

用于将消息发布到目标主题的自定义配置的 StreamingConnection 实例。

自定义实现将计划的目标(主题)名称映射到物理 NATS Streaming 主题名称。默认情况下,使用相同的名称。

NATS JetStream

NATS 有一个内置的分布式持久化系统,称为JetStream,它在基础 'Core NATS' 功能和质量服务之上启用了新功能和更高质量的服务。

属性 Default (默认值) 描述

必须设置为 nats-jetstream

集群节点或节点的 URL(或逗号分隔的 URL 列表),格式为 nats://host:port

如果为 true,将创建一个基本流。

*.*.*

主题(消息通道名称)的逗号分隔列表。可以包含通配符,如 test.inventory.*
重要提示:要捕获模式更改事件和数据更改事件,您必须同时指定主题前缀和通配符模式。例如,如果您的 debezium.source.topic.prefixmyapp,请将主题配置为 myapp,myapp.>myapp,myapp.**.**。
- 模式更改事件 (DDL) 发布到确切的主题前缀(例如,myapp),并且会被任何带有子主题通配符的主题拒绝。
- 数据更改事件发布到表特定的主题(例如,myapp.database.table)。
- 使用 myapp.> 匹配任意数量的主题级别,或使用 myapp.**.**. 匹配前缀后的确切两个级别。

memory

控制消息如何在流中保存。可以是内存或文件。

无默认值

指定 NATS 服务器客户端的身份。将此属性添加到配置以启用与 NATS 的 JSON Web Token (JWT) 身份验证。要使用 JWT 身份验证与 NATS 配合使用,您必须指定 NKey 种子。如果密码身份验证已启用,请勿启用 JWT 身份验证。

无默认值

当为 NATS 启用了JWT 身份验证时,使用此属性指定代表 Debezium 用户的 NKey 种子。Debezium 使用指定的 NKey 种子来派生私钥。然后,它使用私钥对 NATS 服务器在身份验证过程中发出的 nonce 挑战进行加密签名。Debezium 将签名后的 nonce 与指定debezium.sink.nats-jetstream.auth.jwt 客户端的公钥一起返回给服务器。

无默认值

指定授权 NAT 用户的用户名。
当此属性存在于配置中时,与 NATS 的密码身份验证将启用。
要使用密码身份验证与 NATS 配合使用,请指定 debezium.sink.nats-jetstream.auth.password。如果JWT 身份验证已启用,请勿启用密码身份验证。

无默认值

指定在启用密码身份验证时使用的密码。

true

(可选) 一个布尔值,指定 Debezium 是否可以异步流式传输到 NATS JetStream 服务器。

5000

(可选) 指定 Debezium 在发送消息批次进行异步处理后等待 NATS 服务器确认的最大时间(以毫秒为单位)。在异步处理期间,每条消息都将使用 asyncTimeoutMs 指定的超时进行发布。

如果您需要更可配置的流,可以使用 nats cli 创建。有关流的更多信息:https://docs.nats.io/nats-concepts/jetstream/streams

注入点

可以通过自定义逻辑提供特定功能替代实现的 NATS JetStream 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

用于将消息发布到目标主题的自定义配置的 JetStream 实例。

自定义实现将计划的目标(主题)名称映射到物理 NATS JetStream 主题名称。默认情况下,使用相同的名称。

Apache Kafka

Apache Kafka 是一个流行的分布式事件流开源平台。Debezium Server 支持将捕获的更改事件发布到配置的 Kafka 消息代理。

属性 Default (默认值) 描述

必须设置为 kafka

Kafka 接收器适配器支持传递配置。这意味着所有 Kafka 生产者 配置属性在去掉前缀后传递给生产者。至少需要提供 bootstrap.serverskey.serializervalue.serializer 属性。topic 由 Debezium 设置。

30000

服务器等待请求完成并返回记录元数据(以毫秒为单位)的最大时间。指定的超时时间也决定了服务器等待 Kafka 响应请求的时间间隔。将值设置为 0 可禁用超时。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Kafka 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

自定义实现将原始目标(主题)名称映射到另一个 Kafka 主题。默认情况下,使用相同的名称。

Pravega

Pravega 是一个面向事件流和数据流的云原生存储系统。此接收器提供两种模式:非事务性和事务性。非事务性模式将 Debezium 批次中的每个事件单独写入 Pravega。事务性模式将 Debezium 批次写入 Pravega 事务,该事务在批次完成后提交。

Pravega 接收器期望目标范围和流已创建。

属性 Default (默认值) 描述

必须设置为 pravega

tcp://:9090

连接 Pravega 集群中 Controller 的连接字符串。

用于查找目标流的范围名称。

false

设置为 true 以使接收器为每个 Debezium 批次使用 Pravega 事务。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Pravega 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

自定义实现将计划的目标(流)名称映射到物理 Pravega 流名称。默认情况下,使用相同的名称。

Infinispan

Infinispan 是开源内存数据网格,提供丰富的缓存类型和缓存存储。由于数据访问速度非常快,Infinispan 可以用作各种数据处理和分析工具的数据源(除其他用途外)。

Infinispan 接收器期望目标缓存已在 Infinispan 集群中定义并创建。

属性 Default (默认值) 描述

必须设置为 infinispan

Infinispan 集群的服务器主机名(也可以是逗号分隔的服务器列表)。

11222

Infinispan 服务器的端口。

记录将被存储的(现有)缓存的名称。

(可选) 用于连接到 Infinispan 集群的用户名。

(可选) 用于连接到 Infinispan 集群的密码。

注入点

可以通过自定义逻辑提供特定功能替代实现的 Infinispan 接收器行为可以进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

自定义的 Hot Rod 缓存实例,用于连接和发送事件到 Infinspan 集群。

Apache RocketMQ

Apache RocketMQ 是一个分布式消息和流平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。Debezium 服务器支持将捕获的更改事件发布到配置的 RocketMQ。

属性 Default (默认值) 描述

必须设置为 rocketmq

Apache RocketMQ 的名称服务器地址。

Apache RocketMQ 的生产者组。

4M,建议小于 4 MB。

(可选) 发送的消息体的最大字节数。

3000ms

(可选) 发送消息超时时长是客户端本地同步调用的等待时间。根据实际应用设置一个合适的值,以避免长时间线程阻塞。

false

(可选) 配置用于启用访问授权。

(可选) 用于连接 Apache RocketMQ 集群的访问密钥。

(可选) 用于连接 Apache RocketMQ 集群的访问密钥。

注入点

RocketMQ 接收器行为可以通过提供特定功能的替代实现的自定义逻辑进行修改。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

用于将消息发布到目标主题的 RocketMQ 自定义配置实例。

自定义实现将计划的目标(流)名称映射到 RocketMQ 主题名称。默认情况下使用相同的名称。

RabbitMQ Stream

RabbitMQ 是一个开源消息代理,支持多种消息协议,并可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。RabbitMQ 支持消息队列和流。Debezium Server 支持将捕获的更改事件发布到配置的 RabbitMQ Stream。

属性 Default (默认值) 描述

必须设置为 rabbitmq

localhost

RabbitMQ 服务器的主机。

5672

RabbitMQ 服务器的端口。

RabbitMQ 模块支持直通配置。连接 配置属性 会在去掉前缀后传递给 RabbitMQ 客户端。

30000

定义在发布消息后等待代理确认的最大毫秒数。

主题名称

(可选) 发布消息时使用的交换名称。

空字符串

(可选) 发布消息时使用的静态路由键。

false

(可选) 如果设置为 true,则会自动创建不存在的路由键。

true

(可选) 如果设置为 true,则目标队列内容将在 RabbitMQ 服务器重启后仍然存在。

false

(可选) 已弃用,请参阅 debezium.sink.rabbitmq.routingKey.source

2

(可选) 消息被传递到 RabbitMQ 服务器并存储在其上的方式

  • 1 - 非持久化

  • 2 - 持久化

default

RabbitMQ 不支持空负载的概念,例如墓碑事件。因此,此字符串将用作没有负载的记录的值。

static

(可选) 获取事件路由键的方式。

  • static (默认): 路由键将从 debezium.sink.rabbitmq.routingKey 获取。

  • topic: 路由键与交换名称相同。

  • key: 路由键将从记录键获取。

注入点

RabbitMQ 接收器行为可以通过自定义逻辑进行修改,提供特定功能的替代实现。当替代实现不可用时,将使用默认实现。

Interface CDI classifier 描述

自定义实现将计划的目标(流)名称映射到 RabbitMQ 交换名称和(如果启用)路由键名称。默认情况下使用相同的名称。

RabbitMQ Native Stream

RabbitMQ 3.9 起,Streams 被引入 RabbitMQ,利用了一种新的极速协议,可以与 AMQP 0.9.1 一起使用。Streams 非常适合大规模的扇出、重放和时间旅行以及大型日志,所有这些都具有非常高的吞吐量(每秒百万条消息)。

Debezium Server 得到了增强,可以通过利用 RabbitMQ Stream Java Client 来支持将捕获的更改事件发布到本地 RabbitMQ Streams。

属性 Default (默认值) 描述

必须设置为 rabbitmqstream

localhost

RabbitMQ 服务器的主机。

5552

RabbitMQ Stream 协议的端口。

RabbitMQ 模块支持直通配置。连接 配置属性 会在去掉前缀后传递给 RabbitMQ 客户端。

30000

定义在发布消息后等待代理确认的最大毫秒数。

default

RabbitMQ 不支持空负载的概念,例如墓碑事件。因此,此字符串将用作没有负载的记录的值。

Milvus

Milvus 是一个开源向量数据库,专为相似性搜索和高维数据(如机器学习模型的嵌入,例如文本、图像和音频)的检索而设计。您可以使用 Milvus 处理从源数据库捕获的向量数据类型,或者使用转换功能从消息字段计算向量,然后将这些向量用作嵌入。

Milvus 接收器会摄取传入的消息,并将每条消息的 after 部分 upsert 到一个集合中。集合不能包含点,因此接收器会将所有点替换为下划线字符。当收到删除消息时,匹配的记录将从集合中移除。

属性 Default (默认值) 描述

指定接收器的类型。必须设置为 milvus

(可选) 访问 Milvus 数据库实例的 URL。

default

(可选) 包含目标集合的数据库名称。

注入点

您可以通过应用自定义逻辑来修改 Milvus 接收器连接器的行为,该逻辑为特定功能提供替代实现。如果替代实现不可用,连接器将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

一个已配置为访问目标集合的自定义 MilvusClientV2 客户端实例。

自定义实现,用于映射目标主题的名称到 Milvus 集合。默认情况下,名称中的点将被替换为下划线。

Qdrant Sink

Qdrant 是一个开源向量数据库,专为向量相似性搜索和扩展的强大过滤功能而优化。它专为高负载应用程序而设计,使您能够高效地存储、管理和搜索嵌入向量。您可以使用 Qdrant 处理直接从源数据库捕获的向量数据类型,或者使用转换功能从消息字段计算嵌入,然后将这些嵌入发送到数据库进行处理。

Qdrant 接收器会摄取传入的消息,并将每条消息的 after 部分 upsert 到一个集合中。当收到删除消息时,匹配的记录将从集合中移除。

接收器的行为遵循以下规则

  • 每个 Debezium 集合或表都映射到一个 Qdrant 集合。

  • 主键是必需的,并用作 Qdrant 点 ID(仅支持 INT64UUID)。

  • FloatVectorDoubleVector 数据可以用作 Qdrant 向量的来源。

  • 非主键和非向量字段映射到 Qdrant 负载。

属性 Default (默认值) 描述

无默认值。

指定接收器的类型。您必须显式将值设置为 qdrant

localhost

(可选) 访问 Qdrant 数据库实例的主机名。

6333

(可选) 访问 Qdrant 数据库实例的端口。

(可选) 认证 Qdrant 数据库实例所需的 API 密钥。

无默认值。

(可选) 逗号分隔的 collection-name:field-name 对列表,用于为每个集合显式定义要使用的向量字段。
此字段对于包含多个向量字段的源表和集合是必需的。

无默认值。

(可选) 逗号分隔的列表,指定集合中代表 Qdrant 集合负载的字段名称子集。

注入点

要修改连接器行为,您可以应用自定义逻辑来为某些功能指定替代实现。如果您指定的实现不可用,连接器将使用默认实现。

Interface CDI classifier 描述

@CustomConsumerBuilder

一个已配置为访问目标集合的自定义 QdrantClient 客户端实例。

无默认值。

自定义实现,将目标主题的名称映射到 Qdrant 集合。

InstructLab

InstructLab 是一个社区驱动的项目,用于增强大型语言模型 (LLM) 在生成式人工智能应用中的使用。通过与 InstructLab 合作,识别基础模型能力差距的用户可以协作开发一个增强模型的分类法,每个贡献者提供特定的专业知识和技能。

Debezium Server 使您能够通过配置基于 InstructLab 问题和答案 (qna.yml) 文件的自定义接收器来自动化向分类法添加技能和知识的过程。接收器配置定义了一系列映射,用于从事件流中派生问题、答案和上下文值。这些映射可以直接从事件负载、头信息或静态配置的常量中的字段进行源。然后,用户可以定期使用 InstructLab 来训练模型,以利用添加到分类法中的新技能和知识。

属性 Default (默认值) 描述

必须设置为 instructlab

用于存储 InstructLab 分类法技能和知识的根目录的绝对路径。此值与 分类法域 属性结合使用,以构建 qna.yml 文件的完整路径。

用于匹配主题以确定是否应用 <name> 分类法映射的符号名称的逗号分隔列表。

.*

一个正则表达式,用于匹配主题以确定是否应用 <name> 分类法映射。

指定一个 映射定义,用作 qna.yml 文件中种子示例的 question 属性。这是必需的。

指定一个 映射定义,用作 qna.yml 文件中种子示例的 answer 属性。这是必需的。

指定一个 映射定义,用作 qna.yml 文件中种子示例的 context 属性。这是可选的。

指定分类法域,这是一系列用 / 分隔的目录,指向 qna.yml,但不包括分类法基础路径。例如,值为 a/b 且基础路径为 /taxonomy 表示 /taxonomy/a/b/qna.yml

映射定义

在 InstructLab 接收器配置中,您可以设置属性来指定 Debezium Server 如何将事件消息中的字段映射到 InstructLab qna.yml 文件中的 questionanswercontext 属性。

对于您要在 qna.yml 文件中填充的每种属性类型,您都需要指定一个映射前缀,该前缀确定 Debezium 从中提取值的消息字段。您可以指定以下前缀值

Value (值)

如果您在映射定义前加上字符串 value:,Debezium 将从传入事件负载中提取指定字段的值。例如,要使用事件负载中 abc 字段的值填充 InstructLab question 属性,请将 debezium.sink.instructlab.taxonomy.<name>.question 属性设置为 value:abc。当 Debezium 处理消息时,它会获取负载字段 abc 的值,并将其作为 question 属性添加到 qna.yml 文件中。

对于具有 Debezium 结构化负载的事件,Debezium 会从负载的 after 部分中提取指定字段。如果事件被展平,则字段将直接从事件的值中获取。

Header

如果您在映射定义前加上字符串 header:,Debezium 将提取传入事件消息的指定头字段的值。例如,如果您指定映射 header:h1,当 Debezium 在源消息中检测到名称为 h1 的头时,它将提取 h1 字段的值。

Constant

如果映射定义不包含 header:value: 前缀,当 Debezium 在传入消息中检测到指定值的实例时,它会将它们视为常量,并按原样使用。当您希望将特定静态常量的值映射到 qna.yml 文件中的属性时,请使用此选项。

扩展

Debezium Server 使用 Quarkus 框架,并依赖于依赖注入来使开发人员能够扩展其行为。请注意,仅支持 Quarkus 的 JVM 模式,而不支持通过 GraalVM 进行原生执行。服务器可以通过提供自定义逻辑来扩展

  • 实现新的接收器

  • 自定义现有接收器 - 即非标准配置

实现新的接收器

新的接收器可以作为 CDI bean 实现接口 DebeziumEngine.ChangeConsumer,并带有 @Named 注释、唯一的名称和 @Dependent 作用域来实现。Bean 的名称用作 debezium.sink.type 选项。

接收器需要使用 Microprofile Config API 读取配置。执行路径必须将消息传递到目标系统,并定期提交已传递/处理的消息。

有关详细信息,请参阅 Kinesis 接收器 实现。

自定义现有接收器

一些接收器暴露了依赖注入点,允许用户提供自己的 bean 来修改接收器的行为。典型的例子是微调目标客户端的设置、目标命名等。

有关详细信息,请参阅自定义 主题命名策略 实现的示例。

Cassandra 连接器

使用 Cassandra 连接器运行 Debezium Server

使用 Java 11+ 运行需要通过 JDK_JAVA_OPTIONS 环境变量或等效方式在启动时设置以下 Java 选项

JDK_JAVA_OPTIONS="--add-exports java.base/jdk.internal.misc=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.server=ALL-UNNAMED --add-exports java.sql/java.sql=ALL-UNNAMED  --add-opens java.base/java.lang.module=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.math=ALL-UNNAMED --add-opens java.base/jdk.internal.module=ALL-UNNAMED --add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED"

使用 Redis 接收器运行 Cassandra 连接器的基本 application.properties 示例

# Sink
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379

# Connector
debezium.source.connector.class=io.debezium.connector.cassandra.Cassandra4Connector
## node.id must be unique per each connector running on each Cassandra node
debezium.source.cassandra.node.id=sample_node_01
debezium.source.cassandra.hosts=127.0.0.1
debezium.source.cassandra.port=9042
debezium.source.cassandra.config=/opt/cassandra/conf/cassandra.yaml
debezium.source.commit.log.relocation.dir=cassandra/relocdir
debezium.source.offset.storage=io.debezium.server.redis.RedisOffsetBackingStore
debezium.source.topic.prefix=sample_prefix
## internal Cassandra http port
debezium.source.http.port=8040

操作代码转换

默认情况下,Cassandra 连接器有自己的操作代码,这些代码与 Debezium 操作代码不完全兼容。如有需要,可以在 Debezium Server 的 application.properties 中定义一个特定的转换来启用转换

debezium.transforms=EnvelopeTransformation
debezium.transforms.EnvelopeTransformation.type=io.debezium.connector.cassandra.transforms.EnvelopeTransformation

这将如下转换操作代码

INSERT "i"          -> CREATE "c"
UPDATE "u"          -> UPDATE "u"
DELETE "d"          -> DELETE "d"
RANGE_TOMBSTONE "r" -> TRUNCATE "t"