Debezium 支持
Debezium Engine,变更数据捕获 (CDC) 入站通道适配器。DebeziumMessageProducer
允许捕获数据库变更事件,将它们转换为消息并随后流式传输到出站通道。
你需要将 spring integration Debezium 依赖添加到你的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.4.4"
你还需要为你的输入数据库包含一个 Debezium 连接器 依赖。例如,要在 PostgreSQL 中使用 Debezium,你需要 postgres Debezium 连接器
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
将 |
Debezium 入站通道适配器
Debezium 适配器需要一个预先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
实例。
Debezium-supplier 提供了一个开箱即用的 |
Debezium Java DSL 可以从提供的 |
此外,DebeziumMessageProducer
可以通过以下配置属性进行调整
-
contentType
- 允许处理JSON
(默认)、AVRO
和PROTOBUF
消息内容。此contentType
必须
与提供的DebeziumEngine.Builder
配置的SerializationFormat
对齐。 -
enableBatch
- 当设置为false
(默认)时,Debezium 适配器会为从源数据库接收到的每个ChangeEvent
数据变更事件发送新的Message
。如果设置为true
,则适配器会为从 Debezium 引擎接收到的每批ChangeEvent
发送一个单独的Message
。这种 payload 不可序列化,需要自定义的序列化/反序列化实现。 -
enableEmptyPayload
- 启用对墓碑消息(即删除消息)的支持。在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件的 key 与被删除行的 key 相同,值为Optional.empty
。默认为false
。 -
headerMapper
- 自定义HeaderMapper
实现,用于选择并转换ChangeEvent
头到Message
头。默认的DefaultDebeziumHeaderMapper
实现提供了setHeaderNamesToMap
的 setter 方法。默认情况下,所有头都会被映射。 -
taskExecutor
- 为 Debezium 引擎设置自定义TaskExecutor
。
以下代码片段演示了此通道适配器的各种配置
使用 Java 配置
以下 Spring Boot 应用展示了如何使用 Java 配置配置入站适配器
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | 事件所针对的逻辑目标的名称。通常,目标由 topic.prefix 配置选项、数据库名称和表名称组成。例如:my-topic.inventory.orders 。 |
2 | 包含变更表的 key 的 schema 以及变更行的实际 key。key schema 及其对应的 key payload 都包含变更表中在连接器创建事件时 PRIMARY KEY (或唯一约束)中每个列的字段。 |
3 | 与 key 类似,payload 具有 schema 部分和 payload value 部分。schema 部分包含描述 payload value 部分的 Envelope 结构的 schema,包括其嵌套字段。创建、更新或删除数据的操作产生的变更事件都具有带 Envelope 结构的 value payload。 |
|
类似地,我们可以配置 DebeziumMessageProducer
以批处理方式处理传入的变更事件
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支持
spring-integration-debezium
通过 Debezium
工厂和 DebeziumMessageProducerSpec
实现提供了方便的 Java DSL 流式 API。
Debezium Java DSL 的入站通道适配器是
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者从原生 Debezium 配置属性创建一个 DebeziumMessageProducerSpec
实例,并默认使用 JSON
序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 应用提供了一个使用 Java DSL 配置入站适配器的示例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}