JDBC 通道消息存储 JSON 序列化

7.0 版本引入了对 JdbcChannelMessageStore 的 JSON 序列化支持。默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。新的 JSON 序列化选项提供了一种替代的序列化机制。

安全注意事项: JSON 序列化将消息内容以文本形式存储在数据库中,这可能会暴露敏感数据。在使用 JSON 序列化生产环境之前,请确保适当的数据库访问控制、静态加密,并考虑您组织的数据保护要求。

配置

JSON (反)序列化有两个可用组件

  • JsonChannelMessageStorePreparedStatementSetter - 将消息序列化为 JSON

  • JsonMessageRowMapper - 从 JSON 反序列化消息

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());

    // Enable JSON serialization
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter());
    store.setMessageRowMapper(
        new JsonMessageRowMapper("com.example"));

    return store;
}

字符串参数 ("com.example") 指定了用于反序列化的其他受信任包。这些包将添加到默认的受信任包中(请参阅受信任包部分)。出于安全考虑,只有受信任包中的类才能被反序列化。

数据库模式修改

JSON 序列化需要修改数据库模式。具有 BLOB/BYTEA 列类型的默认模式不能用于 JSON 序列化。

MESSAGE_CONTENT 列必须更改为可以存储 JSON 的基于文本的类型。

  • PostgreSQL

  • MySQL

  • H2

  • 其他数据库

对于 PostgreSQL,可以使用 JSONB 类型。

-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;

对于 MySQL,可以使用 JSON 类型。

-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;

对于 H2 数据库,可以使用 CLOB 类型。

ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;

对于任何支持大型文本列(CLOB、TEXT 等)的数据库,MESSAGE_CONTENT 列可以修改为适当的文本类型。

JSON 序列化的示例模式

以下示例演示如何为基于 JSON 的消息存储创建专用表。

  • PostgreSQL

  • MySQL

  • H2

CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
   MESSAGE_CONTENT JSONB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
   MESSAGE_CONTENT JSON, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
   MESSAGE_CONTENT CLOB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);

JSON 结构

使用基于 Jackson 的序列化时,消息使用 Jackson 的多态类型处理以以下 JSON 结构存储

{
  "@class": "org.springframework.messaging.support.GenericMessage",
  "payload": {
    "@class": "com.example.OrderMessage",
    "orderId": "ORDER-12345",
    "amount": 1299.99
  },
  "headers": {
    "@class": "java.util.HashMap",
    "priority": ["java.lang.String", "HIGH"],
    "id": ["java.util.UUID", "a1b2c3d4-..."],
    "timestamp": ["java.lang.Long", 1234567890]
  }
}

@class 属性提供正确反序列化多态类型所需的类型信息。

查询 JSON 内容(可选)

如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),则可以直接查询消息内容。

PostgreSQL JSONB 查询

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';

MySQL JSON 函数

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';

如果使用 TEXTCLOB 列类型,则这些 JSON 特定的查询不可用。但是,JSON 序列化仍然可以通过 Spring Integration 进行存储和检索。

受信任的包

JacksonMessagingUtils.messagingAwareMapper() 根据受信任的包列表验证所有反序列化的类,以防止安全漏洞。

默认受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler

可以在构造函数中指定其他包并附加到此列表

// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")

自定义 JsonObjectMapper

对于高级场景,可以提供自定义的 JsonObjectMapper

import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
    customMapper.enable(SerializationFeature.INDENT_OUTPUT);
    customMapper.registerModule(new CustomModule());

    JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);

    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
    store.setMessageRowMapper(
        new JsonMessageRowMapper(jsonObjectMapper));

    return store;
}

自定义 JsonObjectMapper 应该针对 Spring Integration 消息序列化进行适当配置。建议从 JacksonMessagingUtils.messagingAwareMapper() 开始并在此基础上进行定制。在 JsonChannelMessageStorePreparedStatementSetterJsonMessageRowMapper 中必须使用相同的配置以实现一致的序列化和反序列化。

© . This site is unofficial and not affiliated with VMware.