JDBC 消息存储
Spring Integration 提供了两种 JDBC 特定的消息存储实现。JdbcMessageStore 适用于聚合器和认领单模式。JdbcChannelMessageStore 实现则为消息通道提供了更具针对性和可扩展性的实现。
请注意,你可以使用 JdbcMessageStore 来支持消息通道,但 JdbcChannelMessageStore 是为此目的而优化的。
从 5.0.11、5.1.2 版本开始,JdbcChannelMessageStore 的索引已得到优化。如果你的存储中有大型消息组,你可能希望修改索引。此外,PriorityChannel 的索引已被注释掉,因为除非你使用由 JDBC 支持的此类通道,否则不需要该索引。 |
当使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引,因为它包含在查询的提示中。 |
初始化数据库
在使用 JDBC 消息存储组件之前,你应该为目标数据库配置适当的对象。
Spring Integration 附带了一些可用于初始化数据库的示例脚本。在 spring-integration-jdbc JAR 文件中,你可以在 org.springframework.integration.jdbc 包中找到脚本。它为一系列常见数据库平台提供了创建和删除脚本示例。使用这些脚本的一种常见方法是在Spring JDBC 数据源初始化器中引用它们。请注意,这些脚本作为示例和所需表及列名的规范提供。你可能需要对其进行增强以用于生产环境(例如,添加索引声明)。
从版本 6.2 开始,JdbcMessageStore、JdbcChannelMessageStore、JdbcMetadataStore 和 DefaultLockRepository 实现了 SmartLifecycle,并在 start() 方法中对其各自的表执行 SELECT COUNT 查询,以确保目标数据库中存在所需的表(根据提供的表前缀)。如果所需的表不存在,应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false) 禁用此检查。
通用 JDBC 消息存储
JDBC 模块提供了一个由数据库支持的 Spring Integration MessageStore(在认领单模式中很重要)和 MessageGroupStore(在聚合器等有状态模式中很重要)的实现。JdbcMessageStore 实现了这两个接口,并且支持在 XML 中配置存储实例,示例如下
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
你可以指定一个 JdbcTemplate 而不是 DataSource。
以下示例显示了一些其他可选属性
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
在前面的示例中,我们为存储生成的查询中的表名指定了一个前缀。表名默认前缀为 INT_。
支持消息通道
如果你打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore 实现。它仅与消息通道一起使用。
支持的数据库
JdbcChannelMessageStore 使用数据库特定的 SQL 查询从数据库检索消息。因此,你必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。此 channelMessageStoreQueryProvider 为你指定的特定数据库提供 SQL 查询。Spring Integration 支持以下关系数据库
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果你的数据库未列出,你可以实现 ChannelMessageStoreQueryProvider 接口并提供你自己的自定义查询。
版本 4.0 在表中添加了 MESSAGE_SEQUENCE 列,以确保即使在同一毫秒内存储消息时也能实现先进先出 (FIFO) 队列。
从版本 6.2 开始,ChannelMessageStoreQueryProvider 公开了 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,并且其轮询查询现在基于单个 DELETE…RETURNING 语句。如果仅支持单个轮询语句,JdbcChannelMessageStore 会参考 isSingleStatementForPoll 选项并跳过单独的 DELETE 语句。
自定义消息插入
从版本 5.0 开始,通过重载 ChannelMessageStorePreparedStatementSetter 类,你可以在 JdbcChannelMessageStore 中提供自定义的消息插入实现。你可以使用它来设置不同的列,更改表结构或序列化策略。例如,你可以将其结构存储为 JSON 字符串,而不是默认序列化为 byte[]。
以下示例使用 setValues 的默认实现来存储常用列,并覆盖行为以将消息载荷存储为 varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
|
通常,我们不建议使用关系数据库作为队列。相反,如果可能,请考虑使用 JMS 或 AMQP 支持的通道。更多参考,请参阅以下资源 如果你仍然计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续章节中介绍。 |
并发轮询
在轮询消息通道时,你可以选择使用 TaskExecutor 引用配置关联的 Poller。
|
但请记住,如果你使用由 JDBC 支持的消息通道,并且计划使用多线程以事务方式轮询通道和消息存储,你应该确保使用支持多版本并发控制 (MVCC) 的关系数据库。否则,锁定可能是一个问题,并且在使用多线程时的性能可能无法达到预期。例如,Apache Derby 在这方面存在问题。 为了获得更好的 JDBC 队列吞吐量,并避免在不同线程可能从队列轮询同一
|
优先级通道
从版本 4.0 开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 并提供了 priorityEnabled 选项,使其可以用作 priority-queue 实例的 message-store 引用。为此,INT_CHANNEL_MESSAGE 表包含一个 MESSAGE_PRIORITY 列来存储 PRIORITY 消息头的值。此外,一个新的 MESSAGE_SEQUENCE 列使我们能够实现健壮的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储具有相同优先级的多个消息时也是如此。消息从数据库中按 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 顺序轮询(选择)。
我们不建议将同一个 JdbcChannelMessageStore bean 用于优先级和非优先级队列通道,因为 priorityEnabled 选项应用于整个存储,并且无法为队列通道保留正确的 FIFO 队列语义。但是,可以使用同一个 INT_CHANNEL_MESSAGE 表(甚至 region)来支持两种 JdbcChannelMessageStore 类型。要配置这种情况,你可以像以下示例所示,从一个消息存储 bean 扩展另一个。 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分区消息存储
通常,将 JdbcMessageStore 用作一组应用程序或同一应用程序中节点的全局存储。为了提供一些防止命名冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是通过更改前缀来使用单独的表名(如前所述)。另一种方法是指定 region 名称来在单个表中对数据进行分区。第二种方法的一个重要用例是当 MessageStore 管理支持 Spring Integration 消息通道的持久化队列时。持久化通道的消息数据在存储中以通道名称为键。因此,如果通道名称不是全局唯一的,通道可能会获取并非为其准备的数据。为了避免这种危险,你可以使用消息存储 region 来为具有相同逻辑名称的不同物理通道分开数据。
PostgreSQL:接收推送通知
PostgreSQL 提供了一个监听和通知框架,用于在数据库表操作时接收推送通知。Spring Integration 利用此机制(从版本 6.0 开始)来允许在向 JdbcChannelMessageStore 添加新消息时接收推送通知。使用此功能时,必须定义一个数据库触发器,该触发器可以在 spring-integration-jdbc 模块中包含的 schema-postgresql.sql 文件的注释中找到。
推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在任何给定的 region 和 groupId 有新消息到达时接收回调。即使消息是在不同的 JVM 上追加到同一个数据库中,也能收到这些通知。PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约来从存储中拉取消息,作为对上述 PostgresChannelMessageTableSubscriber 通知的回应。
例如,可以按如下方式接收某些组的推送通知
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从版本 6.0.5 开始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 将在事务中通知订阅者。订阅者中的异常将导致事务回滚并将消息放回消息存储中。事务支持默认不启用。
重试
从版本 6.0.5 开始,通过向 PostgresSubscribableChannel 提供 RetryTemplate,可以指定重试策略。默认情况下,不执行重试。
|
任何活动的 对于独占连接的这种需求,也建议一个 JVM 只运行一个 |