Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。这种支持以基于 Redis 的 MessageStore
以及 Redis 通过其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令支持的发布-订阅消息适配器形式出现。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.3.0"
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。Spring Integration 使用另一个 Spring 项目提供的支持,Spring Data Redis,它提供了典型的 Spring 结构:ConnectionFactory
和 Template
。这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用 RedisConnectionFactory
要连接到 Redis,您可以使用 RedisConnectionFactory
接口的实现之一。以下清单显示了接口定义
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示了如何在 Java 中创建 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
的实现提供了一组属性,例如端口和主机,您可以在需要时设置这些属性。一旦您拥有 RedisConnectionFactory
的实例,您就可以创建一个 RedisTemplate
实例并将其注入 RedisConnectionFactory
。
使用 RedisTemplate
与 Spring 中的其他模板类(如 JdbcTemplate
和 JmsTemplate
)一样,RedisTemplate
是一个辅助类,它简化了 Redis 数据访问代码。有关 RedisTemplate
及其变体(如 StringRedisTemplate
)的更多信息,请参阅 Spring Data Redis 文档。
以下示例展示了如何在 Java 中创建 RedisTemplate
实例
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate
实例
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如 引言 中所述,Redis 通过其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供对发布/订阅消息传递的支持。与 JMS 和 AMQP 一样,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都打算成为同一个应用程序的一部分,在同一个进程中运行。您可以通过使用一对入站和出站通道适配器来实现这一点。但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决这种情况。您可以创建一个发布/订阅通道,如下面的示例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
的行为与主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/>
元素非常相似。任何端点的 input-channel
和 output-channel
属性都可以引用它。区别在于此通道由 Redis 主题名称支持:由 topic-name
属性指定的 String
值。但是,与 JMS 不同,此主题不必预先创建,甚至不必由 Redis 自动创建。在 Redis 中,主题只是充当地址的简单 String
值。生产者和消费者可以使用相同的 String
值作为其主题名称进行通信。对该通道的简单订阅意味着生产者和消费者端点之间可以进行异步发布/订阅消息传递。但是,与通过在简单的 Spring Integration <channel/>
元素中添加 <queue/>
元素创建的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息会通过 Redis 传递,这使您可以依赖其对持久性和集群的支持,以及它与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter
) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。它接收特定于平台的消息(在本例中为 Redis),并使用 MessageConverter
策略将其转换为 Spring 消息。以下示例展示了如何配置 Redis 入站通道适配器
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
上面的示例展示了 Redis 入站通道适配器的简单但完整的配置。请注意,上面的配置依赖于 Spring 的熟悉模式,即自动发现某些 Bean。在本例中,redisConnectionFactory
被隐式注入到适配器中。您可以使用 connection-factory
属性显式指定它。
此外,请注意,上面的配置将适配器与自定义 MessageConverter
注入。这种方法类似于 JMS,其中 MessageConverter
实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值为 SimpleMessageConverter
。
入站适配器可以订阅多个主题名称,因此 topics
属性中包含以逗号分隔的值集。
从 3.0 版本开始,除了现有的 topics
属性之外,入站适配器现在还具有 topic-patterns
属性。此属性包含以逗号分隔的 Redis 主题模式集。有关 Redis 发布/订阅的更多信息,请参阅 Redis Pub/Sub。
入站适配器可以使用 RedisSerializer
来反序列化 Redis 消息的主体。<int-redis:inbound-channel-adapter>
的 serializer
属性可以设置为一个空字符串,这将导致 RedisSerializer
属性的值为 null
。在这种情况下,Redis 消息的原始 byte[]
主体将作为消息有效负载提供。
从 5.0 版本开始,您可以使用 <int-redis:inbound-channel-adapter>
的 task-executor
属性向入站适配器提供 Executor
实例。此外,接收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE
标头,以指示已发布消息的来源:主题或模式。您可以在下游使用它进行路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配为 Redis 消息。它接收 Spring Integration 消息,并使用 MessageConverter
策略将其转换为特定于平台的消息(在本例中为 Redis)。以下示例展示了如何配置 Redis 出站通道适配器
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
配置与 Redis 入站通道适配器类似。适配器隐式注入了一个 RedisConnectionFactory
,它使用 redisConnectionFactory
作为其 Bean 名称进行定义。此示例还包括可选的(和自定义的)MessageConverter
(testConverter
Bean)。
从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter>
提供了 topic
属性的替代方案:您可以使用 topic-expression
属性在运行时确定 Redis 主题。这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。适配器是消息驱动的。它使用内部监听线程,不使用轮询器。
以下列表显示了 queue-inbound-channel-adapter
的所有可用属性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 组件 bean 名称。如果您没有提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 bean 名称注册到应用程序上下文。在这种情况下,端点本身将使用 bean 名称 id 加上 .adapter 注册。(如果 bean 名称是 thing1 ,则端点将注册为 thing1.adapter 。) |
2 | 要将来自此端点的 Message 实例发送到的 MessageChannel 。 |
3 | 一个 SmartLifecycle 属性,用于指定此端点是否应该在应用程序上下文启动后自动启动。它默认为 true 。 |
4 | 一个 SmartLifecycle 属性,用于指定此端点启动的阶段。它默认为 0 。 |
5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory 。 |
6 | 执行基于队列的“弹出”操作以获取 Redis 消息的 Redis 列表的名称。 |
7 | 要将 ErrorMessage 实例发送到的 MessageChannel ,这些实例是在从端点的监听任务接收异常时生成的。默认情况下,底层的 MessagePublishingErrorHandler 使用应用程序上下文中的默认 errorChannel 。 |
8 | RedisSerializer bean 引用。它可以是空字符串,这意味着“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel 。默认情况下,它是一个 JdkSerializationRedisSerializer 。 |
9 | “弹出”操作等待从队列中获取 Redis 消息的超时时间(以毫秒为单位)。默认值为 1 秒。 |
10 | 监听任务在“弹出”操作发生异常后应休眠的时间(以毫秒为单位),然后再重新启动监听任务。 |
11 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。如果将此属性设置为 true ,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。它的默认值为 false 。 |
12 | 对 Spring TaskExecutor (或标准 JDK 1.5+ Executor ) bean 的引用。它用于底层的监听任务。它默认为 SimpleAsyncTaskExecutor 。 |
13 | 指定此端点是否应使用“右弹出”(当 true 时)或“左弹出”(当 false 时)从 Redis 列表读取消息。如果为 true ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当 FIFO 队列。将其设置为 false 以与使用“右推”写入列表的软件一起使用,或实现类似堆栈的消息顺序。其默认值为 true 。自版本 4.3 起。 |
task-executor 必须配置为具有多个线程才能进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露于可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring 框架 参考手册。
|
Redis 队列出站通道适配器
Spring Integration 3.0 引入了队列出站通道适配器,用于将 Spring Integration 消息“推”到 Redis 列表。默认情况下,它使用“左推”,但您可以将其配置为改为使用“右推”。以下列表显示了 Redis queue-outbound-channel-adapter
的所有可用属性。
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 组件 bean 名称。如果您未提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 bean 名称在应用程序上下文中注册。在这种情况下,端点将使用 id 加上 .adapter 的 bean 名称注册。(如果 bean 名称是 thing1 ,则端点将注册为 thing1.adapter 。) |
2 | 此端点接收 Message 实例的 MessageChannel 。 |
3 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory 。 |
4 | 执行基于队列的“推”操作以发送 Redis 消息的 Redis 列表的名称。此属性与 queue-expression 相互排斥。 |
5 | 一个 SpEL Expression ,用于确定 Redis 列表的名称。它在运行时使用传入的 Message 作为 #root 变量。此属性与 queue 相互排斥。 |
6 | 一个 RedisSerializer bean 引用。它默认为 JdkSerializationRedisSerializer 。但是,对于 String 负载,如果未提供 serializer 引用,则使用 StringRedisSerializer 。 |
7 | 指定此端点是否应仅发送有效负载或将整个Message 发送到 Redis 队列。默认值为true 。 |
8 | 指定此端点是否应使用“左推”(当true 时)或“右推”(当false 时)将消息写入 Redis 列表。如果为true ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当FIFO 队列。将其设置为false 以与使用“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。默认值为true 。自版本 4.3 起。 |
Redis 应用程序事件
自 Spring Integration 3.0 起,Redis 模块提供了IntegrationEvent
的实现,而IntegrationEvent
本身是org.springframework.context.ApplicationEvent
。RedisExceptionEvent
封装了来自 Redis 操作的异常(端点是事件的“源”)。例如,<int-redis:queue-inbound-channel-adapter/>
在从BoundListOperations.rightPop
操作中捕获异常后会发出这些事件。异常可能是任何通用的org.springframework.data.redis.RedisSystemException
或org.springframework.data.redis.RedisConnectionFailureException
。使用<int-event:inbound-channel-adapter/>
处理这些事件对于确定后台 Redis 任务的问题并采取管理措施很有用。
Redis 消息存储
如企业集成模式(EIP)书中所述,消息存储允许您持久化消息。当处理具有缓冲消息功能的组件(聚合器、重新排序器等)时,这在可靠性至关重要的情况下非常有用。在 Spring Integration 中,MessageStore
策略也为凭证检查模式提供了基础,该模式也在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供了RedisMessageStore
。以下示例展示了如何在聚合器中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 Bean 配置,它期望RedisConnectionFactory
作为构造函数参数。
默认情况下,RedisMessageStore
使用 Java 序列化来序列化消息。但是,如果您想使用其他序列化技术(例如 JSON),可以通过设置RedisMessageStore
的valueSerializer
属性来提供您自己的序列化器。
从 4.3.10 版本开始,框架为 Message
实例和 MessageHeaders
实例提供了 Jackson 序列化器和反序列化器实现——分别为 MessageJacksonDeserializer
和 MessageHeadersJacksonSerializer
。它们必须使用 SimpleModule
选项为 ObjectMapper
配置。此外,您应该在 ObjectMapper
上设置 enableDefaultTyping
以为每个序列化后的复杂对象添加类型信息(如果您信任源)。然后在反序列化期间使用该类型信息。框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper()
的实用程序方法,该方法已经包含了前面提到的所有属性和序列化器。此实用程序方法带有 trustedPackages
参数,用于限制反序列化的 Java 包,以避免安全漏洞。默认的受信任包:java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。要管理 RedisMessageStore
中的 JSON 序列化,您必须以类似于以下示例的方式对其进行配置
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore
支持 prefix
选项,允许区分同一 Redis 服务器上存储实例。
Redis 通道消息存储
RedisMessageStore
如前所述 将每个组维护为单个键(组 ID)下的值。虽然您可以使用它来支持 QueueChannel
以实现持久性,但为此目的提供了一个专门的 RedisChannelMessageStore
(从 4.0 版本开始)。此存储为每个通道使用一个 LIST
,在发送消息时使用 LPUSH
,在接收消息时使用 RPOP
。默认情况下,此存储还使用 JDK 序列化,但您可以修改值序列化器,如 前面所述。
我们建议使用此存储支持通道,而不是使用通用的 RedisMessageStore
。以下示例定义了一个 Redis 消息存储,并在具有队列的通道中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下形式:<storeBeanName>:<channelId>
(在前面的示例中,redisMessageStore:somePersistentQueueChannel
)。
此外,还提供了一个子类 RedisChannelPriorityMessageStore
。当您将其与 QueueChannel
一起使用时,消息将按 (FIFO) 优先级顺序接收。它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY
标头并支持优先级值 (0 - 9
)。具有其他优先级的消息(以及没有优先级的消息)在任何具有优先级的消息之后按 FIFO 顺序检索。
这些存储仅实现 BasicMessageGroupStore ,而不实现 MessageGroupStore 。它们只能用于支持 QueueChannel 等情况。
|
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore
(参见 元数据存储)实现。您可以使用 RedisMetadataStore
来维护 MetadataStore
的状态,使其在应用程序重启后仍然存在。您可以将此新的 MetadataStore
实现与以下适配器一起使用:
要指示这些适配器使用新的 RedisMetadataStore
,请声明一个名为 metadataStore
的 Spring bean。Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用声明的 RedisMetadataStore
。以下示例展示了如何声明这样的 bean
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由 RedisProperties
支持。与它的交互使用 BoundHashOperations
,而这反过来又需要一个用于整个 Properties
存储的 key
。在 MetadataStore
的情况下,这个 key
扮演着区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。默认情况下,这个 key
的值为 MetaData
。
从 4.0 版本开始,此存储实现 ConcurrentMetadataStore
,使其能够可靠地跨多个应用程序实例共享,其中只允许一个实例存储或修改键的值。
您不能将 RedisMetadataStore.replace() (例如,在 AbstractPersistentAcceptOnceFileListFilter 中)与 Redis 集群一起使用,因为目前不支持用于原子性的 WATCH 命令。
|
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message
负载发送。以下示例展示了如何配置 Redis 存储入站通道适配器
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
上面的示例展示了如何使用 store-inbound-channel-adapter
元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如
-
key
或key-expression
:正在使用的集合的键的名称。 -
collection-type
: 此适配器支持的集合类型的枚举。支持的集合类型包括LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
: 对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
: 对o.s.data.redis.core.RedisTemplate
实例的引用。 -
其他所有入站适配器共有的属性(例如“channel”)。
您不能同时设置 redis-template 和 connection-factory 。
|
默认情况下,适配器使用
|
由于它对 key
有一个字面值,因此前面的示例相对简单且静态。有时,您可能需要根据某些条件在运行时更改键的值。为此,请使用 key-expression
,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。例如,您可能希望在处理完值后将其移动或删除。您可以使用 Spring Integration 2.2 中添加的事务同步功能来实现。以下示例使用 key-expression
和事务同步
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以使用 transactional
元素将轮询器声明为事务性的。此元素可以引用一个真正的事务管理器(例如,如果您的流程的其他部分调用 JDBC)。如果您没有“真正的”事务,可以使用 o.s.i.transaction.PseudoTransactionManager
,它实现了 Spring 的 PlatformTransactionManager
,并在没有实际事务的情况下启用 Redis 适配器的事务同步功能。
这不会使 Redis 操作本身成为事务性的。它允许在成功(提交)或失败(回滚)之前或之后同步操作。 |
一旦您的轮询器成为事务性的,您可以在transactional
元素上设置o.s.i.transaction.TransactionSynchronizationFactory
的实例。TransactionSynchronizationFactory
创建TransactionSynchronization
的实例。为了方便起见,我们公开了一个基于SpEL的默认TransactionSynchronizationFactory
,它允许您配置SpEL表达式,并与事务协调(同步)其执行。支持提交前、提交后和回滚后的表达式,以及通道(每个事件类型一个),将评估结果(如果有)发送到这些通道。对于每个子元素,您可以指定expression
和channel
属性。如果只存在channel
属性,则接收到的消息将在特定同步场景中发送到该通道。如果只存在expression
属性,并且表达式的结果是非空值,则会生成一条消息,其有效负载为结果,并发送到默认通道(NullChannel
),并在日志中显示(在DEBUG
级别)。如果您希望评估结果转到特定通道,请添加channel
属性。如果表达式的结果为空或无效,则不会生成任何消息。
RedisStoreMessageSource
添加了一个store
属性,该属性绑定到事务IntegrationResourceHolder
的RedisStore
实例,可以从TransactionSynchronizationProcessor
实现中访问。
有关事务同步的更多信息,请参阅事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许您将消息有效负载写入 Redis 集合,如下面的示例所示
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置使用store-inbound-channel-adapter
元素创建一个 Redis 存储出站通道适配器。它为各种属性提供值,例如
-
key
或key-expression
:正在使用的集合的键的名称。 -
extract-payload-elements
: 如果设置为true
(默认值)并且有效负载是“多值”对象的实例(即Collection
或Map
),则使用“addAll”和“putAll”语义存储。否则,如果设置为false
,则无论其类型如何,有效负载都将存储为单个条目。如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 -
collection-type
: 此适配器支持的Collection
类型的枚举。支持的集合是LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
: 返回要存储的条目的键名称的 SpEL 表达式。它仅在collection-type
为MAP
或PROPERTIES
且 'extract-payload-elements' 为 false 时适用。 -
connection-factory
: 对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
: 对o.s.data.redis.core.RedisTemplate
实例的引用。 -
其他所有入站适配器共有的属性(例如“channel”)。
您不能同时设置 redis-template 和 connection-factory 。
|
默认情况下,适配器使用 StringRedisTemplate 。它对键、值、哈希键和哈希值使用 StringRedisSerializer 实例。但是,如果 extract-payload-elements 设置为 false ,则将使用具有 StringRedisSerializer 实例(用于键和哈希键)和 JdkSerializationRedisSerializer 实例(用于值和哈希值)的 RedisTemplate 。使用 JDK 序列化器时,重要的是要了解,Java 序列化用于所有值,无论该值实际上是否为集合。如果您需要对值的序列化进行更多控制,请考虑提供自己的 RedisTemplate ,而不是依赖这些默认值。
|
由于它对 key
和其他属性具有文字值,因此前面的示例相对简单且静态。有时,您可能需要根据某些条件在运行时动态更改这些值。为此,请使用它们的 -expression
等效项(key-expression
、map-key-expression
等),其中提供的表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,让您可以使用通用 RedisConnection#execute
方法执行任何标准 Redis 命令。以下清单显示了 Redis 出站网关的可用属性
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 此端点接收 Message 实例的 MessageChannel 。 |
2 | 此端点发送回复 Message 实例的 MessageChannel 。 |
3 | 指定此出站网关是否必须返回非空值。默认值为 true 。当 Redis 返回 null 值时,将抛出 ReplyRequiredException 。 |
4 | 等待回复消息发送的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory 。它与 'redis-template' 属性互斥。 |
6 | 对 RedisTemplate bean 的引用。它与 'connection-factory' 属性互斥。 |
7 | 对 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。它用于将每个命令参数序列化为 byte[],如果需要。 |
8 | 返回命令键的 SpEL 表达式。它默认为 redis_command 消息头。它不能评估为 null 。 |
9 | 以逗号分隔的 SpEL 表达式,它们被评估为命令参数。与 arguments-strategy 属性互斥。如果您不提供任何属性,则 payload 将用作命令参数。参数表达式可以评估为 'null' 以支持可变数量的参数。 |
10 | 一个 boolean 标志,用于指定当 argument-expressions 配置时,评估的 Redis 命令字符串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中的表达式评估上下文中作为 #cmd 变量提供。否则,此属性将被忽略。 |
11 | 对 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。它与 argument-expressions 属性互斥。如果您不提供任何属性,则 payload 将用作命令参数。 |
您可以使用 <int-redis:outbound-gateway>
作为通用组件来执行任何所需的 Redis 操作。以下示例展示了如何从 Redis 原子数获取递增值
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
负载应具有 redisCounter
的名称,该名称可能由 org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean 定义提供。
RedisConnection#execute
方法具有一个泛型 Object
作为其返回类型。实际结果取决于命令类型。例如,MGET
返回一个 List<byte[]>
。有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。它将一个对话 UUID
推送到提供的 queue
,将具有该 UUID
作为键的值推送到 Redis 列表,并等待来自键为 UUID
加上 .reply
的 Redis 列表的回复。每次交互都使用不同的 UUID。以下列表显示了 Redis 出站网关的可用属性。
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 此端点接收 Message 实例的 MessageChannel 。 |
2 | 此端点发送回复 Message 实例的 MessageChannel 。 |
3 | 指定此出站网关是否必须返回非空值。此值默认情况下为 false 。否则,当 Redis 返回 null 值时,将抛出 ReplyRequiredException 。 |
4 | 等待回复消息发送的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory 。它与 'redis-template' 属性互斥。 |
6 | 出站网关发送对话 UUID 的 Redis 列表的名称。 |
7 | 当注册多个网关时,此出站网关的顺序。 |
8 | RedisSerializer bean 引用。它可以是空字符串,这意味着“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel 。默认情况下,它是一个 JdkSerializationRedisSerializer 。 |
9 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。如果将此属性设置为 true ,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。它从提供的 queue
中弹出对话 UUID
,从 Redis 列表中弹出具有该 UUID
作为键的值,并将回复推送到键为 UUID
加上 .reply
的 Redis 列表。以下列表显示了 Redis 队列入站网关的可用属性。
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | 此端点发送从 Redis 数据创建的 Message 实例的 MessageChannel 。 |
2 | 此端点等待回复 Message 实例的 MessageChannel 。可选 - replyChannel 标头仍在使用中。 |
3 | 对 Spring TaskExecutor (或标准 JDK Executor )bean 的引用。它用于底层监听任务。它默认为 SimpleAsyncTaskExecutor 。 |
4 | 等待回复消息发送的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | 对 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory 。它与 'redis-template' 属性互斥。 |
6 | 对话 UUID 的 Redis 列表的名称。 |
7 | 当注册多个网关时,此入站网关的顺序。 |
8 | RedisSerializer bean 引用。它可以是空字符串,这意味着“无序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel 。它默认为 JdkSerializationRedisSerializer 。(请注意,在 4.3 版之前的版本中,它默认情况下是 StringRedisSerializer 。要恢复该行为,请提供对 StringRedisSerializer 的引用)。 |
9 | 等待接收消息被获取的超时时间(以毫秒为单位)。它通常应用于基于队列的有限请求通道。 |
10 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。如果将此属性设置为 true ,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 |
11 | 监听器任务在“正确弹出”操作出现异常后应休眠的时间(以毫秒为单位),然后再重新启动监听器任务。 |
task-executor 必须配置为具有多个线程才能进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露于可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring 框架 参考手册。
|
Redis 流出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis 流出站通道适配器,用于将 Message 负载写入 Redis 流。出站通道适配器使用 ReactiveStreamOperations.add(…)
将 Record
添加到流中。以下示例展示了如何使用 Java 配置和服务类来使用 Redis 流出站通道适配器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | 使用 ReactiveRedisConnectionFactory 和流名称构造 ReactiveRedisStreamMessageHandler 的实例,以添加记录。另一个构造函数变体基于 SpEL 表达式,以针对请求消息评估流键。 |
2 | 设置 RedisSerializationContext ,用于在添加到流之前序列化记录键和值。 |
3 | 设置 HashMapper ,它提供 Java 类型和 Redis 哈希/映射之间的契约。 |
4 | 如果为“true”,通道适配器将从请求消息中提取负载,以供添加的流记录使用。或者使用整个消息作为值。它默认为 true 。 |
Redis 流入站通道适配器
Spring Integration 5.4 引入了 Reactive 流入站通道适配器,用于从 Redis 流中读取消息。入站通道适配器使用 StreamReceiver.receive(…)
或 StreamReceiver.receiveAutoAck()
(基于自动确认标志)从 Redis 流中读取记录。以下示例展示了如何使用 Java 配置来使用 Redis 流入站通道适配器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | 使用 ReactiveRedisConnectionFactory 和流键构造 ReactiveRedisStreamMessageProducer 的实例,以读取记录。 |
2 | 一个 StreamReceiver.StreamReceiverOptions ,用于使用响应式基础设施来使用 Redis 流。 |
3 | 一个SmartLifecycle 属性,用于指定此端点是否应该在应用程序上下文启动后自动启动。默认值为true 。如果为false ,则应手动启动RedisStreamMessageProducer messageProducer.start() 。 |
4 | 如果为false ,则不会自动确认收到的消息。消息的确认将被推迟到消费消息的客户端。默认值为true 。 |
5 | 如果为true ,则将创建一个消费者组。在创建消费者组时,也将创建流(如果尚不存在)。消费者组跟踪消息传递并区分消费者。默认值为false 。 |
6 | 设置消费者组名称。默认值为定义的 Bean 名称。 |
7 | 设置消费者名称。从组my-group 中以my-consumer 的身份读取消息。 |
8 | 要将来自此端点的消息发送到的消息通道。 |
9 | 定义读取消息的偏移量。默认值为ReadOffset.latest() 。 |
10 | 如果为true ,则通道适配器将从Record 中提取有效负载值。否则,将使用整个Record 作为有效负载。默认值为true 。 |
如果autoAck
设置为false
,则 Redis 流中的Record
不会被 Redis 驱动程序自动确认,而是将一个IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
头添加到要生产的消息中,其值为SimpleAcknowledgment
实例。目标集成流负责在基于此记录完成业务逻辑后调用其acknowledge()
回调。即使在反序列化期间发生异常并且配置了errorChannel
时,也需要类似的逻辑。因此,目标错误处理程序必须决定确认或拒绝此类失败的消息。除了IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
之外,ReactiveRedisStreamMessageProducer
还会将这些头填充到要生产的消息中:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
和RedisHeaders.CONSUMER
。
从版本 5.5 开始,您可以在ReactiveRedisStreamMessageProducer
上显式配置StreamReceiver.StreamReceiverOptionsBuilder
选项,包括新引入的onErrorResume
函数,如果 Redis 流消费者在发生反序列化错误时应继续轮询,则需要此函数。默认函数将消息发送到错误通道(如果提供),并可能对失败的消息进行确认,如上所述。所有这些StreamReceiver.StreamReceiverOptionsBuilder
与外部提供的StreamReceiver.StreamReceiverOptions
相互排斥。
Redis 锁注册器
Spring Integration 4.0 引入了 `RedisLockRegistry`。某些组件(例如,聚合器和重新排序器)使用从 `LockRegistry` 实例获取的锁来确保一次只有一个线程操作一个组。`DefaultLockRegistry` 在单个组件内执行此功能。您现在可以在这些组件上配置外部锁注册器。当您将其与共享的 `MessageGroupStore` 一起使用时,您可以使用 `RedisLockRegistry` 在多个应用程序实例之间提供此功能,以便一次只有一个实例可以操作该组。
当锁被本地线程释放时,另一个本地线程通常可以立即获取该锁。如果锁被使用不同注册器实例的线程释放,则获取该锁可能需要长达 100 毫秒。
为了避免“挂起”锁(当服务器发生故障时),此注册器中的锁在默认情况下会在 60 秒后过期,但您可以在注册器上配置此值。锁通常保持的时间要短得多。
由于键可能会过期,尝试解锁过期的锁会导致抛出异常。但是,受此类锁保护的资源可能已被破坏,因此应将此类异常视为严重异常。您应该将过期时间设置为足够大的值以防止这种情况,但将其设置为足够低的值,以便在服务器故障后在合理的时间内恢复锁。 |
从 5.0 版本开始,`RedisLockRegistry` 实现 `ExpirableLockRegistry`,它会删除超过 `age` 时间且当前未锁定的最后获取的锁。
从 5.5.6 版本开始,`RedisLockRegistry` 支持通过 `RedisLockRegistry.setCacheCapacity()` 自动清理 `RedisLockRegistry.locks` 中 redisLocks 的缓存。有关更多信息,请参阅其 JavaDocs。
从 5.5.13 版本开始,`RedisLockRegistry` 公开了一个 `setRedisLockType(RedisLockType)` 选项,用于确定以哪种模式获取 Redis 锁。
-
`RedisLockType.SPIN_LOCK` - 锁是通过周期性循环(100 毫秒)获取的,循环检查是否可以获取锁。默认值。
-
`RedisLockType.PUB_SUB_LOCK` - 锁是通过 redis 发布-订阅订阅获取的。
发布-订阅模式是首选模式 - 减少了客户端 Redis 服务器之间的网络通信,并且性能更高 - 当订阅通知到另一个进程解锁时,锁会立即获取。但是,Redis 在主/从连接中不支持发布-订阅模式(例如在 AWS ElastiCache 环境中),因此选择繁忙循环模式作为默认模式,以使注册表在任何环境中都能正常工作。