分布式锁
在许多情况下,对某个上下文(甚至是单个消息)执行的操作必须以独占方式进行。一个例子是聚合器组件,我们必须检查当前消息的消息组状态,以确定是释放组还是仅将该消息添加到将来考虑。为此,Java 提供了一个带有 java.util.concurrent.locks.Lock 实现的 API。然而,当应用程序是分布式的和/或在集群中运行时,问题变得更加复杂。在这种情况下,锁定具有挑战性,需要一些共享状态及其特定的方法来实现排他性要求。
Spring Integration 提供了一个 LockRegistry 抽象,其内存中的 DefaultLockRegistry 实现基于 ReentrantLock API。LockRegistry 的 obtain(Object) 方法需要一个用于特定上下文的 lock key。例如,聚合器使用 correlationKey 来锁定其组周围的操作。这样,不同的锁可以并发使用。此 obtain(Object) 方法返回一个 java.util.concurrent.locks.Lock 实例(取决于 LockRegistry 实现),因此,其余逻辑与标准 Java 并发算法相同。
从版本 6.2 开始,LockRegistry 提供了 executeLocked() API(此接口中的 default 方法)以在锁定状态下执行某些任务。此 API 的行为类似于众所周知的 JdbcTemplate、JmsTemplate 或 RestTemplate。以下示例演示了此 API 的用法
LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());
该方法重新抛出任务调用中的异常,如果 Lock 被中断,则抛出 InterruptedException。此外,带有 Duration 的变体在 lock.tryLock() 返回 false 时抛出 java.util.concurrent.TimeoutException。
Spring Integration 为分布式锁提供了以下 LockRegistry 实现
Spring Cloud AWS 还提供了 DynamoDbLockRegistry。
从版本 7.0 开始,引入了 DistributedLock 接口,提供了新方法 lock(Duration ttl) 和 tryLock(long time, TimeUnit unit, Duration ttl),用于获取具有自定义生存时间 (TTL) 的锁。JdbcLock 和 RedisLock 都实现了 DistributedLock 接口以支持自定义生存时间功能。LockRegistry<L extends Lock> 现在是扩展 Lock 的类型的通用接口。RenewableLockRegistry 接口现在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允许您使用自定义生存时间值续订锁。JdbcLockRegistry 和 RedisLockRegistry 都使用类型参数 DistributedLock 实现了 LockRegistry 和 RenewableLockRegistry 接口。
以下是如何从注册表中获取 DistributedLock 并以特定生存时间值获取它的示例
DistributedLock lock = registry.obtain("foo");
Duration timeToLive = Duration.ofMillis(500);
if(lock.tryLock(100, TimeUnit.MILLISECONDS, timeToLive)){
try {
// do something
} catch (Exception e) {
// handle exception
} finally{
lock. unlock();
}
}