使用应用事件
为了让应用模块之间尽可能地解耦,它们主要的交互方式应该是事件的发布和消费。这避免了发起模块了解所有可能感兴趣的各方,这是实现应用模块集成测试(参见 应用模块集成测试)的关键方面。
我们通常会发现应用组件的定义如下
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final InventoryManagement inventory;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
// Invoke related functionality
inventory.updateStockFor(order);
}
}
@Service
class OrderManagement(val inventory: InventoryManagement) {
@Transactional
fun complete(order: Order) {
inventory.updateStockFor(order)
}
}
complete(…)
方法会在某种程度上产生功能引力,它会吸引相关的功能,从而与在其他应用模块中定义的 Spring bean 进行交互。这尤其使得该组件更难测试,因为仅仅创建 OrderManagement
的实例就需要那些依赖的 bean 的可用实例(参见 处理外向依赖)。这也意味着无论何时我们希望与业务事件的订单完成进一步集成功能,都必须修改该类。
我们可以按如下方式改变应用模块的交互
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final ApplicationEventPublisher events;
private final OrderInternal dependency;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
events.publishEvent(new OrderCompleted(order.getId()));
}
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {
@Transactional
fun complete(order: Order) {
events.publishEvent(OrderCompleted(order.id))
}
}
注意,我们没有依赖其他应用模块的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher
在主聚合上完成状态转换后发布一个领域事件。有关更面向聚合的事件发布方法,请参阅 Spring Data 的应用事件发布机制了解详情。由于事件发布默认是同步发生的,因此整体安排的事务语义与上述示例相同。这既有好处,因为我们获得了一个非常简单的最终一致性模型(订单的状态更改和库存更新要么都成功,要么都不成功),但也有坏处,因为更多触发的相关功能将扩大事务边界,并可能导致整个事务失败,即使导致错误的功能并不关键。
另一种方法是将事件消费转移到事务提交时的异步处理,并将次要功能正是如此对待
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
这有效地将原始事务与监听器的执行解耦。虽然这避免了原始业务事务的扩展,但也带来了一个风险:如果监听器因任何原因失败,事件发布将丢失,除非每个监听器都实现了自己的安全网。更糟糕的是,这甚至不能完全奏效,因为系统可能在方法被调用之前就失败了。
应用模块监听器
要在事务中运行事务性事件监听器,它需要相应地用 @Transactional
进行注解。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
为了简化通过事件集成模块的默认方式的声明,Spring Modulith 提供了 @ApplicationModuleListener
作为快捷方式。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@ApplicationModuleListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@ApplicationModuleListener
fun on(event: OrderCompleted) { /* … */ }
}
事件发布注册表
Spring Modulith 附带了一个事件发布注册表,它与 Spring Framework 的核心事件发布机制挂钩。在事件发布时,它会发现哪些事务性事件监听器将接收到事件,并在原始业务事务中为每个监听器(深蓝色)写入一个条目到事件发布日志中。

每个事务性事件监听器都被包装在一个切面中,如果监听器执行成功,该切面将该日志条目标记为已完成。如果监听器失败,日志条目保持不变,以便可以根据应用的需要部署重试机制。可以通过 spring.modulith.events.republish-outstanding-events-on-restart
属性启用事件的自动重新发布。

Spring Boot 事件注册表 Starter
使用事务性事件发布日志需要向您的应用添加一组构件。为了简化这项任务,Spring Modulith 提供了围绕要使用的持久化技术而设计的 starter POM,并默认使用基于 Jackson 的 EventSerializer
实现。以下 starter 可用
持久化技术 | 构件 | 描述 |
---|---|---|
JPA |
|
使用 JPA 作为持久化技术。 |
JDBC |
|
使用 JDBC 作为持久化技术。也可用于基于 JPA 的应用,但绕过您的 JPA 提供程序进行实际的事件持久化。 |
MongoDB |
|
使用 MongoDB 作为持久化技术。也启用 MongoDB 事务,并需要服务器配置为副本集才能进行交互。通过将 |
Neo4j |
|
在 Spring Data Neo4j 后面使用 Neo4j。 |
管理事件发布
事件发布可能需要在应用运行时以多种方式进行管理。未完成的发布可能需要在一定时间后重新提交给相应的监听器。另一方面,已完成的发布很可能需要从数据库中清除或移动到归档存储中。由于这类清理工作的需求因应用而异,Spring Modulith 提供了一个 API 来处理这两种类型的发布。该 API 通过您可以添加到应用的 spring-modulith-events-api
构件提供
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>1.3.5</version>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-events-api:1.3.5'
}
此构件包含两个主要抽象,它们作为 Spring Bean 可供应用代码使用
-
CompletedEventPublications
— 该接口允许访问所有已完成的事件发布,并提供一个 API,用于立即将它们全部从数据库中清除,或清除早于指定时长(例如,1 分钟)的已完成发布。 -
IncompleteEventPublications
— 该接口允许访问所有未完成的事件发布,以便重新提交符合指定谓词或早于相对于原始发布日期的指定Duration
的发布。
事件发布完成
当事务性或 @ApplicationModuleListener
执行成功完成后,事件发布会被标记为已完成。默认情况下,完成通过在 EventPublication
上设置完成日期来注册。这意味着已完成的发布将保留在事件发布注册表中,以便可以通过上面描述的 CompletedEventPublications
接口进行检查。这样做的一个后果是,您需要编写一些代码来定期清除旧的、已完成的 EventPublication
。否则,它们的持久化抽象(例如关系数据库表)将无限增长,并且与存储进行交互创建和完成新的 EventPublication
可能会变慢。
Spring Modulith 1.3 引入了一个配置属性 spring.modulith.events.completion-mode
来支持另外两种完成模式。它默认为 UPDATE
,由上面描述的策略支持。或者,可以将完成模式设置为 DELETE
,这将更改注册表的持久化机制,改为在完成时删除 EventPublication
。这意味着 CompletedEventPublications
将不再返回任何发布,但同时,您也不必再手动从持久化存储中清除已完成的事件。
第三种选项是 ARCHIVE
模式,它将条目复制到归档表、集合或节点中。对于该归档条目,将设置完成日期并移除原始条目。与 DELETE
模式相反,已完成的事件发布仍然可以通过 CompletedEventPublications
抽象进行访问。
事件发布仓库
为了实际写入事件发布日志,Spring Modulith 暴露了一个 EventPublicationRepository
SPI 以及支持事务的流行持久化技术的实现,如 JPA、JDBC 和 MongoDB。通过将相应的 JAR 添加到您的 Spring Modulith 应用来选择要使用的持久化技术。我们已准备好专用的 starter 来简化该任务。
基于 JDBC 的实现可以在设置相应的配置属性 (spring.modulith.events.jdbc.schema-initialization.enabled
) 为 true
时,为事件发布日志创建一个专用表。详情请查阅附录中的 Schema 概览。
外部化事件
应用模块之间交换的一些事件可能对外部系统感兴趣。Spring Modulith 允许将选定的事件发布到各种消息代理。要使用该支持,您需要执行以下步骤
-
将特定于代理的 Spring Modulith 构件添加到您的项目中。
-
通过使用 Spring Modulith 或 jMolecules 的
@Externalized
注解来标记要外部化的事件类型。 -
在注解的值中指定特定于代理的路由目标。
要了解如何使用其他方式选择要外部化的事件,或自定义它们在代理内的路由,请查阅 事件外部化基础知识。
支持的基础设施
代理 | 构件 | 描述 |
---|---|---|
Kafka |
|
使用 Spring Kafka 与代理交互。逻辑路由键将用作 Kafka 的 Topic 和 Message Key。 |
AMQP |
|
使用 Spring AMQP 与任何兼容代理交互。例如,需要显式声明对 Spring Rabbit 的依赖。逻辑路由键将用作 AMQP 路由键。 |
JMS |
|
使用 Spring 的核心 JMS 支持。不支持路由键。 |
SQS |
|
已弃用(详情参见 此处)。使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,要求 SQS 队列配置为 FIFO 队列。 |
SNS |
|
已弃用(详情参见 此处)。使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,要求 SNS 配置为启用基于内容的去重功能的 FIFO Topic。 |
Spring Messaging |
|
使用 Spring 的核心 |
事件外部化基础知识
事件外部化对发布的每个应用事件执行三个步骤。
-
确定事件是否应该被外部化——我们称之为“事件选择”。默认情况下,只有位于 Spring Boot 自动配置包内并用支持的
@Externalized
注解之一标记的事件类型才会被选择进行外部化。 -
准备消息(可选)——默认情况下,事件由相应的代理基础设施按原样序列化。可选的映射步骤允许开发者自定义或甚至完全替换原始事件,使用适合外部方的 Payload。对于 Kafka 和 AMQP,开发者还可以向要发布的消息添加 Header。
-
确定路由目标——消息代理客户端需要一个逻辑目标来发布消息。目标通常标识物理基础设施(Topic、Exchange 或 Queue,取决于代理),并且通常静态地从事件类型派生。除非在
@Externalized
注解中明确定义,Spring Modulith 使用应用本地类型名称作为目标。换句话说,在一个基础包为com.acme.app
的 Spring Boot 应用中,事件类型com.acme.app.sample.SampleEvent
将发布到sample.SampleEvent
。一些代理还允许定义一个更具动态性的路由键,它用于实际目标内的不同目的。默认情况下,不使用路由键。
基于注解的事件外部化配置
要通过 @Externalized
注解定义自定义路由键,可以在各个注解中的 target/value 属性使用 $target::$key
模式。target 和 key 都可以是一个 SpEL 表达式,事件实例将被配置为根对象。
-
Java
-
Kotlin
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
String getLastname() { (1)
// …
}
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
fun getLastname(): String { (1)
// …
}
}
CustomerCreated
事件通过 accessor 方法暴露客户的姓氏。然后该方法通过目标声明的 ::
分隔符后的 key 表达式 #this.getLastname()
使用。
如果键的计算变得更复杂,建议将其委托给一个接受事件作为参数的 Spring bean。
-
Java
-
Kotlin
@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")
编程式事件外部化配置
spring-modulith-events-api
构件包含 EventExternalizationConfiguration
,允许开发者自定义上述所有步骤。
-
Java
-
Kotlin
@Configuration
class ExternalizationConfiguration {
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent.class, event -> …) (3)
.headers(event -> …) (4)
.routeKey(WithKeyProperty.class, WithKeyProperty::getKey) (5)
.build();
}
}
@Configuration
class ExternalizationConfiguration {
@Bean
fun eventExternalizationConfiguration(): EventExternalizationConfiguration {
EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent::class.java) { event -> … } (3)
.headers() { event -> … } (4)
.routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey) (5)
.build()
}
}
1 | 我们首先创建一个默认的 EventExternalizationConfiguration 实例。 |
2 | 我们通过调用上一步返回的 Selector 实例上的一个或多个 select(…) 方法来定制事件选择。这一步基本上禁用了应用基础包过滤器,因为我们现在只查找注解。存在按类型、按包、按包和注解轻松选择事件的便捷方法。此外,还有一步定义选择和路由的快捷方式。 |
3 | 我们为 SomeEvent 实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您额外调用 router 上的 ….routeMapped() 方法。 |
4 | 我们为要发送的消息添加自定义 Header,可以像所示的那样通用添加,也可以针对特定 Payload 类型进行添加。 |
5 | 我们最终通过定义一个方法句柄来确定路由键,以提取事件实例的一个值。另外,可以通过使用上一步调用返回的 Router 实例上的通用 route(…) 方法为单个事件生成完整的 RoutingKey 。 |
测试已发布的事件
以下部分描述了一种仅关注跟踪 Spring 应用事件的测试方法。对于使用 @ApplicationModuleListener 的模块测试的更全面的方法,请查看 Scenario API 。 |
Spring Modulith 的 @ApplicationModuleTest
使能够将 PublishedEvents
实例注入到测试方法中,以验证在被测试的业务操作过程中是否发布了特定的一组事件。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(PublishedEvents events) {
// …
var matchingMapped = events.ofType(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
assertThat(matchingMapped).hasSize(1);
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: PublishedEvents events) {
// …
val matchingMapped = events.ofType(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
assertThat(matchingMapped).hasSize(1)
}
}
注意 PublishedEvents
如何公开一个 API 来选择匹配特定标准的事件。验证是通过 AssertJ 断言来完成的,该断言验证预期的元素数量。如果您本来就使用 AssertJ 进行这些断言,您也可以使用 AssertablePublishedEvents
作为测试方法参数类型,并使用通过它提供的流式断言 API。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(AssertablePublishedEvents events) {
// …
assertThat(events)
.contains(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: AssertablePublishedEvents) {
// …
assertThat(events)
.contains(OrderCompleted::class.java)
.matching(OrderCompleted::getOrderId, reference.getId())
}
}
注意 assertThat(…)
表达式返回的类型如何允许直接对已发布的事件定义约束。