元数据存储

许多外部系统、服务或资源是非事务性的(例如 Twitter、RSS、文件系统),并且无法将数据标记为已读。此外,有时您可能需要在一些集成解决方案中实现企业集成模式 幂等接收器。为了实现此目标并存储端点在与外部系统进行下一次交互之前的某些先前状态,或者处理下一条消息,Spring Integration 提供了元数据存储组件,作为 org.springframework.integration.metadata.MetadataStore 接口的实现,具有通用的键值契约。

元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个提要条目的发布日期),以帮助诸如提要适配器之类的组件处理重复项。如果组件没有直接提供对 MetadataStore 的引用,则查找元数据存储的算法如下:首先,在应用程序上下文中查找具有 metadataStore ID 的 bean。如果找到,则使用它。否则,创建一个新的 SimpleMetadataStore 实例,它是一个内存中的实现,仅在当前运行的应用程序上下文的生命周期内持久化元数据。这意味着,在重新启动后,您可能会遇到重复条目。

如果您需要在应用程序上下文重新启动之间持久化元数据,框架提供了以下持久性 MetadataStores

PropertiesPersistingMetadataStore 基于属性文件和 PropertiesPersister

默认情况下,它只在应用程序上下文正常关闭时持久化状态。它实现了 Flushable,因此您可以通过调用 flush() 来随意持久化状态。以下示例展示了如何使用 XML 配置 'PropertiesPersistingMetadataStore'。

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

或者,您可以提供自己的 MetadataStore 接口实现(例如,JdbcMetadataStore),并在应用程序上下文中将其配置为 bean。

从 4.0 版本开始,SimpleMetadataStorePropertiesPersistingMetadataStoreRedisMetadataStore 实现了 ConcurrentMetadataStore。这些提供了原子更新,可以在多个组件或应用程序实例之间使用。

幂等接收器和元数据存储

元数据存储对于实现 EIP 幂等接收器 模式很有用,当需要过滤已处理的传入消息时,您可以丢弃它或在丢弃时执行其他逻辑。以下配置展示了如何执行此操作的示例。

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

幂等条目的 value 可以是过期日期,在此日期之后,该条目应由一些计划的清理程序从元数据存储中删除。

MetadataStoreListener

一些元数据存储(目前只有 zookeeper)支持注册监听器以接收项目更改时的事件,如下例所示。

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

有关更多信息,请参阅 Javadoc。如果您只对部分事件感兴趣,可以对 MetadataStoreListenerAdapter 进行子类化。