Apache Camel 支持
Spring Integration 提供了 API 和配置,用于与在同一应用上下文中声明的 Apache Camel 端点进行通信。
你需要将此依赖项添加到你的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-camel</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:6.4.4"
Spring Integration 和 Apache Camel 都实现了企业集成模式,并提供了方便的方式来组合它们,但这两个项目在 API 和抽象实现上采用了不同的方法。Spring Integration 完全依赖于 Spring Core 的依赖注入容器。它使用许多其他 Spring 项目(如 Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。它还使用 MessageChannel
抽象作为一级公民,开发者在组合其集成流时需要了解这一点。另一方面,Apache Camel 不提供消息通道的一级公民抽象,并建议通过隐藏在 API 背后的内部交换来组合其路由。此外,它还需要一些额外的依赖项和配置才能在 Spring 应用中使用。
即使最终的企业集成解决方案的各个部分如何实现并不重要,开发者体验和高生产力也是需要考虑的因素。因此,开发者可能会出于许多原因选择一个框架而不是另一个,或者在某些目标系统支持存在空白时两者都使用。Spring Integration 和 Apache Camel 应用可以通过许多外部协议互相交互,它们都为这些协议实现了通道适配器。例如,一个 Spring Integration 流可以将记录发布到 Apache Kafka 主题,该主题由消费者端的 Apache Camel 端点消费。或者,一个 Apache Camel 路由可以将数据写入 SFTP 文件目录,该目录由 Spring Integration 的 SFTP 入站通道适配器轮询。或者,在同一个 Spring 应用上下文中,它们可以通过 ApplicationEvent
抽象进行通信。
为了简化开发过程并避免不必要的网络跳跃,Apache Camel 提供了一个模块,用于通过消息通道与 Spring Integration 进行通信。所需的只是应用上下文中的 MessageChannel
引用,用于发送或消费消息。当 Apache Camel 路由是消息流的发起者,而 Spring Integration 仅作为解决方案的一部分扮演支持角色时,这种方式效果很好。
为了提供类似的开发者体验,Spring Integration 现在提供了一个通道适配器,用于调用 Apache Camel 端点,并且可选地等待回复。没有入站通道适配器,因为从 Spring Integration API 和抽象的角度来看,订阅 MessageChannel
来消费 Apache Camel 消息就足够了。
Apache Camel 出站通道适配器
CamelMessageHandler
是 AbstractReplyProducingMessageHandler
的实现,可以工作在单向(默认)和请求-回复两种模式下。它使用 org.apache.camel.ProducerTemplate
向 org.apache.camel.Endpoint
发送(或发送并接收)。交互模式可以通过 ExchangePattern
选项控制(该选项可以在运行时通过 SpEL 表达式对请求消息进行评估)。目标 Apache Camel 端点可以明确配置,或配置为在运行时评估的 SpEL 表达式。否则,它会回退到 ProducerTemplate
上提供的 defaultEndpoint
。除了指定端点,还可以提供内联的、明确的 LambdaRouteBuilder
,例如,用于调用 Spring Integration 中没有通道适配器支持的 Apache Camel 组件。
此外,可以提供一个 HeaderMapper<org.apache.camel.Message>
(CamelHeaderMapper
是默认实现),用于确定在 Spring Integration 和 Apache Camel 消息之间映射哪些消息头。默认情况下,所有消息头都会被映射。
CamelMessageHandler
支持 async
模式,它调用 ProducerTemplate.asyncSend()
并生成一个 CompletableFuture
用于回复处理(如果存在回复)。
exchangeProperties
可以通过 SpEL 表达式进行自定义,该表达式必须评估为一个 Map
。
如果未提供 ProducerTemplate
,则会通过从应用上下文中解析的 CamelContext
bean 创建一个。
@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
CamelHeaderMapper headerMapper = new CamelHeaderMapper();
headerMapper.setOutboundHeaderNames("");
headerMapper.setInboundHeaderNames("testHeader");
CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
camelMessageHandler.setHeaderMapper(headerMapper);
return camelMessageHandler;
}
对于 Java DSL 流定义,此通道适配器可以通过 Camel
工厂提供的几种变体进行配置
@Bean
IntegrationFlow camelFlow() {
return f -> f
.handle(Camel.gateway().endpointUri("direct:simple"))
.handle(Camel.route(this::camelRoute))
.handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}
private void camelRoute(RouteBuilder routeBuilder) {
routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}