Redis 支持

Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。这种支持以基于 Redis 的 MessageStore 以及通过 Redis 的 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令支持的发布/订阅消息适配器形式提供。

您需要在项目中包含此依赖

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.4.4"

您还需要包含 Redis 客户端依赖,例如 Lettuce。

要下载、安装和运行 Redis,请参阅 Redis 文档

连接到 Redis

要开始与 Redis 交互,首先需要连接到它。Spring Integration 使用另一个 Spring 项目提供的支持,即 Spring Data Redis,它提供了典型的 Spring 构造:ConnectionFactoryTemplate。这些抽象简化了与多种 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 JedisLettuce

使用 RedisConnectionFactory

要连接到 Redis,您可以使用 RedisConnectionFactory 接口的其中一个实现。以下清单显示了接口定义

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

以下示例展示了如何在 Java 中创建 LettuceConnectionFactory

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

以下示例展示了如何在 Spring XML 配置中创建 LettuceConnectionFactory

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

RedisConnectionFactory 的实现提供了一组属性,例如端口和主机,您可以根据需要进行设置。一旦拥有 RedisConnectionFactory 实例,就可以创建一个 RedisTemplate 实例,并通过 RedisConnectionFactory 注入它。

使用 RedisTemplate

与 Spring 中的其他模板类(例如 JdbcTemplateJmsTemplate)一样,RedisTemplate 是一个帮助类,简化了 Redis 数据访问代码。有关 RedisTemplate 及其变体(例如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档

以下示例展示了如何在 Java 中创建 RedisTemplate 实例

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

以下示例展示了如何在 Spring XML 配置中创建 RedisTemplate 实例

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 进行消息处理

引言中所述,Redis 通过其 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令提供了发布/订阅消息支持。与 JMS 和 AMQP 一样,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。

Redis 发布/订阅通道

与 JMS 类似,在某些情况下,生产者和消费者都旨在成为同一应用程序的一部分,并在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现这一点。然而,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。您可以创建一个发布/订阅通道,如下例所示

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

publish-subscribe-channel 的行为与主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/> 元素非常相似。它可以通过任何端点的 input-channeloutput-channel 属性来引用。不同之处在于,此通道由 Redis 主题名称支持:一个由 topic-name 属性指定的 String 值。然而,与 JMS 不同,此主题不必提前创建,甚至不必由 Redis 自动创建。在 Redis 中,主题是简单的 String 值,扮演地址的角色。生产者和消费者可以使用相同的主题名称 String 值进行通信。简单订阅此通道意味着在生产者和消费者端点之间可以进行异步发布/订阅消息传递。然而,与在简单的 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息会通过 Redis 传递,这使您可以依赖其对持久化和集群的支持以及与非 Java 平台的互操作性。

Redis 入站通道适配器

Redis 入站通道适配器 (RedisInboundChannelAdapter) 以与其他入站适配器相同的方式将传入的 Redis 消息转换为 Spring 消息。它接收特定于平台的消息(在本例中为 Redis),并使用 MessageConverter 策略将它们转换为 Spring 消息。以下示例展示了如何配置 Redis 入站通道适配器

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例展示了 Redis 入站通道适配器简单但完整的配置。请注意,前面的配置依赖于 Spring 熟悉的自动发现某些 bean 的范例。在本例中,redisConnectionFactory 会隐式注入到适配器中。您可以通过使用 connection-factory 属性来显式指定它。

另外请注意,前面的配置将适配器注入了自定义的 MessageConverter。这种方法类似于 JMS,其中使用 MessageConverter 实例在 Redis 消息和 Spring Integration 消息载荷之间进行转换。默认是一个 SimpleMessageConverter

入站适配器可以订阅多个主题名称,因此 topics 属性中的值是用逗号分隔的集合。

从 3.0 版本开始,入站适配器除了现有的 topics 属性外,现在还有 topic-patterns 属性。此属性包含一组用逗号分隔的 Redis 主题模式。有关 Redis 发布/订阅的更多信息,请参阅 Redis Pub/Sub

入站适配器可以使用 RedisSerializer 来反序列化 Redis 消息体。<int-redis:inbound-channel-adapter>serializer 属性可以设置为空字符串,这将导致 RedisSerializer 属性的值为 null。在这种情况下,Redis 消息的原始 byte[] 消息体将作为消息载荷提供。

从 5.0 版本开始,您可以通过使用 <int-redis:inbound-channel-adapter>task-executor 属性为入站适配器提供 Executor 实例。此外,接收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE 头部,以指示发布消息的来源:主题或模式。您可以在下游使用它进行路由逻辑。

Redis 出站通道适配器

Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息转换为 Redis 消息。它接收 Spring Integration 消息,并使用 MessageConverter 策略将它们转换为特定于平台的消息(在本例中为 Redis)。以下示例展示了如何配置 Redis 出站通道适配器

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

此配置与 Redis 入站通道适配器类似。适配器隐式注入了 RedisConnectionFactory,其 bean 名称定义为 redisConnectionFactory。此示例还包括可选(和自定义)的 MessageConverter(即 testConverter bean)。

从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter> 提供了一个替代 topic 属性的选项:您可以使用 topic-expression 属性在运行时确定消息的 Redis 主题。这些属性是互斥的。

Redis 队列入站通道适配器

Spring Integration 3.0 引入了队列入站通道适配器,用于从 Redis 列表“弹出”消息。默认情况下,它使用“右弹出”,但您可以配置它使用“左弹出”。该适配器是消息驱动的。它使用内部监听器线程,不使用轮询器。

以下列表显示了 queue-inbound-channel-adapter 所有可用的属性

<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
1 组件 bean 名称。如果您未提供 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并使用此 id 属性作为 bean 名称进行注册。在这种情况下,端点本身将使用 bean 名称 id 加上 .adapter 进行注册。(如果 bean 名称是 thing1,则端点将注册为 thing1.adapter。)
2 该端点将 Message 实例发送到的 MessageChannel
3 一个 SmartLifecycle 属性,用于指定该端点是否应在应用程序上下文启动后自动启动。默认为 true
4 一个 SmartLifecycle 属性,用于指定该端点启动的阶段。默认为 0
5 RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory
6 执行基于队列的 'pop' 操作以获取 Redis 消息的 Redis 列表的名称。
7 当从端点的监听任务接收到异常时,用于发送 ErrorMessage 实例的 MessageChannel。默认情况下,底层 MessagePublishingErrorHandler 使用应用程序上下文中的默认 errorChannel
8 RedisSerializer bean 引用。它可以是一个空字符串,表示“无序列化器”。在这种情况下,入站 Redis 消息中的原始 byte[] 将作为 Message 载荷发送到 channel。默认情况下,它是 JdkSerializationRedisSerializer
9 'pop' 操作等待从队列中获取 Redis 消息的超时时间(毫秒)。默认为 1 秒。
10 在 'pop' 操作发生异常后,监听任务在重启之前应休眠的时间(毫秒)。
11 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。默认值为 false
12 对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。它用于底层的监听任务。默认为 SimpleAsyncTaskExecutor
13 指定此端点应使用“右弹出”(当 true 时)还是“左弹出”(当 false 时)从 Redis 列表中读取消息。如果为 true,则与默认的 Redis 队列出站通道适配器一起使用时,Redis List 充当 FIFO 队列。将其设置为 false 以与使用“右推入”写入列表的软件一起使用或实现栈式消息顺序。默认值为 true。从版本 4.3 开始。
task-executor 必须配置多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听任务时,可能会发生死锁。可以使用 errorChannel 处理这些错误以避免重启,但最好不要让应用程序暴露在可能的死锁情况中。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis 队列出站通道适配器

Spring Integration 3.0 引入了队列出站通道适配器,用于将 Spring Integration 消息“推入”到 Redis 列表。默认情况下,它使用“左推入”,但您可以配置它使用“右推入”。以下列表显示了 Redis queue-outbound-channel-adapter 所有可用的属性

<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
1 组件 bean 名称。如果您未提供 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并使用此 id 属性作为 bean 名称进行注册。在这种情况下,端点将使用 bean 名称 id 加上 .adapter 进行注册。(如果 bean 名称是 thing1,则端点将注册为 thing1.adapter。)
2 该端点从中接收 Message 实例的 MessageChannel
3 RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory
4 执行基于队列的 'push' 操作以发送 Redis 消息的 Redis 列表的名称。此属性与 queue-expression 互斥。
5 一个 SpEL Expression,用于确定 Redis 列表的名称。它在运行时使用传入的 Message 作为 #root 变量。此属性与 queue 互斥。
6 一个 RedisSerializer bean 引用。默认为 JdkSerializationRedisSerializer。但是,对于 String 载荷,如果未提供 serializer 引用,则使用 StringRedisSerializer
7 指定此端点应将仅载荷还是完整的 Message 发送到 Redis 队列。默认值为 true
8 指定此端点应使用“左推入”(当 true 时)还是“右推入”(当 false 时)将消息写入 Redis 列表。如果为 true,则与默认的 Redis 队列入站通道适配器一起使用时,Redis 列表充当 FIFO 队列。将其设置为 false 以与使用“左弹出”从列表读取的软件一起使用或实现栈式消息顺序。默认值为 true。从版本 4.3 开始。

Redis 应用事件

从 Spring Integration 3.0 开始,Redis 模块提供了 IntegrationEvent 的实现,后者又是一个 org.springframework.context.ApplicationEventRedisExceptionEvent 封装了 Redis 操作中的异常(端点是事件的“来源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在捕获 BoundListOperations.rightPop 操作的异常后会发出这些事件。异常可以是任何通用的 org.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException。使用 <int-event:inbound-channel-adapter/> 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作非常有用。

Redis 消息存储

正如《企业集成模式》(EIP) 一书中所述,消息存储使您可以持久化消息。当处理具有消息缓冲能力(聚合器、重排序器等)的组件且关注可靠性时,这非常有用。在 Spring Integration 中,MessageStore 策略也为 存根(Claim Check)模式提供了基础,该模式也在 EIP 中有所描述。

Spring Integration 的 Redis 模块提供了 RedisMessageStore。以下示例展示了如何将其与聚合器一起使用

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

前面的示例是一个 bean 配置,它期望 RedisConnectionFactory 作为构造函数参数。

默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。然而,如果您想使用不同的序列化技术(例如 JSON),您可以通过设置 RedisMessageStorevalueSerializer 属性来提供您自己的序列化器。

从 4.3.10 版本开始,框架提供了 Message 实例和 MessageHeaders 实例的 Jackson 序列化器和反序列化器实现,分别是 MessageJacksonDeserializerMessageHeadersJacksonSerializer。它们需要使用 SimpleModule 选项配置 ObjectMapper。此外,您应该在 ObjectMapper 上设置 enableDefaultTyping,为每个序列化的复杂对象添加类型信息(如果您信任来源)。然后,该类型信息将在反序列化期间使用。框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper() 的实用方法,该方法已经提供了所有前面提到的属性和序列化器。该实用方法带有 trustedPackages 参数,用于限制 Java 包的反序列化范围,以避免安全漏洞。默认的受信任包包括:java.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,您必须按照类似于以下示例的方式进行配置

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以允许在同一 Redis 服务器上区分不同的存储实例。

Redis 通道消息存储

前面展示的 RedisMessageStore 将每个组维护为一个单独键(组 ID)下的值。虽然您可以使用它来支持 QueueChannel 进行持久化,但为此目的提供了一个专门的 RedisChannelMessageStore(从 4.0 版本开始)。此存储为每个通道使用一个 LIST,发送消息时使用 LPUSH,接收消息时使用 RPOP。默认情况下,此存储也使用 JDK 序列化,但您可以修改值序列化器,如前面所述

我们建议使用此存储来支持通道,而不是使用通用的 RedisMessageStore。以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用了它

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键采用以下形式:<storeBeanName>:<channelId>(在前面的示例中为 redisMessageStore:somePersistentQueueChannel)。

此外,还提供了一个子类 RedisChannelPriorityMessageStore。当您将其与 QueueChannel 一起使用时,消息会按照(FIFO)优先级顺序接收。它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 头部,并支持优先级值(0 - 9)。具有其他优先级(以及没有优先级)的消息在所有具有优先级的消息之后按 FIFO 顺序检索。

这些存储仅实现了 BasicMessageGroupStore,而未实现 MessageGroupStore。它们只能用于支持 QueueChannel 等情况。

Redis 元数据存储

Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore(参见 Metadata Store)实现。您可以使用 RedisMetadataStore 在应用程序重启后维护 MetadataStore 的状态。您可以将此新的 MetadataStore 实现与以下适配器一起使用:

要指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring Bean。Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用声明的 RedisMetadataStore。以下示例展示了如何声明这样的 Bean:

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

RedisMetadataStoreRedisProperties 提供支持。与之交互使用 BoundHashOperations,这反过来要求整个 Properties 存储需要一个 key。对于 MetadataStore,这个 key 扮演着区域(region)的角色,这在分布式环境中,当多个应用程序使用同一个 Redis 服务器时非常有用。默认情况下,这个 key 的值为 MetaData

从版本 4.0 开始,这个存储实现了 ConcurrentMetadataStore,使其能够在多个应用程序实例之间可靠地共享,其中只有一个实例被允许存储或修改键的值。

您不能在 Redis 集群中使用 RedisMetadataStore.replace()(例如,在 AbstractPersistentAcceptOnceFileListFilter 中),因为目前不支持用于原子性的 WATCH 命令。

Redis Store 入站通道适配器

Redis 存储入站通道适配器是一个轮询消费者(polling consumer),它从 Redis 集合读取数据并将其作为 Message 载荷发送。以下示例展示了如何配置 Redis 存储入站通道适配器:

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

前面的示例展示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:

  • keykey-expression:所使用的集合的键名。

  • collection-type:此适配器支持的集合类型的枚举。支持的集合类型有 LISTSETZSETPROPERTIESMAP

  • connection-factory:对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。

  • redis-template:对 o.s.data.redis.core.RedisTemplate 实例的引用。

  • 所有其他入站适配器通用的其他属性(例如 'channel')。

您不能同时设置 redis-templateconnection-factory

默认情况下,适配器使用 StringRedisTemplate。它对键、值、哈希键和哈希值使用 StringRedisSerializer 实例。如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供配置了适当序列化器的 RedisTemplate。例如,如果使用将其 extract-payload-elements 设置为 false 的 Redis 存储出站适配器写入存储,则必须按如下方式配置 RedisTemplate

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

RedisTemplate 对键和哈希键使用 String 序列化器,对值和哈希值使用默认的 JDK 序列化器。

由于 key 具有字面值,前面的示例相对简单和静态。有时,您可能需要在运行时根据某些条件更改键的值。为此,请改用 key-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,您可能希望对从 Redis 集合读取的成功处理的数据执行一些后处理。例如,您可能希望在处理后移动或删除该值。您可以使用 Spring Integration 2.2 添加的事务同步功能来执行此操作。以下示例使用 key-expression 和事务同步:

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

您可以使用 transactional 元素将轮询器声明为事务性的。此元素可以引用真实的事务管理器(例如,如果您的流程的某些其他部分调用 JDBC)。如果您没有“真实”事务,则可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring 的 PlatformTransactionManager 的实现,并在没有实际事务时启用 Redis 适配器的事务同步功能。

这并不会使 Redis 活动本身具有事务性。它允许在成功(提交)之前或之后,或者在失败(回滚)之后进行操作的同步。

一旦您的轮询器是事务性的,您可以在 transactional 元素上设置一个 o.s.i.transaction.TransactionSynchronizationFactory 实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 实例。为了方便起见,我们公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许您配置 SpEL 表达式,并将其执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。对于每个子元素,您可以指定 expressionchannel 属性。如果只存在 channel 属性,接收到的消息将作为特定同步场景的一部分发送到该通道。如果只存在 expression 属性并且表达式结果为非空值,则会生成一个以结果作为载荷的消息,并发送到默认通道(NullChannel)并在日志中显示(在 DEBUG 级别)。如果您希望评估结果发送到特定通道,请添加 channel 属性。如果表达式结果为 null 或 void,则不生成消息。

RedisStoreMessageSource 添加了一个 store 属性,其中包含绑定到事务 IntegrationResourceHolderRedisStore 实例,可以从 TransactionSynchronizationProcessor 实现中访问该实例。

有关事务同步的更多信息,请参阅 事务同步

RedisStore 出站通道适配器

RedisStore 出站通道适配器允许您将消息载荷写入 Redis 集合,如下面的示例所示:

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

前面的配置通过使用 store-inbound-channel-adapter 元素配置了 Redis 存储出站通道适配器。它提供了各种属性的值,例如:

  • keykey-expression:所使用的集合的键名。

  • extract-payload-elements:如果设置为 true(默认值)且载荷是“多值”对象(即 CollectionMap)的实例,则使用“addAll”和“putAll”语义存储。否则,如果设置为 false,无论载荷的类型如何,都将作为单个条目存储。如果载荷不是“多值”对象的实例,则忽略此属性的值,载荷始终作为单个条目存储。

  • collection-type:此适配器支持的 Collection 类型枚举。支持的集合类型有 LISTSETZSETPROPERTIESMAP

  • map-key-expression:SpEL 表达式,返回存储条目的键名。它仅在 collection-typeMAPPROPERTIES 且 'extract-payload-elements' 为 false 时应用。

  • connection-factory:对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。

  • redis-template:对 o.s.data.redis.core.RedisTemplate 实例的引用。

  • 所有其他入站适配器通用的其他属性(例如 'channel')。

您不能同时设置 redis-templateconnection-factory
默认情况下,适配器使用 StringRedisTemplate。它对键、值、哈希键和哈希值使用 StringRedisSerializer 实例。但是,如果 extract-payload-elements 设置为 false,则将使用一个 RedisTemplate,该 RedisTemplate 对键和哈希键使用 StringRedisSerializer 实例,对值和哈希值使用 JdkSerializationRedisSerializer 实例。使用 JDK 序列化器时,重要的是要理解,无论值是否实际上是集合,都对所有值使用 Java 序列化。如果您需要对值的序列化进行更多控制,请考虑提供自己的 RedisTemplate,而不是依赖这些默认设置。

由于 key 和其他属性具有字面值,前面的示例相对简单和静态。有时,您可能需要在运行时根据某些条件动态更改值。为此,请使用它们的 -expression 等价物(key-expressionmap-key-expression 等),其中提供的表达式可以是任何有效的 SpEL 表达式。

Redis 出站命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许您使用通用的 RedisConnection#execute 方法执行任何标准 Redis 命令。以下列表显示了 Redis 出站网关的可用属性:

<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
1 该端点从中接收 Message 实例的 MessageChannel
2 此端点发送回复 Message 实例的 MessageChannel
3 指定此出站网关是否必须返回非 null 值。默认为 true。当 Redis 返回 null 值时,会抛出 ReplyRequiredException
4 等待发送回复消息的超时时间(以毫秒为单位)。通常用于基于队列的有限回复通道。
5 RedisConnectionFactory Bean 的引用。默认为 redisConnectionFactory。它与 'redis-template' 属性互斥。
6 RedisTemplate Bean 的引用。它与 'connection-factory' 属性互斥。
7 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。如有必要,它用于将每个命令参数序列化为 byte[]。
8 返回命令键的 SpEL 表达式。默认为 redis_command 消息头。它的评估结果不得为 null
9 逗号分隔的 SpEL 表达式,评估后作为命令参数。与 arguments-strategy 属性互斥。如果两个属性都不提供,则使用 payload 作为命令参数。参数表达式可以评估为 'null' 以支持可变数量的参数。
10 一个 boolean 标志,用于指定当配置了 argument-expressions 时,评估后的 Redis 命令字符串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中的表达式评估上下文中作为 #cmd 变量可用。否则,此属性将被忽略。
11 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。它与 argument-expressions 属性互斥。如果两个属性都不提供,则使用 payload 作为命令参数。

您可以使用 <int-redis:outbound-gateway> 作为通用组件来执行任何所需的 Redis 操作。以下示例展示了如何从 Redis 原子数获取递增的值:

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

Message 载荷的名称应为 redisCounter,该名称可由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。

RedisConnection#execute 方法的返回类型是通用的 Object。实际结果取决于命令类型。例如,MGET 返回 List<byte[]>。有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范

Redis 队列出站网关

Spring Integration 引入了 Redis 队列出站网关,用于执行请求和回复场景。它将对话 UUID 推送到提供的 queue,将以该 UUID 为键的值推送到 Redis 列表,并等待来自键为 UUID 加上 .reply 的 Redis 列表的回复。每次交互使用不同的 UUID。以下列表显示了 Redis 出站网关的可用属性:

<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
1 该端点从中接收 Message 实例的 MessageChannel
2 此端点发送回复 Message 实例的 MessageChannel
3 指定此出站网关是否必须返回非 null 值。此值默认为 false。否则,当 Redis 返回 null 值时,会抛出 ReplyRequiredException
4 等待发送回复消息的超时时间(以毫秒为单位)。通常用于基于队列的有限回复通道。
5 RedisConnectionFactory Bean 的引用。默认为 redisConnectionFactory。它与 'redis-template' 属性互斥。
6 出站网关向其发送对话 UUID 的 Redis 列表的名称。
7 当注册多个网关时,此出站网关的顺序。
8 RedisSerializer Bean 引用。它可以是空字符串,表示“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 作为 Message 载荷发送到 channel。默认情况下,它是 JdkSerializationRedisSerializer
9 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。

Redis 队列入站网关

Spring Integration 4.1 引入了 Redis 队列入站网关,用于执行请求和回复场景。它从提供的 queue 中弹出一个对话 UUID,从 Redis 列表中弹出以该 UUID 为键的值,并将回复推送到键为 UUID 加上 .reply 的 Redis 列表。以下列表显示了 Redis 队列入站网关的可用属性:

<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
1 此端点向其发送从 Redis 数据创建的 Message 实例的 MessageChannel
2 此端点等待回复 Message 实例的 MessageChannel。可选 - replyChannel 消息头仍然在使用。
3 对 Spring TaskExecutor(或标准 JDK Executor)Bean 的引用。它用于底层的监听任务。默认为 SimpleAsyncTaskExecutor
4 等待发送回复消息的超时时间(以毫秒为单位)。通常用于基于队列的有限回复通道。
5 RedisConnectionFactory Bean 的引用。默认为 redisConnectionFactory。它与 'redis-template' 属性互斥。
6 用于对话 UUID 的 Redis 列表的名称。
7 当注册多个网关时,此入站网关的顺序。
8 RedisSerializer Bean 引用。它可以是空字符串,表示“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 作为 Message 载荷发送到 channel。默认为 JdkSerializationRedisSerializer。(注意,在版本 4.3 之前的版本中,它默认为 StringRedisSerializer。要恢复该行为,请提供对 StringRedisSerializer 的引用。)
9 等待接收消息被获取的超时时间(以毫秒为单位)。通常用于基于队列的有限请求通道。
10 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。
11 监听任务在“right pop”操作出现异常后,在重新启动监听任务之前应该睡眠的时间(以毫秒为单位)。
task-executor 必须配置多个线程进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听任务时,可能会发生死锁。可以使用 errorChannel 处理这些错误以避免重启,但最好不要让应用程序暴露在可能的死锁情况中。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis Stream 出站通道适配器

Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息载荷写入 Redis Stream。出站通道适配器使用 ReactiveStreamOperations.add(…​)Record 添加到流中。以下示例展示了如何为 Redis Stream 出站通道适配器使用 Java 配置和服务类。

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
1 使用 ReactiveRedisConnectionFactory 和流名称构造 ReactiveRedisStreamMessageHandler 实例以添加记录。另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。
2 设置一个 RedisSerializationContext,用于在将记录键和值添加到流之前对其进行序列化。
3 设置 HashMapper,它提供 Java 类型与 Redis 哈希/映射之间的契约。
4 如果为 'true',通道适配器将从请求消息中提取载荷作为要添加的流记录。否则,将整个消息用作值。默认为 true

Redis Stream 入站通道适配器

Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck() 从 Redis Stream 读取记录。以下示例展示了如何为 Redis Stream 入站通道适配器使用 Java 配置。

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.extractPayload(true); (10)
    return messageProducer;
}
1 使用 ReactiveRedisConnectionFactory 和流键构造 ReactiveRedisStreamMessageProducer 实例以读取记录。
2 一个 StreamReceiver.StreamReceiverOptions,用于使用反应式基础设施消费 Redis 流。
3 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。默认为 true。如果为 false,则应手动启动 RedisStreamMessageProducermessageProducer.start()
4 如果为 false,则接收到的消息不会自动确认。消息的确认将延迟到消费消息的客户端。默认为 true
5 如果为 true,将创建一个消费者组。在创建消费者组期间,如果流尚不存在,也会创建流。消费者组跟踪消息传递并区分消费者。默认为 false
6 设置消费者组名称。默认为定义的 Bean 名称。
7 设置消费者名称。从组 my-groupmy-consumer 读取消息。
8 此端点将消息发送到的消息通道。
9 定义读取消息的偏移量。默认为 ReadOffset.latest()
10 如果为 'true',通道适配器将从 Record 中提取载荷值。否则,将整个 Record 用作载荷。默认为 true

如果 autoAck 设置为 false,Redis Stream 中的 Record 不会被 Redis 驱动程序自动确认,而是在要生成的消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 消息头,其值为 SimpleAcknowledgment 实例。当基于该记录的消息的业务逻辑完成后,目标集成流负责调用其 acknowledge() 回调。即使在反序列化期间发生异常并且配置了 errorChannel 时,也需要类似的逻辑。因此,目标错误处理器必须决定是 ack 还是 nack 该失败消息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKReactiveRedisStreamMessageProducer 还会将这些消息头填充到要生成的消息中:RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMER

从版本 5.5 开始,您可以在 ReactiveRedisStreamMessageProducer 上显式配置 StreamReceiver.StreamReceiverOptionsBuilder 选项,包括新引入的 onErrorResume 函数,如果在反序列化错误发生时 Redis Stream 消费者应继续轮询,则需要此函数。默认函数将消息发送到错误通道(如果提供),并可能对失败的消息进行确认,如上所述。所有这些 StreamReceiver.StreamReceiverOptionsBuilder 都与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。

Redis 锁注册表

Spring Integration 4.0 引入了 RedisLockRegistry。某些组件(例如,聚合器和重排序器)使用从 LockRegistry 实例获取的锁,以确保一次只有一个线程操作一个组。DefaultLockRegistry 在单个组件内执行此功能。您现在可以在这些组件上配置外部锁注册表。当您将其与共享 MessageGroupStore 一起使用时,可以使用 RedisLockRegistry 在多个应用程序实例之间提供此功能,这样一次只有一个实例可以操作该组。

当本地线程释放锁时,另一个本地线程通常可以立即获取锁。如果使用不同注册表实例的线程释放锁,则可能需要长达 100ms 才能获取锁。

为避免“挂起”的锁(当服务器故障时),此注册表中的锁在默认 60 秒后过期,但您可以在注册表上配置此值。锁通常只持有更短的时间。

由于键可能会过期,尝试解锁已过期的锁会导致抛出异常。但是,受此类锁保护的资源可能已遭到破坏,因此应将此类异常视为严重情况。您应将过期时间设置得足够大以防止这种情况发生,但也要设置得足够低,以便在服务器故障后能在合理的时间内恢复锁。

从版本 5.0 开始,RedisLockRegistry 实现了 ExpirableLockRegistry,它会移除距离上次获取时间超过 age 且当前未被锁定的锁。

从版本 5.5.6 开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中 redis 锁的缓存。有关更多信息,请参阅其 JavaDocs。

从版本 5.5.13 开始,RedisLockRegistry 暴露了一个 setRedisLockType(RedisLockType) 选项,用于确定应以哪种模式进行 Redis 锁获取:

  • RedisLockType.SPIN_LOCK - 通过周期性循环(100ms)检查是否可以获取锁来获取锁。默认值。

  • RedisLockType.PUB_SUB_LOCK - 通过 Redis 发布/订阅(pub-sub)订阅来获取锁。

发布/订阅是首选模式 - 客户端与 Redis 服务器之间的网络通信更少,性能更高 - 当订阅在其他进程中收到解锁通知时,锁会立即获取。但是,Redis 在主/副本连接(例如在 AWS ElastiCache 环境中)中不支持发布/订阅,因此默认选择忙等待模式,以使注册表在任何环境中都能工作。

从版本 6.4 开始,RedisLockRegistry.RedisLock.unlock() 方法不再抛出 IllegalStateException,而是抛出 ConcurrentModificationException,如果锁的所有权已过期。

从版本 6.4 开始,添加了 RedisLockRegistry.setRenewalTaskScheduler() 以配置用于定期续订锁的调度器。设置后,锁成功获取后,将每隔过期时间的 1/3 自动续订,直到解锁或 Redis 键被移除。