SchemaChangeEventFilter
使用 SchemaChangeEventFilter 单消息转换 (SMT) 来过滤连接器从表中捕获并发送到 Kafka 的 schema 更改事件。启用 SMT 后,连接器只会将未过滤的 DDL 事件同步到 Kafka,从而让您能够控制哪些 DDL 操作暴露给消费应用程序。
Schema change event filter 配置
要使用此转换,请将 SchemaChangeEventFilter 添加到连接器配置中,并指定要移除的事件类型。以下连接器配置摘录显示了如何将转换添加到连接器,并将其设置为移除连接器捕获的 DDL 消息中的 DROP 和 TRUNCATE 事件。
用于移除
DROP 和 TRUNCATE 事件的 Schema change event filter 配置transforms=filterTableDropAndTruncateEvent
transforms.filterTableDropAndTruncateEvent.type=io.debezium.transforms.SchemaChangeEventFilter
transforms.filterTableDropAndTruncateEvent.schema.change.event.exclude.list=DROP,TRUNCATE
有关配置此转换的更多信息,请参阅 schema change event filter 配置选项。
应用 SchemaChangeEventFilter SMT 的效果
为连接器配置 SMT 后,每当连接器捕获到与 配置的类型匹配的 schema 更改事件时,它会将这些事件的值重置为 null。连接器不会将具有由此产生的 null 值 的事件发送到 Kafka。
以下示例展示了 SMT 处理前的 schema 更改事件记录,以及 SMT 过滤事件后的 输出。
示例 1.
SchemaChangeEventFilter 转换处理前的 schema 更改事件记录{
"schema":{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"databaseName"
},
{
"type":"string",
"optional":true,
"field":"schemaName"
},
{
"type":"string",
"optional":true,
"field":"ddl"
},
{
"type":"array",
"items":{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"type"
},
{
"type":"string",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"io.debezium.connector.schema.Change"
},
"optional":false,
"field":"tableChanges"
}
],
"optional":false,
"name":"filter.SchemaChangeValue"
},
"payload":{
"ts_ms":1691035505397,
"databaseName":"test",
"schemaName":"test_schema",
"ddl":"",
"tableChanges":[
{
"type":"DROP",
"id":"test.table"
}
]
}
}
根据前面的 配置示例,在 SMT 处理完 原始 schema 更改事件 后,它会将 DROP 事件的值重置为 null,如下例所示:
示例 2. SMT 处理后的 schema 更改事件记录
null