批处理和事务

没有重试的简单批处理

考虑以下没有重试的嵌套批处理的简单示例。它展示了批处理的常见场景:处理输入源直到耗尽,并在处理“块”结束时定期提交。

1   |  REPEAT(until=exhausted) {
|
2   |    TX {
3   |      REPEAT(size=5) {
3.1 |        input;
3.2 |        output;
|      }
|    }
|
|  }

输入操作(3.1)可以是基于消息的接收(例如来自 JMS)或基于文件的读取,但为了恢复并继续处理以完成整个作业,它必须是事务性的。3.2 的操作也是如此。它必须是事务性的或幂等的。

如果 REPEAT(3)处的块由于 3.2 处的数据库异常而失败,则 TX(2)必须回滚整个块。

简单无状态重试

对于非事务性操作,例如调用 Web 服务或其他远程资源,使用重试也很有用,如下例所示

0   |  TX {
1   |    input;
1.1 |    output;
2   |    RETRY {
2.1 |      remote access;
|    }
|  }

这实际上是重试最实用的应用之一,因为远程调用比数据库更新更容易失败并可重试。只要远程访问(2.1)最终成功,事务 TX(0)就会提交。如果远程访问(2.1)最终失败,则保证事务 TX(0)回滚。

典型的重复重试模式

最典型的批处理模式是在块的内部块中添加重试,如下例所示

1   |  REPEAT(until=exhausted, exception=not critical) {
|
2   |    TX {
3   |      REPEAT(size=5) {
|
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
5.1 |          output;
6   |        } SKIP and RECOVER {
|          notify;
|        }
|
|      }
|    }
|
|  }

内部 RETRY(4)块标记为“有状态”。有关有状态重试的描述,请参见 典型用例。这意味着,如果重试 PROCESS(5)块失败,则 RETRY(4)的行为如下

  1. 抛出异常,在块级别回滚事务 TX(2),并允许将该项重新提交到输入队列。

  2. 当该项再次出现时,它可能会被重试,具体取决于现有的重试策略,并再次执行 PROCESS(5)。第二次及后续尝试可能会再次失败并重新抛出异常。

  3. 最终,该项将最后一次出现。重试策略不允许再次尝试,因此 PROCESS(5)永远不会执行。在这种情况下,我们将遵循 RECOVER(6)路径,有效地“跳过”正在接收和处理的项。

请注意,计划中用于 RETRY(4)的符号明确显示输入步骤(4.1)是重试的一部分。它还明确表明,存在两种处理备用路径:正常情况,如 PROCESS(5)所示,以及恢复路径,如 RECOVER(6)的单独块中所示。这两个备用路径完全不同。在正常情况下,只执行其中一个。

在特殊情况下(例如特殊的 TranscationValidException 类型),重试策略可能能够确定在 PROCESS(5)刚刚失败后,可以在最后一次尝试中采用 RECOVER(6)路径,而不是等待该项重新提交。这不是默认行为,因为它需要详细了解 PROCESS(5)块内部发生的情况,而这些信息通常不可用。例如,如果输出在失败之前包含写入访问,则应重新抛出异常以确保事务完整性。

外部 REPEAT (1) 中的完成策略对于计划的成功至关重要。如果输出 (5.1) 失败,它可能会抛出异常(通常会,如描述的那样),在这种情况下,事务 TX (2) 失败,并且异常可能会向上传播到外部批处理 REPEAT (1)。我们不希望整个批处理停止,因为如果我们再次尝试,RETRY (4) 仍然可能成功,所以我们在外部 REPEAT (1) 中添加 exception=not critical

但是,请注意,如果 TX (2) 失败,并且我们确实尝试再次执行,由于外部完成策略,内部 REPEAT (3) 中下一个处理的项目不能保证是刚刚失败的项目。它可能是,但这取决于输入 (4.1) 的实现。因此,输出 (5.1) 可能会在新的项目或旧项目上再次失败。批处理的客户端不应该假设每次 RETRY (4) 尝试都将处理与上次失败的尝试相同的项目。例如,如果 REPEAT (1) 的终止策略是在 10 次尝试后失败,它会在 10 次连续尝试后失败,但不一定是在同一个项目上。这与整体重试策略一致。内部 RETRY (4) 了解每个项目的历史记录,并且可以决定是否对其进行另一次尝试。

异步块处理

通过将外部批处理配置为使用 AsyncTaskExecutor,可以并发执行 典型示例 中的内部批次或块。外部批处理在所有块完成之前等待完成。以下示例显示了异步块处理

1   |  REPEAT(until=exhausted, concurrent, exception=not critical) {
|
2   |    TX {
3   |      REPEAT(size=5) {
|
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
|          output;
6   |        } RECOVER {
|          recover;
|        }
|
|      }
|    }
|
|  }

异步项目处理

原则上,典型示例 中块中的单个项目也可以并发处理。在这种情况下,事务边界必须移动到单个项目的级别,以便每个事务都在单个线程上,如下面的示例所示

1   |  REPEAT(until=exhausted, exception=not critical) {
|
2   |    REPEAT(size=5, concurrent) {
|
3   |      TX {
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
|          output;
6   |        } RECOVER {
|          recover;
|        }
|      }
|
|    }
|
|  }

此方案牺牲了简单方案中将所有事务资源打包在一起的优化优势。它只有在处理成本 (5) 远高于事务管理成本 (3) 时才有用。

批处理和事务传播之间的交互

批处理重试和事务管理之间存在比我们理想情况下更紧密的耦合。特别是,如果事务管理器不支持 NESTED 传播,则无法使用无状态重试来重试数据库操作。

以下示例使用无重复重试

1   |  TX {
|
1.1 |    input;
2.2 |    database access;
2   |    RETRY {
3   |      TX {
3.1 |        database access;
|      }
|    }
|
|  }

同样,出于相同的原因,即使 RETRY (2) 最终成功,内部事务 TX (3) 也会导致外部事务 TX (1) 失败。

不幸的是,正如以下示例所示,相同的效果会从重试块向上渗透到周围的重复批处理(如果有)。

1   |  TX {
|
2   |    REPEAT(size=5) {
2.1 |      input;
2.2 |      database access;
3   |      RETRY {
4   |        TX {
4.1 |          database access;
|        }
|      }
|    }
|
|  }

现在,如果 TX (3) 回滚,它可能会污染 TX (1) 的整个批处理,并迫使其在结束时回滚。

非默认传播怎么样?

  • 在前面的示例中,TX (3) 上的 PROPAGATION_REQUIRES_NEW 可以防止外部 TX (1) 被污染,前提是两个事务最终都成功。但如果 TX (3) 提交而 TX (1) 回滚,则 TX (3) 保持提交状态,因此我们违反了 TX (1) 的事务契约。如果 TX (3) 回滚,TX (1) 不一定会回滚(但在实践中它可能会回滚,因为重试会抛出回滚异常)。

  • TX (3) 上的 PROPAGATION_NESTED 按照我们在重试情况下(以及对于包含跳过的批处理)的要求工作:TX (3) 可以提交,但随后可以被外部事务 TX (1) 回滚。如果 TX (3) 回滚,TX (1) 在实践中会回滚。此选项仅在某些平台上可用,不包括 Hibernate 或 JTA,但它是唯一始终有效的选项。

因此,如果重试块包含任何数据库访问,则 NESTED 模式是最佳选择。

特殊情况:具有正交资源的事务

对于没有嵌套数据库事务的简单情况,默认传播始终可以。考虑以下示例,其中 SESSIONTX 不是全局 XA 资源,因此它们的资源是正交的

0   |  SESSION {
1   |    input;
2   |    RETRY {
3   |      TX {
3.1 |        database access;
|      }
|    }
|  }

这里有一个事务性消息,SESSION (0),但它不参与与PlatformTransactionManager的其他事务,因此当TX (3) 开始时不会传播。在RETRY (2) 块之外没有数据库访问。如果TX (3) 失败,然后最终在重试时成功,SESSION (0) 可以提交(独立于TX 块)。这类似于传统的“尽力而为的一阶段提交”场景。最糟糕的情况是在RETRY (2) 成功并且SESSION (0) 无法提交(例如,因为消息系统不可用)时出现重复消息。

无状态重试无法恢复

在前面显示的典型示例中,无状态重试和有状态重试之间的区别很重要。实际上,最终是事务约束迫使了这种区别,而这种约束也使区别的原因显而易见。

我们从观察开始,除非将项目处理包装在事务中,否则无法跳过失败的项目并成功提交块的其余部分。因此,我们将典型的批处理执行计划简化为如下所示

0   |  REPEAT(until=exhausted) {
|
1   |    TX {
2   |      REPEAT(size=5) {
|
3   |        RETRY(stateless) {
4   |          TX {
4.1 |            input;
4.2 |            database access;
|          }
5   |        } RECOVER {
5.1 |          skip;
|        }
|
|      }
|    }
|
|  }

前面的示例显示了一个无状态的RETRY (3),它有一个RECOVER (5) 路径,在最终尝试失败后启动。无状态标签表示该块重复执行,直到达到某个限制,而不会重新抛出任何异常。这只有在事务TX (4) 具有嵌套传播的情况下才有效。

如果内部TX (4) 具有默认传播属性并回滚,它会污染外部TX (1)。事务管理器假定内部事务已损坏事务资源,因此无法再次使用它。

对嵌套传播的支持非常少,因此我们选择在当前版本的 Spring Batch 中不支持使用无状态重试进行恢复。可以通过使用前面显示的典型模式来实现相同的效果(以重复更多处理为代价)。