批处理和事务
无需重试的简单批处理
考虑以下无需重试的嵌套批处理的简单示例。它展示了批处理的常见场景:处理输入源直到耗尽,并在处理“块”结束时定期提交。
1 | REPEAT(until=exhausted) { | 2 | TX { 3 | REPEAT(size=5) { 3.1 | input; 3.2 | output; | } | } | | }
输入操作 (3.1) 可以是基于消息的接收(例如来自 JMS)或基于文件的读取,但为了恢复并继续处理以有机会完成整个作业,它必须是事务性的。3.2 的操作也同样适用。它必须是事务性的或幂等的。
如果由于 3.2 处的数据库异常导致 `REPEAT` (3) 处的块失败,则 `TX` (2) 必须回滚整个块。
简单的无状态重试
对于非事务性操作(例如对 Web 服务或其他远程资源的调用)使用重试也很有用,如下例所示
0 | TX { 1 | input; 1.1 | output; 2 | RETRY { 2.1 | remote access; | } | }
这实际上是重试最常用的应用之一,因为远程调用比数据库更新更容易失败且更易于重试。只要远程访问 (2.1) 最终成功,事务 `TX` (0) 就会提交。如果远程访问 (2.1) 最终失败,则保证事务 `TX` (0) 回滚。
典型的重复重试模式
最典型的批处理模式是在块的内部块中添加重试,如下例所示
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { 5.1 | output; 6 | } SKIP and RECOVER { | notify; | } | | } | } | | }
内部 `RETRY` (4) 块标记为“有状态”。参见 典型用例 以了解有状态重试的描述。这意味着,如果重试 `PROCESS` (5) 块失败,则 `RETRY` (4) 的行为如下:
-
抛出异常,回滚块级别的 `TX` (2) 事务,并允许将项目重新提交到输入队列。
-
当项目重新出现时,它可能会根据现有的重试策略进行重试,并再次执行 `PROCESS` (5)。第二次及后续尝试也可能再次失败并重新抛出异常。
-
最终,项目最后一次重新出现。重试策略不允许再次尝试,因此永远不会执行 `PROCESS` (5)。在这种情况下,我们将遵循 `RECOVER` (6) 路径,有效地“跳过”正在接收和处理的项目。
请注意,计划中用于 `RETRY` (4) 的符号明确表明输入步骤 (4.1) 是重试的一部分。它还明确指出存在两种处理备选路径:正常情况,如 `PROCESS` (5) 所示;以及恢复路径,如 `RECOVER` (6) 的单独块中所示。这两个备选路径是完全不同的。在正常情况下,只执行其中一个。
在特殊情况下(例如特殊的 `TranscationValidException` 类型),重试策略可能能够确定在 `PROCESS` (5) 刚刚失败后的最后一次尝试中可以采用 `RECOVER` (6) 路径,而无需等待项目重新提交。这不是默认行为,因为它需要详细了解 `PROCESS` (5) 块内部发生的情况,而这通常是不可用的。例如,如果输出在失败前包含写访问,则应重新抛出异常以确保事务完整性。
外部 `REPEAT` (1) 中的完成策略对于计划的成功至关重要。如果输出 (5.1) 失败,它可能会抛出异常(通常会抛出,如所述),在这种情况下,事务 `TX` (2) 失败,并且异常可能会传播到外部批处理 `REPEAT` (1)。我们不希望整个批处理停止,因为如果我们再次尝试,`RETRY` (4) 仍然可能成功,因此我们在外部 `REPEAT` (1) 中添加 `exception=not critical`。
但是,需要注意的是,如果TX
(2) 失败并且我们确实尝试再次执行,那么由于外部完成策略,在内部REPEAT
(3) 中接下来处理的项并不保证是刚刚失败的那一项。它可能是,但这取决于输入的实现 (4.1)。因此,输出 (5.1) 可能会在一个新的项或旧的项上再次失败。批处理的客户端不应假设每次RETRY
(4) 尝试都会处理与上次失败相同的项。例如,如果REPEAT
(1) 的终止策略是在10次尝试后失败,则它会在10次连续尝试后失败,但不一定是在同一项上。这与整体重试策略一致。内部RETRY
(4) 了解每个项目的历史记录,并可以决定是否再次尝试。
异步块处理
在典型示例中,可以通过将外部批处理配置为使用AsyncTaskExecutor
来并发执行内部批处理或块。外部批处理等待所有块完成之后再完成。以下示例显示了异步块处理
1 | REPEAT(until=exhausted, concurrent, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | | } | } | | }
异步项处理
原则上,典型示例中块中的各个项也可以并发处理。在这种情况下,事务边界必须移动到单个项级别,以便每个事务都在单个线程上,如下例所示
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | REPEAT(size=5, concurrent) { | 3 | TX { 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | } | | } | | }
此方案牺牲了简单方案所具有的优化优势,即所有事务资源都被一起分块。只有当处理 (5) 的成本远高于事务管理 (3) 的成本时,它才有用。
批处理和事务传播之间的交互
批处理重试和事务管理之间存在比理想情况更紧密的耦合。特别是,如果事务管理器不支持 NESTED 传播,则无法使用无状态重试来重试数据库操作。
以下示例使用重试而不重复
1 | TX { | 1.1 | input; 2.2 | database access; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | | }
同样,由于同样的原因,即使RETRY
(2) 最终成功,内部事务TX
(3) 也会导致外部事务TX
(1) 失败。
不幸的是,如果存在,同样的效果会从重试块向上渗透到周围的重复批处理,如下例所示
1 | TX { | 2 | REPEAT(size=5) { 2.1 | input; 2.2 | database access; 3 | RETRY { 4 | TX { 4.1 | database access; | } | } | } | | }
现在,如果 TX (3) 回滚,它可能会污染 TX (1) 的整个批处理,并强制它在结束时回滚。
非默认传播呢?
-
在前面的示例中,
TX
(3) 处的PROPAGATION_REQUIRES_NEW
可以防止如果两个事务最终都成功,则外部TX
(1) 被污染。但是,如果TX
(3) 提交而TX
(1) 回滚,则TX
(3) 保持提交状态,因此我们违反了TX
(1) 的事务契约。如果TX
(3) 回滚,TX
(1) 不一定会回滚(但在实践中它可能会回滚,因为重试抛出了回滚异常)。 -
TX
(3) 处的PROPAGATION_NESTED
在重试情况下(以及带有跳过的批处理)按我们的要求工作:TX
(3) 可以提交,但随后可以被外部事务TX
(1) 回滚。如果TX
(3) 回滚,则TX
(1) 在实践中也会回滚。此选项仅在某些平台上可用,不包括 Hibernate 或 JTA,但它是唯一始终有效的方法。
因此,如果重试块包含任何数据库访问,则NESTED
模式是最佳选择。
特殊情况:具有正交资源的事务
对于没有嵌套数据库事务的简单情况,默认传播始终是可以的。考虑以下示例,其中SESSION
和TX
不是全局XA
资源,因此它们的资源是正交的
0 | SESSION { 1 | input; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | }
这里有一个事务性消息SESSION
(0),但它不参与使用PlatformTransactionManager
的其他事务,因此当TX
(3) 启动时不会传播。RETRY
(2) 块之外没有数据库访问。如果TX
(3) 失败,然后最终在重试时成功,则SESSION
(0) 可以提交(独立于TX
块)。这类似于普通的“尽力而为的一次性提交”方案。最糟糕的情况是在RETRY
(2) 成功且SESSION
(0) 无法提交(例如,因为消息系统不可用)时出现重复消息。
无状态重试无法恢复
在前面显示的典型示例中,无状态重试和有状态重试之间的区别很重要。实际上,最终是一个事务约束强制了这种区别,而且这个约束也使区别的原因显而易见。
我们首先观察到,除非我们将项目处理包装在事务中,否则无法跳过失败的项目并成功提交块的其余部分。因此,我们将典型的批处理执行计划简化为如下所示
0 | REPEAT(until=exhausted) { | 1 | TX { 2 | REPEAT(size=5) { | 3 | RETRY(stateless) { 4 | TX { 4.1 | input; 4.2 | database access; | } 5 | } RECOVER { 5.1 | skip; | } | | } | } | | }
前面的示例显示了一个无状态RETRY
(3) 和一个在最终尝试失败后启动的RECOVER
(5) 路径。stateless
标签表示该块重复执行,直到达到某个限制而不向上抛出任何异常。这只在事务TX
(4) 具有嵌套传播时才有效。
如果内部TX
(4) 具有默认传播属性并回滚,它会污染外部TX
(1)。事务管理器假定内部事务已损坏事务资源,因此无法再次使用。
嵌套传播的支持非常少见,因此我们选择不在当前版本的 Spring Batch 中支持使用无状态重试进行恢复。通过使用前面显示的典型模式,始终可以实现相同的效果(以重复更多处理为代价)。