源码分析:Redisson 分布式锁过程分析
封面图来自:Redisson性能压测权威发布 http://www.redis.cn/articles/20170704108.html
一 摘要
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
通常使用最为广泛的就是它提供的基于Redis的分布式锁功能。本篇也集中对Redisson的分布式锁实现进行分析。
使用的Redisson版本为3.12.2,maven引入依赖信息:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.2</version>
</dependency>
二 锁过程源码
如下代码所示,是我们适用Redisson获取和释放分布式锁的一个demo:
RedissonClient redisson = Redisson.create();
RLock lock = redisson.getLock("anyLock");
lock.lock();
// 其他代码....
lock.unlock();
其中,Redisson.create();是默认的创建方法,内容为:
public static RedissonClient create() {
Config config = new Config();
((SingleServerConfig)config.useSingleServer().setTimeout(1000000)).setAddress("redis://127.0.0.1:6379");
return create(config);
}
可见,这里使用了本地的redis集群,和默认的6379端口。
这里重点分析加锁过程,也就是lock.lock(); 方法部分,来看Redisson是怎样实现加锁,以及可能得锁续期等watchdog的动作,下面是RedissonLock类中的lock()方法:
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
这里继续向下调用了一个含参数的lock()方法,设置了释放时间(默认设置了-1),TimeUnit(null),是否可中断(false),我们继续看这个方法:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
这一部分代码较长,我们按照步骤整理一下:
1、获取当前线程的线程id;
2、tryAquire尝试获取锁,并返回ttl
3、如果ttl为空,则结束流程;否则进入后续逻辑;
4、this.subscribe(threadId)订阅当前线程,返回一个RFuture;
5、下一步涉及是否可中断标记的判断,如果可中断,调用
this.commandExecutor.syncSubscriptionInterrupted(future);
否则,调用:
this.commandExecutor.syncSubscription(future);
6、通过while(true)循环,一直尝试获取锁:ttl = this.tryAcquire(leaseTime, unit, threadId);
中止条件:1)ttl == null;2)如果ttl>=0,((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
这个过程中,会判断iterruptibly,为true时会处理中断
7、fially代码块,会解除订阅
this.unsubscribe(future, threadId);
三 详细分析
redisson watchdog 使用和原理这篇文章整理了一张加锁流程图,我们引用如下:
下面详细分析Redisson获取锁、锁等待、释放锁的详细实现过程。
3.1 获取锁
3.1.1 核心获取锁的方法-tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
第二行的代码很长,我们对文本做一些换行处理:
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
(redis.call('exists', KEYS[1]) == 0) then
KEYS[1], ARGV[2], 1);
KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
KEYS[1], ARGV[2], 1);
KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
这样就比较容易看出,重点是一系列的redis命令。分析如下:
RedissonLock类tryLockInnerAsync通过eval命令执行Lua代码完成加锁操作。KEYS[1]为锁在redis中的key,key对应value为map结构,ARGV[1]为锁超时时间,ARGV[2]为锁value中的key。ARGV[2]由UUID+threadId组成,用来标记锁被谁持有。
1)第一个If判断key是否存在,不存在则完成加锁操作
redis.call('hset', KEYS[1], ARGV[2], 1);创建key[1] map中添加key:ARGV[2] ,value:1;
redis.call('pexpire', KEYS[1], ARGV[1]);设置key[1]过期时间,避免发生死锁。eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。
2)第二个if判断key存在且当前线程已经持有锁, 重入:
redis.call('hexists', KEYS[1], ARGV[2]);判断redis中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁;
redis.call('hincrby', KEYS[1], ARGV[2], 1);记录同一线程持有锁之后累计加锁次数,实现锁重入;
redis.call('pexpire', KEYS[1], ARGV[1]); 重置锁超时时间。
(3)key存在被其他线程获取的锁, 等待:
redis.call('pttl', KEYS[1]); 加锁失败返回锁过期时间。
其中pexpire语句的重置锁超时时间,实际上就是Redisson的watch dog机制。
3.1.2 commandExecutor.evalWriteAsync
继续向下,commandExecutor.evalWriteAsync:
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = this.getNodeSource(key);
return this.evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
3.1.3 syncSubscriptionInterrupted
public void syncSubscriptionInterrupted(RFuture<?> future) throws InterruptedException {
MasterSlaveServersConfig config = this.connectionManager.getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.await((long)timeout)) {
((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
future.sync();
}
关于超时时间的计算,使用的是config中的Timeout时间+重试周期x重试次数;当RFuture等待超时时,就会使用tryFailure抛出RedisTimeoutException的异常信息,提示订阅失败。
3.2 锁等待 lockInterruptibly
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
this.lock(leaseTime, unit, true);
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
这段代码也很长,简单总结如下:
(1)步骤一:调用加锁操作;
(2)步骤二:步骤一中加锁操作失败,订阅消息,利用redis的pubsub提供一个通知机制来减少不断的重试,避免发生活锁。
注:
活锁:是指线程1可以使用资源,但它很礼貌,让其他线程先使用资源,线程2也可以使用资源,但它很绅士,也让其他线程先使用资源。这样你让我,我让你,最后两个线程都无法使用资源。
(3)步骤三:
getLath()获取RedissionLockEntry实例latch变量,由于permits为0,所以调用acquire()方法后线程阻塞。
3.3 释放锁 - unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
与获取锁代码类似,还是一个比较长的redis命令,我们把redis命令格式化整理后如下:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
(1)第一个if判断锁对应key存在,value中是否存在当前要释放锁的标示,不存在返回nil,确保锁只能被持有的线程释放;
(2)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1,第二个if判断锁标示对应的值是否大于0,大于0,表示有锁重入情况发生,重新设置锁过期时间;
(3)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1后等于0,调用del操作释放锁,并publish消息,将获取锁被阻塞的线程恢复重新获取锁;
这里的代码中,涉及了一个重要的类:LockPubSub,下面是释放时执行的release()方法:
订阅者接收到publish消息后,执行release操作,调用acquire被阻塞的线程将继续执行获取锁操作。
3.4 其他-CommandSyncService
在命令执行时,我们可以看到Redisson是通过this.commandExecutor执行的,而这个是在 Redisson的构造方法中做的初始化:
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
this.connectionManager = ConfigSupport.createConnectionManager(configCopy);
this.evictionScheduler = new EvictionScheduler(this.connectionManager.getCommandExecutor());
this.writeBehindService = new WriteBehindService(this.connectionManager.getCommandExecutor());
}
四 Redisson与Jedis分布式锁实现对比
在某业务中,使用的是基于Jedis封装得分布式锁操作工具,虽然并非是Jedis提供的标准实现,但从中可以了解一下分布式锁的不同实现:
4.1 获取锁
下面是某业务封装的jedis获取分布式锁和释放的工具:
public static boolean tryGetDistributedLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) throws RedisToolException {
Jedis jedis = null;
boolean var6;
try {
jedis = jedisPool.getResource();
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
var6 = "OK".equals(result);
} catch (Throwable var10) {
throw new RedisToolException(var10.getMessage());
} finally {
if (jedis != null) {
jedis.close();
}
}
return var6;
}
继续向下,查看jedis.set方法:
public String set(String key, String value, String nxxx, String expx, int time) {
this.checkIsInMultiOrPipeline();
this.client.set(key, value, nxxx, expx, time);
return this.client.getStatusCodeReply();
}
可见,同样是为了
保证设置锁key 和 设置超时时间两个动作的原子性,Redisson是使用lua脚本,而Jedis是通过Redis提供的set命令。
早期必须lua脚本来实现,是因为redis旧版本没有提供这个新的set命令,不支持一个命令中同时设置key和超时时间。
4.2 释放锁
public static boolean releaseDistributedLock(JedisPool jedisPool, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Jedis jedis = null;
boolean var6;
try {
jedis = jedisPool.getResource();
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
var6 = RELEASE_SUCCESS.equals(result);
} finally {
if (jedis != null) {
jedis.close();
}
}
return var6;
}
Jedis在释放锁的时候,还是通过脚本来实现的判断和删除key,保证操作的原子性。
4.3 锁续期支持
4.3.1 Redisson的watch dog
Redisson提供了订阅和watch dog机制,当业务线程还在执行的锁超时时,如果开启了watch dog,那么可以实现自动续期。但事实上,这个机制比较耗费资源,所以一般不建议开启,除非业务确实有较强的这方面需求。
官方文档对watch dog的描述:
lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
默认值:30000
监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。
如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁
将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。
4.3.2 基于Jedis实现
除了tryGetDistributedLock之外,还提供了一种过期时间怕短的锁方法,当锁即将超时时,会抛出超时异常,这样业务在捕获异常后,可以选择继续获取锁、或回滚事务并释放锁等动作,把主动权交给业务方。示例代码如下:
public static void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime, int timeout) throws Exception {
if (timeout > expireTime) {
throw new Exception("timeout 必须大于 expireTime");
} else {
Random random = new Random();
while(timeout > 0) {
boolean lock = tryGetDistributedLock(jedisPool, lockKey, requestId, expireTime);
if (lock) {
return;
}
int applyTime = random.nextInt(100);
timeout -= applyTime;
Thread.sleep((long)applyTime);
}
throw new LockTimeOutException("distributedLock timeout");
}
}
五 总结
本文基于Redisson3.12.2版本源码,对Redisson分布式锁过程进行了分析。从获取锁、释放锁的过程,可以大概了解Redisson的主要设计思想。此外,还对基于Jedis实现的一个分布式锁示例与Redisson进行对比,来看基于Redis的分布式锁的两种不同实现方式。