Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。这种支持以基于 Redis 的 MessageStore 以及发布-订阅消息适配器的形式出现,这些适配器通过 Redis 的 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令得到支持。
项目需要此依赖项
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:7.0.0"
必须包含 Redis 客户端依赖,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,首先必须获得一个连接。Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:ConnectionFactory 和 Template。这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用 RedisConnectionFactory
来自 Spring Data Redis 的 RedisConnectionFactory 是用于管理与 Redis 连接的高级抽象。以下列表显示了接口定义
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示了如何在 Java 中创建 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory 的实现提供了一组属性,例如端口和主机。一旦 RedisConnectionFactory 实例存在,就可以创建 RedisTemplate。
使用 RedisTemplate
与 Spring 中的其他模板类(如 JdbcTemplate 和 JmsTemplate)一样,RedisTemplate 是一个帮助类,它简化了 Redis 数据访问代码。有关 RedisTemplate 及其变体(如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档。
以下示例展示了如何在 Java 中创建 RedisTemplate 实例
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate 实例
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如 简介 中所述,Redis 通过其 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令提供发布-订阅消息支持。与 JMS 和 AMQP 一样,Spring Integration 提供消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都旨在成为同一应用程序的一部分,并在同一进程中运行。这可以通过一对入站和出站通道适配器来完成。然而,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。相反,可以使用发布-订阅通道,如下例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel 的行为与主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/> 元素非常相似。它可以由任何端点的 input-channel 和 output-channel 属性引用。不同之处在于,此通道由 Redis 主题名称支持:由 topic-name 属性指定的 String 值。然而,与 JMS 不同,此主题不必提前创建,甚至不必由 Redis 自动创建。在 Redis 中,主题是简单的 String 值,扮演地址的角色。生产者和消费者可以使用相同的 String 值作为其主题名称进行通信。对此通道的简单订阅意味着在生产端点和消费端点之间可以进行异步发布-订阅消息传递。然而,与通过在简单 Spring Integration <channel/> 元素中添加 <queue/> 元素创建的异步消息通道不同,消息不存储在内存队列中。相反,这些消息通过 Redis 传递,这使得我们可以依赖其对持久性和集群的支持以及与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter) 以与其他入站适配器相同的方式将传入的 Redis 消息适配到 Spring 消息中。它接收平台特定消息(本例中为 Redis)并通过使用 MessageConverter 策略将其转换为 Spring 消息。以下示例展示了如何配置 Redis 入站通道适配器
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例展示了 Redis 入站通道适配器的简单而完整的配置。请注意,前面的配置依赖于熟悉的 Spring 范例:自动发现某些 bean。在本例中,redisConnectionFactory 被隐式注入到适配器中。或者,可以通过 connection-factory 属性注入自定义 RedisConnectionFactory。
此外,请注意,前面的配置为适配器注入了一个自定义 MessageConverter。该方法类似于 JMS,其中 MessageConverter 实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值为 SimpleMessageConverter。
入站适配器可以订阅多个主题名称,因此 topics 属性中是逗号分隔的值集。
从 3.0 版开始,入站适配器除了现有的 topics 属性外,现在还具有 topic-patterns 属性。此属性包含一个逗号分隔的 Redis 主题模式集。有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。
入站适配器可以使用 RedisSerializer 反序列化 Redis 消息的正文。<int-redis:inbound-channel-adapter> 的 serializer 属性可以设置为空字符串,这将导致 RedisSerializer 属性的值为 null。在这种情况下,Redis 消息的原始 byte[] 正文作为消息有效负载提供。
从 5.0 版开始,可以通过使用 <int-redis:inbound-channel-adapter> 的 task-executor 属性将 Executor 实例注入到入站适配器中。此外,收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE 头,以指示已发布消息的来源:主题或模式。这可以在下游用于路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配到 Redis 消息中。它接收 Spring Integration 消息并通过使用 MessageConverter 策略将其转换为平台特定消息(本例中为 Redis)。以下示例展示了如何配置 Redis 出站通道适配器
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器并行。适配器被隐式注入 RedisConnectionFactory,该工厂的 bean 名称为 redisConnectionFactory。此示例还包括可选(和自定义)的 MessageConverter(testConverter bean)。
自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter> 提供了 topic 属性的替代方案:存在 topic-expression 属性,用于在运行时确定消息的 Redis 主题。这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了队列入站通道适配器,用于从 Redis 列表中“弹出”消息。默认情况下,它使用“右弹出”,但可以配置为使用“左弹出”。该适配器是消息驱动的。它使用内部侦听器线程,不使用轮询器。
以下列表显示了 queue-inbound-channel-adapter 的所有可用属性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
| 1 | 组件 bean 名称。如果未提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 bean 名称在应用程序上下文中注册。在这种情况下,端点本身将以 bean 名称 id 加上 .adapter 进行注册。(如果 bean 名称是 thing1,则端点注册为 thing1.adapter。) |
| 2 | 此端点向其发送 Message 实例的 MessageChannel。 |
| 3 | 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。它默认为 true。 |
| 4 | 一个 SmartLifecycle 属性,用于指定此端点启动的阶段。它默认为 0。 |
| 5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。 |
| 6 | 执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。 |
| 7 | 当从端点的侦听任务接收到异常时,将 ErrorMessage 实例发送到的 MessageChannel。默认情况下,底层 MessagePublishingErrorHandler 使用应用程序上下文中的默认 errorChannel。 |
| 8 | RedisSerializer bean 引用。它可以是一个空字符串,表示“无序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效负载发送到 channel。默认情况下,它是 JdkSerializationRedisSerializer。 |
| 9 | “pop”操作等待来自队列的 Redis 消息的超时时间(以毫秒为单位)。默认值为 1 秒。 |
| 10 | “pop”操作发生异常后,侦听器任务应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。 |
| 11 | 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。其默认值为 false。 |
| 12 | 对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。它用于底层侦听任务。它默认为 SimpleAsyncTaskExecutor。 |
| 13 | 指定此端点是应使用“右弹出”(当为 true 时)还是“左弹出”(当为 false 时)从 Redis 列表读取消息。如果为 true,则当与默认 Redis 队列出站通道适配器一起使用时,Redis 列表充当 FIFO 队列。将其设置为 false 以与使用“右推”写入列表的软件一起使用或实现堆栈状消息顺序。其默认值为 true。自 4.3 版起。 |
task-executor 必须配置多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动侦听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要使应用程序暴露于可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。 |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了队列出站通道适配器,用于将 Spring Integration 消息“推入”Redis 列表。默认情况下,它使用“左推”,但可以配置为使用“右推”。以下列表显示了 Redis queue-outbound-channel-adapter 的所有可用属性
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
| 1 | 组件 bean 名称。如果未提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 bean 名称在应用程序上下文中注册。在这种情况下,端点将以 bean 名称 id 加上 .adapter 进行注册。(如果 bean 名称是 thing1,则端点注册为 thing1.adapter。) |
| 2 | 此端点从中接收 Message 实例的 MessageChannel。 |
| 3 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。 |
| 4 | 执行基于队列的“push”操作以发送 Redis 消息的 Redis 列表的名称。此属性与 queue-expression 互斥。 |
| 5 | 一个 SpEL Expression,用于确定 Redis 列表的名称。它在运行时使用传入的 Message 作为 #root 变量。此属性与 queue 互斥。 |
| 6 | 一个 RedisSerializer bean 引用。它默认为 JdkSerializationRedisSerializer。但是,对于 String 有效负载,如果未提供 serializer 引用,则使用 StringRedisSerializer。 |
| 7 | 指定此端点是应仅发送有效负载还是发送整个 Message 到 Redis 队列。它默认为 true。 |
| 8 | 指定此端点是应使用“左推”(当为 true 时)还是“右推”(当为 false 时)将消息写入 Redis 列表。如果为 true,则当与默认 Redis 队列入站通道适配器一起使用时,Redis 列表充当 FIFO 队列。将其设置为 false 以与使用“左弹出”从列表读取的软件一起使用或实现堆栈状消息顺序。它默认为 true。自 4.3 版起。 |
Redis 应用程序事件
自 Spring Integration 3.0 起,Redis 模块提供了 IntegrationEvent 的实现,它反过来又是 org.springframework.context.ApplicationEvent。RedisExceptionEvent 封装了 Redis 操作中的异常(端点是事件的“源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在从 BoundListOperations.rightPop 操作捕获异常后发出这些事件。异常可以是任何泛型 org.springframework.data.redis.RedisSystemException 或 org.springframework.data.redis.RedisConnectionFailureException。使用 <int-event:inbound-channel-adapter/> 处理这些事件对于确定后台 Redis 任务的问题并采取管理措施很有用。
Redis 消息存储
如《企业集成模式》(EIP) 一书所述,消息存储 允许持久化消息。当处理具有缓冲消息能力的组件(聚合器、重新排序器等)并且可靠性是问题时,这可能很有用。在 Spring Integration 中,MessageStore 策略还为 EIP 中描述的 索赔检查 模式提供了基础。
Spring Integration 的 Redis 模块提供了 RedisMessageStore。以下示例展示了如何将其与聚合器一起使用
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 bean 配置,它需要一个 RedisConnectionFactory 作为构造函数参数。
默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。但是,如果需要不同的序列化技术(例如 JSON),则可以将自定义序列化器设置到 RedisMessageStore 的 valueSerializer 属性中。
框架为 Message 实例和 MessageHeaders 实例提供了 Jackson 序列化器和反序列化器实现——分别是 MessageJsonDeserializer 和 MessageHeadersJsonSerializer。它们必须使用 SimpleModule 选项为 ObjectMapper 进行配置。此外,ObjectMapper 上应设置 enableDefaultTyping 以添加每个序列化复杂对象的类型信息。该 type 信息随后在反序列化期间使用。框架提供了一个名为 JacksonMessagingUtils.messagingAwareMapper() 的实用方法,它已经提供了前面提到的所有属性和序列化器。此实用方法带有 trustedPackages 参数,用于限制 Java 包的反序列化以避免安全漏洞。默认的可信包:java.util、java.lang、org.springframework.messaging.support、org.springframework.integration.support、org.springframework.integration.message、org.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,必须应用如下配置
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonMessagingUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson3JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,允许区分同一 Redis 服务器上的存储实例。
Redis 通道消息存储
前面 展示的 RedisMessageStore 将每个组作为单个键(组 ID)下的值进行维护。虽然 QueueChannel 可以用于持久化,但为此目的提供了一个专门的 RedisChannelMessageStore(自 4.0 版起)。此存储为每个通道使用一个 LIST,发送消息时使用 LPUSH,接收消息时使用 RPOP。默认情况下,此存储也使用 JDK 序列化,但可以修改其值序列化器,如 前面所述。
建议使用支持存储的通道,而不是使用通用的 RedisMessageStore。以下示例定义了一个 Redis 消息存储并在带有队列的通道中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键的形式为:<storeBeanName>:<channelId>(在前面的示例中为 redisMessageStore:somePersistentQueueChannel)。
此外,还提供了子类 RedisChannelPriorityMessageStore。当与 QueueChannel 一起使用时,消息按(FIFO)优先级顺序接收。它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 头并支持优先级值 (0 - 9)。具有其他优先级(和没有优先级)的消息在任何具有优先级的消息之后按 FIFO 顺序检索。
这些存储仅实现 BasicMessageGroupStore,不实现 MessageGroupStore。它们只能用于支持 QueueChannel 等情况。 |
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore(请参阅 元数据存储)实现。RedisMetadataStore 可用于在应用程序重新启动后维护 MetadataStore 的状态。这样的 MetadataStore 实现可以与以下适配器一起使用
要指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring bean。Feed 入站通道适配器和 feed 入站通道适配器都会自动拾取并使用声明的 RedisMetadataStore。以下示例展示了如何声明此类 bean
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore 由 RedisProperties 支持。与其交互使用 BoundHashOperations,这反过来又需要整个 Properties 存储的 key。在 MetadataStore 的情况下,此 key 扮演区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。默认情况下,此 key 的值为 MetaData。
从 4.0 版本开始,此存储实现了 ConcurrentMetadataStore,允许它在多个应用程序实例之间可靠地共享,其中只有一个实例允许存储或修改键的值。
RedisMetadataStore.replace() 不能与 Redis 集群一起使用(例如,在 AbstractPersistentAcceptOnceFileListFilter 中),因为目前不支持用于原子性的 WATCH 命令。 |
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合读取数据并将其作为 Message 有效负载发送。以下示例展示了如何配置 Redis 存储入站通道适配器
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例展示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如
-
key或key-expression:正在使用的集合的键名。 -
collection-type:此适配器支持的集合类型枚举。支持的集合有LIST、SET、ZSET、PROPERTIES和MAP。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template:对o.s.data.redis.core.RedisTemplate实例的引用。 -
所有入站适配器共有的其他属性(例如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
|
默认情况下,适配器使用
|
由于 key 具有字面值,因此前面的示例相对简单和静态。有时,键的值必须根据某些条件在运行时更改。为此,请改用 key-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,可以对从 Redis 集合读取的成功处理的数据进行一些后处理。例如,值在处理后可能会被移动或删除。事务同步功能可用于此类逻辑。以下示例使用 key-expression 和事务同步
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
轮询器可以通过使用 transactional 元素进行事务处理。此元素可以引用真正的事务管理器,例如,如果流的其他部分调用 JDBC。如果没有“真正的”事务,则可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring 的 PlatformTransactionManager 的实现,并在没有实际事务时启用 Redis 适配器的事务同步功能。
| 这并不会使 Redis 活动本身具有事务性。它允许在成功(提交)之前或之后或在失败(回滚)之后进行操作同步。 |
一旦轮询器是事务性的,就可以在 transactional 元素上添加 o.s.i.transaction.TransactionSynchronizationFactory 实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 实例。为方便起见,公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许配置 SpEL 表达式,其执行与事务协调(同步)。支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。对于每个子元素,可以指定 expression 和 channel 属性。如果只存在 channel 属性,则收到的消息作为特定同步场景的一部分发送到该通道。如果只存在 expression 属性,并且表达式的结果是非 null 值,则会生成一个以结果作为有效负载的消息并发送到默认通道 (NullChannel) 并出现在日志中(在 DEBUG 级别)。如果表达式的结果为 null 或 void,则不生成消息。
RedisStoreMessageSource 添加了一个 store 属性,其中包含一个绑定到事务 IntegrationResourceHolder 的 RedisStore 实例,可以从 TransactionSynchronizationProcessor 实现访问。
有关事务同步的更多信息,请参阅 事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许将消息有效负载写入 Redis 集合,如下例所示
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置通过使用 store-inbound-channel-adapter 元素配置 Redis 存储出站通道适配器。它为各种属性提供值,例如
-
key或key-expression:正在使用的集合的键名。 -
extract-payload-elements:如果设置为true(默认值)并且有效负载是“多值”对象(即Collection或Map)的实例,则使用“addAll”和“putAll”语义进行存储。否则,如果设置为false,则有效负载作为单个条目存储,无论其类型如何。如果有效负载不是“多值”对象的实例,则此属性的值将被忽略,并且有效负载始终作为单个条目存储。 -
collection-type:此适配器支持的Collection类型枚举。支持的集合有LIST、SET、ZSET、PROPERTIES和MAP。 -
map-key-expression:返回正在存储的条目的键名的 SpEL 表达式。它仅在collection-type是MAP或PROPERTIES并且“extract-payload-elements”为 false 时适用。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template:对o.s.data.redis.core.RedisTemplate实例的引用。 -
所有入站适配器共有的其他属性(例如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
默认情况下,适配器使用 StringRedisTemplate。它将 StringRedisSerializer 实例用于键、值、哈希键和哈希值。但是,如果 extract-payload-elements 设置为 false,则将使用一个 RedisTemplate,它对键和哈希键使用 StringRedisSerializer 实例,对值和哈希值使用 JdkSerializationRedisSerializer 实例。使用 JDK 序列化器时,重要的是要了解 Java 序列化用于所有值,无论该值实际上是否是集合。如果需要对值的序列化进行更多控制,则可以提供自定义 RedisTemplate,而不是依赖这些默认值。 |
由于 key 和其他属性具有字面值,因此前面的示例相对简单和静态。有时,这些值可能会根据某些条件在运行时动态更改。为此,提供了它们的 -expression 等效项(key-expression、map-key-expression 等),其中表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,允许使用通用的 RedisConnection#execute 方法执行任何标准 Redis 命令。以下列表显示了 Redis 出站网关的可用属性
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
| 1 | 此端点从中接收 Message 实例的 MessageChannel。 |
| 2 | 此端点发送回复 Message 实例的 MessageChannel。 |
| 3 | 指定此出站网关是否必须返回非 null 值。它默认为 true。当 Redis 返回 null 值时,将抛出 ReplyRequiredException。 |
| 4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常用于基于队列的有限回复通道。 |
| 5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。它与 redis-template 属性互斥。 |
| 6 | 对 RedisTemplate bean 的引用。它与 connection-factory 属性互斥。 |
| 7 | 对 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。如有必要,它用于将每个命令参数序列化为 byte[]。 |
| 8 | 返回命令键的 SpEL 表达式。它默认为 redis_command 消息头。它不得评估为 null。 |
| 9 | 逗号分隔的 SpEL 表达式,它们被评估为命令参数。与 arguments-strategy 属性互斥。如果未提供任何属性,则 payload 用作命令参数。参数表达式可以评估为“null”以支持可变数量的参数。 |
| 10 | 一个 boolean 标志,用于指定在配置 argument-expressions 时,评估的 Redis 命令字符串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 的表达式评估上下文中作为 #cmd 变量可用。否则,此属性将被忽略。 |
| 11 | 对 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。它与 argument-expressions 属性互斥。如果未提供任何属性,则 payload 用作命令参数。 |
<int-redis:outbound-gateway> 可以用作执行任何所需 Redis 操作的通用组件。以下示例展示了如何从 Redis 原子数获取增量值
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message 有效负载应具有 redisCounter 名称,这可以通过 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。
RedisConnection#execute 方法具有泛型 Object 作为其返回类型。实际结果取决于命令类型。例如,MGET 返回 List<byte[]>。有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关以执行请求和回复场景。它将对话 UUID 推送到提供的 queue,将以该 UUID 作为其键的值推送到 Redis 列表,并等待来自以 UUID 加上 .reply 作为键的 Redis 列表的回复。每次交互都使用不同的 UUID。以下列表显示了 Redis 出站网关的可用属性
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
| 1 | 此端点从中接收 Message 实例的 MessageChannel。 |
| 2 | 此端点发送回复 Message 实例的 MessageChannel。 |
| 3 | 指定此出站网关是否必须返回非 null 值。此值默认为 false。否则,当 Redis 返回 null 值时,将抛出 ReplyRequiredException。 |
| 4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常用于基于队列的有限回复通道。 |
| 5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。它与“redis-template”属性互斥。 |
| 6 | 出站网关向其发送对话 UUID 的 Redis 列表的名称。 |
| 7 | 当注册了多个网关时,此出站网关的顺序。 |
| 8 | RedisSerializer bean 引用。它可以是一个空字符串,表示“无序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效负载发送到 channel。默认情况下,它是 JdkSerializationRedisSerializer。 |
| 9 | 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关以执行请求和回复场景。它从提供的 queue 弹出对话 UUID,从 Redis 列表中弹出以该 UUID 作为其键的值,并将回复推送到以 UUID 加上 .reply 作为键的 Redis 列表。以下列表显示了 Redis 队列入站网关的可用属性
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
| 1 | 此端点发送从 Redis 数据创建的 Message 实例的 MessageChannel。 |
| 2 | 此端点等待回复 Message 实例的 MessageChannel。可选 - replyChannel 头仍在使用中。 |
| 3 | 对 Spring TaskExecutor(或标准 JDK Executor)bean 的引用。它用于底层侦听任务。它默认为 SimpleAsyncTaskExecutor。 |
| 4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常用于基于队列的有限回复通道。 |
| 5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。它与 redis-template 属性互斥。 |
| 6 | 对话 UUID 的 Redis 列表的名称。 |
| 7 | 当注册了多个网关时,此入站网关的顺序。 |
| 8 | RedisSerializer bean 引用。它可以是一个空字符串,表示“无序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效负载发送到 channel。它默认为 JdkSerializationRedisSerializer。(请注意,在 4.3 版本之前,它默认是 StringRedisSerializer。要恢复该行为,请提供对 StringRedisSerializer 的引用)。 |
| 9 | 等待接收消息的超时时间(以毫秒为单位)。它通常用于基于队列的有限请求通道。 |
| 10 | 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 |
| 11 | “右弹出”操作发生异常后,侦听器任务应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。 |
task-executor 必须配置多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动侦听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要使应用程序暴露于可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。 |
Redis 流出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息有效负载写入 Redis 流。出站通道适配器使用 ReactiveStreamOperations.add(…) 将 Record 添加到流中。以下示例展示了如何使用 Java 配置和 Service 类来实现 Redis 流出站通道适配器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
| 1 | 使用 ReactiveRedisConnectionFactory 和流名称构造 ReactiveRedisStreamMessageHandler 实例以添加记录。另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。 |
| 2 | 设置用于在添加到流之前序列化记录键和值的 RedisSerializationContext。 |
| 3 | 设置 HashMapper,它提供 Java 类型和 Redis 哈希/映射之间的契约。 |
| 4 | 如果为“true”,通道适配器将从请求消息中提取有效负载值以用于流记录。或者使用整个消息作为值。它默认为 true。 |
从 6.5 版本开始,ReactiveRedisStreamMessageHandler 提供了 setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) 选项,用于根据请求消息为内部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions) 调用构建 RedisStreamCommands.XAddOptions。
Redis 流入站通道适配器
Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis 流读取消息。入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…) 或 StreamReceiver.receiveAutoAck() 从 Redis 流读取记录。以下示例展示了如何使用 Java 配置来实现 Redis 流入站通道适配器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
| 1 | 使用 ReactiveRedisConnectionFactory 和流键构造 ReactiveRedisStreamMessageProducer 实例以读取记录。 |
| 2 | 一个 StreamReceiver.StreamReceiverOptions,用于使用反应式基础设施消费 redis 流。 |
| 3 | 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。它默认为 true。如果为 false,则应手动启动 RedisStreamMessageProducer:messageProducer.start()。 |
| 4 | 如果为 false,则接收到的消息不会自动确认。消息的确认将推迟到客户端消费消息。它默认为 true。 |
| 5 | 如果为 true,则将创建消费者组。在创建消费者组期间,也将创建流(如果尚不存在)。消费者组跟踪消息传递并区分消费者。它默认为 false。 |
| 6 | 设置消费者组名称。它默认为定义的 bean 名称。 |
| 7 | 设置消费者名称。将消息作为 my-consumer 从组 my-group 中读取。 |
| 8 | 此端点发送消息的通道。 |
| 9 | 定义读取消息的偏移量。它默认为 ReadOffset.latest()。 |
| 10 | 如果为“true”,通道适配器将从 Record 中提取有效负载值。否则,整个 Record 用作有效负载。它默认为 true。 |
如果 autoAck 设置为 false,则 Redis 流中的 Record 不会被 Redis 驱动程序自动确认,而是将 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头添加到要生成的消息中,其值为 SimpleAcknowledgment 实例。目标集成流负责在根据此类记录完成消息的业务逻辑时调用其 acknowledge() 回调。即使在反序列化期间发生异常并配置了 errorChannel,也需要类似的逻辑。因此,目标错误处理程序必须决定确认或拒绝此类失败消息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 还将这些标头填充到要生成的消息中:RedisHeaders.STREAM_KEY、RedisHeaders.STREAM_MESSAGE_ID、RedisHeaders.CONSUMER_GROUP 和 RedisHeaders.CONSUMER。
从 5.5 版本开始,可以在 ReactiveRedisStreamMessageProducer 上显式配置 StreamReceiver.StreamReceiverOptionsBuilder 选项,包括新引入的 onErrorResume 函数,如果 Redis 流消费者在发生反序列化错误时应继续轮询,则需要此函数。默认函数将消息发送到错误通道(如果提供),并可能对失败消息进行确认,如上所述。所有这些 StreamReceiver.StreamReceiverOptionsBuilder 与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。
Redis 锁注册表
Spring Integration 4.0 引入了 RedisLockRegistry。某些组件(例如,聚合器和重新排序器)使用从 LockRegistry 实例获取的锁,以确保一次只有一个线程操作一个组。DefaultLockRegistry 在单个组件中执行此功能。可以在这些组件上配置外部锁注册表。当与共享的 MessageGroupStore 一起使用时,可以设置 RedisLockRegistry 以在多个应用程序实例之间提供此功能,以便一次只有一个实例可以操作该组。
当本地线程释放锁时,另一个本地线程通常可以立即获取锁。如果使用不同注册表实例的线程释放锁,则可能需要长达 100 毫秒才能获取锁。
为避免“悬挂”锁(当服务器失败时),此注册表中的锁在默认 60 秒后过期,但可以在注册表上配置。锁通常保持的时间要短得多。
| 由于键可能会过期,尝试解锁已过期的锁会导致抛出异常。但是,受此类锁保护的资源可能已受到损害,因此此类异常应被视为严重异常。过期时间应设置为足够大的值以防止这种情况,但要设置得足够低,以便在服务器故障后能在合理的时间内恢复锁。 |
从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,它会删除最后获取时间超过 age 且当前未锁定的锁。
从 5.5.6 版本开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中的 redisLocks 缓存。有关更多信息,请参阅其 JavaDoc。
从 5.5.13 版本开始,RedisLockRegistry 公开了一个 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应以哪种模式发生
-
RedisLockType.SPIN_LOCK- 锁通过定期循环(100 毫秒)检查是否可以获取锁来获取。默认。 -
RedisLockType.PUB_SUB_LOCK- 锁通过 redis 发布-订阅订阅获取。
pub-sub 是首选模式 - 客户端 Redis 服务器之间的网络通信更少,性能更高 - 当订阅在另一个进程中收到解锁通知时,锁会立即获取。但是,Redis 不支持 Master/Replica 连接中的 pub-sub(例如,在 AWS ElastiCache 环境中),因此,选择忙循环模式作为默认值,以使注册表在任何环境中都能工作。
从 6.4 版本开始,RedisLockRegistry.RedisLock.unlock() 方法在锁的所有权过期时会抛出 ConcurrentModificationException,而不是 IllegalStateException。
从 6.4 版本开始,添加了 RedisLockRegistry.setRenewalTaskScheduler() 以配置用于定期续订锁的调度器。设置后,锁将在成功获取锁后每 1/3 的过期时间自动续订,直到解锁或 Redis 键被删除。
从 7.0 版本开始,RedisLock 实现了 DistributedLock 接口,以支持锁状态数据的自定义生存时间 (TTL) 功能。现在可以使用 lock(Duration ttl) 或 tryLock(long time, TimeUnit unit, Duration ttl) 方法获取 RedisLock,并指定生存时间 (TTL) 值。RedisLockRegistry 现在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允许使用自定义生存时间值续订锁。
集群模式下 AWS ElastiCache for Valkey 支持
从 6.4.9/6.5.4/7.0.0 版本开始,RedisLockRegistry 支持集群模式下的 AWS Elasticache for Valkey。在此版本的 valkey(一个 redis 的替代品)中,所有 PubSub 操作(PUBLISH、SUBSCRIBE 等)都在内部使用其分片变体(SPUBLISH、SSUBSCRIBE 等)。如果出现以下形式的任何错误
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3.
“在 RedisLockRegistry 的 unlock 步骤中,必须提供包含主题标签 {…} 的锁键,以确保解锁脚本中的所有操作都被哈希到相同的集群槽/分片,例如
RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");
lockRegistry.lock();
# critical section
lockRegistry.unlock();