入站通道适配器
通常,消息流从入站通道适配器(例如 <int-jdbc:inbound-channel-adapter>
)开始。适配器配置了 <poller>
,并要求 MessageSource<?>
定期生成消息。Java DSL 也允许从 MessageSource<?>
开始一个 IntegrationFlow
。为此,IntegrationFlow
流式 API 提供了一个重载的 IntegrationFlow.from(MessageSource<?> messageSource)
方法。您可以将 MessageSource<?>
配置为一个 bean,并将其作为该方法的参数提供。IntegrationFlow.from()
的第二个参数是一个 Consumer<SourcePollingChannelAdapterSpec>
lambda,它允许您为 SourcePollingChannelAdapter
提供选项(例如 PollerMetadata
或 SmartLifecycle
)。以下示例展示了如何使用流式 API 和 lambda 来创建 IntegrationFlow
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于那些没有直接构建 Message
对象要求的场景,您可以使用基于 java.util.function.Supplier
的 IntegrationFlow.fromSupplier()
变体。Supplier.get()
的结果会被自动封装到 Message
中(如果它本身不是 Message
)。