JDBC 通道消息存储 JSON 序列化
7.0 版本引入了对 JdbcChannelMessageStore 的 JSON 序列化支持。默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。新的 JSON 序列化选项提供了一种替代的序列化机制。
|
安全注意事项: JSON 序列化将消息内容以文本形式存储在数据库中,这可能会暴露敏感数据。在使用 JSON 序列化生产环境之前,请确保适当的数据库访问控制、静态加密,并考虑您组织的数据保护要求。 |
配置
JSON (反)序列化有两个可用组件
-
JsonChannelMessageStorePreparedStatementSetter- 将消息序列化为 JSON -
JsonMessageRowMapper- 从 JSON 反序列化消息
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
// Enable JSON serialization
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter());
store.setMessageRowMapper(
new JsonMessageRowMapper("com.example"));
return store;
}
字符串参数 ("com.example") 指定了用于反序列化的其他受信任包。这些包将添加到默认的受信任包中(请参阅受信任包部分)。出于安全考虑,只有受信任包中的类才能被反序列化。
数据库模式修改
|
JSON 序列化需要修改数据库模式。具有 |
MESSAGE_CONTENT 列必须更改为可以存储 JSON 的基于文本的类型。
-
PostgreSQL
-
MySQL
-
H2
-
其他数据库
对于 PostgreSQL,可以使用 JSONB 类型。
-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;
对于 MySQL,可以使用 JSON 类型。
-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;
对于 H2 数据库,可以使用 CLOB 类型。
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;
对于任何支持大型文本列(CLOB、TEXT 等)的数据库,MESSAGE_CONTENT 列可以修改为适当的文本类型。
JSON 序列化的示例模式
以下示例演示如何为基于 JSON 的消息存储创建专用表。
-
PostgreSQL
-
MySQL
-
H2
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
MESSAGE_CONTENT JSONB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
MESSAGE_CONTENT JSON, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
MESSAGE_CONTENT CLOB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
JSON 结构
使用基于 Jackson 的序列化时,消息使用 Jackson 的多态类型处理以以下 JSON 结构存储
{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.example.OrderMessage",
"orderId": "ORDER-12345",
"amount": 1299.99
},
"headers": {
"@class": "java.util.HashMap",
"priority": ["java.lang.String", "HIGH"],
"id": ["java.util.UUID", "a1b2c3d4-..."],
"timestamp": ["java.lang.Long", 1234567890]
}
}
@class 属性提供正确反序列化多态类型所需的类型信息。
查询 JSON 内容(可选)
如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),则可以直接查询消息内容。
PostgreSQL JSONB 查询
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';
MySQL JSON 函数
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';
|
如果使用 |
受信任的包
JacksonMessagingUtils.messagingAwareMapper() 根据受信任的包列表验证所有反序列化的类,以防止安全漏洞。
默认受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler
可以在构造函数中指定其他包并附加到此列表
// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")
自定义 JsonObjectMapper
对于高级场景,可以提供自定义的 JsonObjectMapper
import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
customMapper.enable(SerializationFeature.INDENT_OUTPUT);
customMapper.registerModule(new CustomModule());
JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
store.setMessageRowMapper(
new JsonMessageRowMapper(jsonObjectMapper));
return store;
}
|
自定义 |