深入浅出的Redis分布式锁|得物技术
1. 分布式锁
1.1 分布式锁介绍
分布式锁是控制不同系统之间访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性。
1.2 为什么需要分布式锁
在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、ReentrantLock等。但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保证同一个JVM进程中保证有效,所以这时就需要使用分布式锁了。这里就不再赘述synchornized锁的原理,想了解可以读这篇文章。
1.3 分布式锁需要具备的条件
分布式锁需要具备互斥性、不会死锁和容错等。互斥性,在于不管任何时候,应该只能有一个线程持有一把锁;不会死锁在于即使是持有锁的客户端意外宕机或发生进程被kill等情况时也能释放锁,不至于导致整个服务死锁。容错性指的是只要大多数节点正常工作,客户端应该都能获取和释放锁。
2. 分布式锁的实现方式
目前主流的分布式锁的实现方式,基于数据库实现分布式锁、基于Redis实现分布式锁、基于ZooKeeper实现分布式锁,本篇文章主要介绍了Redis实现的分布式锁。
2.1 由单机部署到集群部署锁的演变
一开始在redis设置一个默认值key:ticket 对应的值为20,并搭建一个Spring Boot服务,用来模拟多窗口卖票现象,配置类的代码就不一一列出了。
2.1.1 单机模式解决并发问题
一开始的时候在redis预设置的门票值ticket=20,那么当一个请求进来之后,会判断是否余票是否是大于0,若大于0那么就将余票减一,再重新写入Redis中,倘若库存小于0,那么就会打印错误日志。
4j
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
"/lock") (
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
return "end";
}
}
代码运行分析:这里明显有一个问题,就是当前若有两个线程同时请求进来,那么两个线程同时请求这段代码时,如图thread 1 和thread 2同时,两个线程从Redis拿到的数据都是20,那么执行完成后thread 1 和thread 2又将减完后的库存ticket=19重新写入Redis,那么数据就会产生问题,实际上两个线程各减去了一张票数,然而实际写进就减了一次票数,就出现了数据不一致的现象。
这种问题很好解决,上述问题的产生其实就是从Redis中拿数据和减余票不是原子操作,那么此时只需要将按下图代码给这俩操作加上synchronized同步代码快就能解决这个问题。
4j
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
"/lock") (
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
synchronized (this) {
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
}
return "end";
}
}
代码运行分析:此时当多个线程执行到第14行的位置时,只会有一个线程能够获取锁,进入synchronized代码块中执行,当该线程执行完成后才会释放锁,等下个线程进来之后就会重新给这段代码上锁再执行。说简单些就是让每个线程排队执行代码块中的代码,从而保证了线程的安全。
上述的这种做法如果后端服务只有一台机器,那毫无疑问是没问题的,但是现在互联网公司或者是一般软件公司,后端服务都不可能只用一台机器,最少都是2台服务器组成的后端服务集群架构,那么synchronized加锁就显然没有任何作用了。
如下图所示,若后端是两个微服务构成的服务集群,由nginx将多个的请求负载均衡转发到不同的后端服务上,由于synchronize代码块只能在同一个JVM进程中生效,两个请求能够同时进两个服务,所以上面代码中的synchronized就一点作用没有了。
用JMeter工具随便测试一下,就很简单能发现上述代码的bug。实际上synchronized和juc包下个那些锁都是只能用于JVM进程维度的锁,并不能运用在集群或分布式部署的环境中。
2.1.2 集群模式解决并发问题
通过上面的实验很容易就发现了synchronized等JVM进程级别的锁并不能解决分布式场景中的并发问题,就是为了应对这种场景产生了分布式锁。
本篇文章介绍了Redis实现的分布式锁,可以通过Redis的setnx(只在键key不存在的情况下, 将键key的值设置为value。若键key已经存在, 则SETNX命令不做任何动作。)的指令来解决的,这样就可以解决上面集群环境的锁不唯一的情况。
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
// redis setnx 操作
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
if (Boolean.FALSE.equals(result)) {
return "error";
}
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
stringRedisTemplate.delete(lockKey);
return "end";
}
}
代码运行分析:代码是有问题的,就是当执行扣减余票操作时,若业务代码报了异常,那么就会导致后面的删除Redis的key代码没有执行到,就会使Redis的key没有删掉的情况,那么Redis的这个key就会一直存在Redis中,后面的线程再进来执行下面这行代码都是执行不成功的,就会导致线程死锁,那么问题就会很严重了。
为了解决上述问题其实很简单,只要加上一个try...finally即可,这样业务代码即使抛了异常也可以正常的释放锁。setnx + try ... finally解决,具体代码如下:
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
// redis setnx 操作
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
if (Boolean.FALSE.equals(result)) {
return "error";
}
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
}
代码运行分析:上述问题解决了,但是又会有新的问题,当程序执行到try代码块中某个位置服务宕机或者服务重新发布,这样就还是会有上述的Redis的key没有删掉导致死锁的情况。这样可以使用Redis的过期时间来进行设置key,setnx + 过期时间解决,如下代码所示:
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
// redis setnx 操作
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
//程序执行到这
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(result)) {
return "error";
}
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
}
代码运行分析:上述代码解决了因为程序执行过程中宕机导致的锁没有释放导致的死锁问题,但是如果代码像上述的这种写法仍然还是会有问题,当程序执行到第18行时,程序宕机了,此时Redis的过期时间并没有设置,也会导致线程死锁的现象。可以用了Redis设置的原子命设置过期时间的命令,原子性过期时间的setnx命令,如下代码所示:
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
// redis setnx 操作
try {
Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, "dewu", 10, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(result)) {
return "error";
}
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
}
代码运行分析:通过设置原子性过期时间命令可以很好的解决上述这种程序执行过程中突然宕机的情况。这种Redis分布式锁的实现看似已经没有问题了,但在高并发场景下任会存在问题,一般软件公司并发量不是很高的情况下,这种实现分布式锁的方式已经够用了,即使出了些小的数据不一致的问题,也是能够接受的,但是如果是在高并发的场景下,上述的这种实现方式还是会存在很大问题。
如上面代码所示,该分布式锁的过期时间是10s,假如thread 1执行完成时间需要15s,且当thread 1线程执行到10s时,Redis的key恰好就是过期就直接释放锁了,此时thread 2就可以获得锁执行代码了,假如thread 2线程执行完成时间需要8s,那么当thread 2线程执行到第5s时,恰好thread 1线程执行了释放锁的代码————stringRedisTemplate.delete(lockKey); 此时,就会发现thread 1线程删除的锁并不是其自己的加锁,而是thread 2加的锁;那么thread 3就又可以进来了,那么假如一共执行5s,那么当thread 3执行到第3s时,thread 2又会恰好执行到释放锁的代码,那么thread 2又删除了thread 3 加的锁。
在高并发场景下,倘若遇到上述问题,那将是灾难性的bug,只要高并发存在,那么这个分布式锁就会时而加锁成功时而加锁失败。
解决上述问题其实也很简单,让每个线程加的锁时给Redis设置一个唯一id的value,每次释放锁的时候先判断一下线程的唯一id与Redis 存的值是否相同,若相同即可释放锁。设置线程id的原子性过期时间的setnx命令,具体代码如下:
public class RedisLockController {
private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductTicket() throws InterruptedException {
String lockKey = "ticket";
String threadUniqueKey = UUID.randomUUID().toString();
// redis setnx 操作
try {
Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, threadUniqueKey, 10, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(result)) {
return "error";
}
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
} finally {
if (Objects.equals(stringRedisTemplate.opsForValue().get(lockKey), threadUniqueKey)) {
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
}
代码运行分析:上述实现的Redis分布式锁已经能够满足大部分应用场景了,但是还是略有不足,比如当线程进来需要的执行时间超过了Redis key的过期时间,那么此时已经释放了,你其他线程就可以立马获得锁执行代码,就又会产生bug了。
分布式锁Redis key的过期时间不管设置成多少都不合适,比如将过期时间设置为30s,那么如果业务代码出现了类似慢SQL、查询数据量很大那么过期时间就不好设置了。那么这里有没有什么更好的方案呢?答案是有的——锁续命。
那么锁续命方案的原来就在于当线程加锁成功时,会开一个分线程,取锁过期时间的1/3时间点定时执行任务,如上图的锁为例,每10s判断一次锁是否存在(即Redis的key),若锁还存在那么就直接重新设置锁的过期时间,若锁已经不存在了那么就直接结束当前的分线程。
2.2 Redison框架实现Redis分布式锁
上述“锁续命”方案说起来简单,但是实现起来还是挺复杂的,于是市面上有很多开源框架已经帮我们实现好了,所以就不需要自己再去重复造轮子再去写一个分布式锁了,所以本次就拿Redison框架来举例,主要是可以学习这种设计分布式锁的思想。
2.2.1 Redison分布式锁的使用
Redison实现的分布式锁,使用起来还是非常简单的,具体代码如下:
@RestController
@Slf4j
public class RedisLockController {
@Resource
private Redisson redisson;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/lock")
public String deductTicket() throws InterruptedException {
//传入Redis的key
String lockKey = "ticket";
// redis setnx 操作
RLock lock = redisson.getLock(lockKey);
try {
//加锁并且实现锁续命
lock.lock();
int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if (ticketCount > 0) {
int realTicketCount = ticketCount - 1;
log.info("扣减成功,剩余票数:" + realTicketCount + "");
stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
} else {
log.error("扣减失败,余票不足");
}
} finally {
//释放锁
lock.unlock();
}
return "end";
}
}
2.2.2 Redison分布式锁的原理
Redison实现分布式锁的原理流程如下图所示,当线程1加锁成功,并开始执行业务代码时,Redison框架会开启一个后台线程,每隔锁过期时间的1/3时间定时判断一次是否还持有锁(Redis中的key是否还存在),若不持有那么就直接结束当前的后台线程,若还持有锁,那么就重新设置锁的过期时间。当线程1加锁成功后,那么线程2就会加锁失败,此时线程2就会就会做类似于CAS的自旋操作,一直等待线程1释放了之后线程2才能加锁成功。
2.2.3 Redison分布式锁的源码分析
Redison底层实现分布式锁时使用了大量的lua脚本保证了其加锁操作的各种原子性。Redison实现分布式锁使用lua脚本的好处主要是能保证Redis的操作是原子性的,Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。
Redisson核心使用lua脚本加锁源码分析:
方法名为tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command):
//使用lua脚本加锁方法
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//当第一个线程进来会直接执行这段逻辑
//判断传入的Redis的key是否存在,即String lockKey = "ticket";
"if (redis.call('exists', KEYS[1]) == 0) then " +
//如果不存在那么就设置这个key为传入值、当前线程id 即参数ARGV[2]值(即getLockName(threadId)),并且将线程id的value值设置为1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//再给这个key设置超时时间,超时时间即参数ARGV[1](即internalLockLeaseTime的值)的时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//当第二个线程进来,Redis中的key已经存在(锁已经存在),那么直接进这段逻辑
//判断这个Redis key是否存在且当前的这个key是否是当前线程设置的
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果是的话,那么就进入重入锁的逻辑,利用hincrby指令将第一个线程进来将线程id的value值设置为1再加1
//然后每次释放锁的时候就会减1,直到这个值为0,这把锁就释放了,这点与juc的可重锁类似
//“hincrby”指令为Redis hash结构的加法
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//倘若不是本线程加的锁,而是其他线程加的锁,由于上述lua脚本都是有线程id的校验,那么上面的两段lua脚本都不会执行
//那么此时这里就会将当前这个key的过期时间返回
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); // KEYS[1]) ARGV[1] ARGV[2]
}
// getName()传入KEYS[1],表示传入解锁的keyName,这里是 String lockKey = "ticket";
// internalLockLeaseTime传入ARGV[1],表示锁的超时时间,默认是30秒
// getLockName(threadId)传入ARGV[2],表示锁的唯一标识线程id
设置监听器方法:方法名tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId)。
//设置监听器方法:
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
//加锁成功这里会返回一个null值,即ttlRemainingFuture为null
//若线程没有加锁成功,那么这里返回的就是这个别的线程加过的锁的剩余的过期时间,即ttlRemainingFuture为过期时间
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
//这里给当前业务加了个监听器
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Boolean ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining) {
//定时任务执行方法
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
定时任务执行方法: 方法名scheduleExpirationRenewal(final long threadId):
//定时任务执行方法
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
//这里new了一个TimerTask()定时任务器
//这里定时任务会推迟执行,推迟的时间是设置的锁过期时间的1/3,
//很容易就能发现是一开始锁的过期时间默认值30s,具体可见private long lockWatchdogTimeout = 30 * 1000;
//过期时间单位是秒
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//这里又是一个lua脚本
//这里lua脚本先判断了一下,Redis的key是否存在且设置key的线程id是否是参数ARGV[2]值
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果这个线程创建的Redis的key即锁仍然存在,那么久给锁的过期时间重新设值为internalLockLeaseTime,也就是初始值30s
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//Redis的key过期时间重新设置成功后,这里的lua脚本返回的就是1
"return 1; " +
"end; " +
//如果主线程已经释放了这个锁,那么这里的lua脚本就会返回0,直接结束“看门狗”的程序
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
//上面源码分析过了,当加锁成功后tryAcquireAsync()返回的值为null, 那么这个方法的返回值也为null
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//获得当前线程id
long threadId = Thread.currentThread().getId();
//由上面的源码分析可以得出,当加锁成功后,这个ttl就是null
//若线程没有加锁成功,那么这里返回的就是这个别的线程加过的锁的剩余的过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//如果加锁成功后,这个ttl就是null,那么这个方法后续就不需要做任何逻辑
//若没有加锁成功这里ttl的值不为null,为别的线程加过锁的剩余的过期时间,就会继续往下执行
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//若没有加锁成功的线程,会在这里做一个死循环,即自旋
while (true) {
//一直死循环尝试加锁,这里又是上面的加锁逻辑了
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
//这里不会疯狂自旋,这里会判断锁失效之后才会继续进行自旋,这样可以节省一点CPU资源
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
Redison底层解锁源码分析:
public void unlock() {
// 调用异步解锁方法
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
//当释放锁的线程和已存在锁的线程不是同一个线程,返回null
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
//根据执行lua脚本返回值判断是否取消续命订阅
if (opStatus) {
// 取消续命订阅
cancelExpirationRenewal();
}
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果锁已经不存在, 发布锁释放的消息,返回1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//当前线程持有锁,用hincrby命令将锁的可重入次数-1,即线程id的value值-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//若线程id的value值即可重入锁的次数大于0 ,就更新过期时间,返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则证明锁已经释放,删除key并发布锁释放的消息,返回1
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
// getName()传入KEYS[1],表示传入解锁的keyName
// getChannelName()传入KEYS[2],表示redis内部的消息订阅channel
// LockPubSub.unlockMessage传入ARGV[1],表示向其他redis客户端线程发送解锁消息
// internalLockLeaseTime传入ARGV[2],表示锁的超时时间,默认是30秒
// getLockName(threadId)传入ARGV[3],表示锁的唯一标识线程id
void cancelExpirationRenewal() {
// 将该线程从定时任务中删除
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
上述情况如果是单台Redis,那么利用Redison开源框架实现Redis的分布式锁已经很完美了,但是往往生产环境的的Redis一般都是哨兵主从架构,Redis的主从架构有别与Zookeeper的主从,客户端只能请求Redis主从架构的Master节点,Slave节点只能做数据备份,Redis从Master同步数据到Slave并不需要同步完成后才能继续接收新的请求,那么就会存在一个主从同步的问题。
当Redis的锁设置成功,正在执行业务代码,当Redis向从服务器同步时,Redis的Maste节点宕机了,Redis刚刚设置成功的锁还没来得及同步到Slave节点,那么此时Redis的主从哨兵模式就会重新选举出新的Master节点,那么这个新的Master节点其实就是原来的Slave节点,此时后面请求进来的线程都会请求这个新的Master节点,然而选举后产生的新Master节点实际上是没有那把锁的,那么从而导致了锁的失效。
上述问题用Redis主从哨兵架构实现的分布式锁在这种极端情况下是无法避免的,但是一般情况下生产上这种故障的概率极低,即使偶尔有问题也是可以接受的。
如果想使分布式锁变的百分百可靠,那可以选用Zookeeper作为分布式锁,就能完美的解决这个问题。由于zk的主从数据同步有别与Redis主从同步,zk的强一致性使得当客户端请求zk的Leader节点加锁时,当Leader将这个锁同步到了zk集群的大部分节点时,Leader节点才会返回客户端加锁成功,此时当Leader节点宕机之后,zk内部选举产生新的Leader节点,那么新的客户款访问新的Leader节点时,这个锁也会存在,所以zk集群能够完美解决上述Redis集群的问题。
由于Redis和Zookeeper的设计思路不一样,任何分布式架构都需要满足CAP理论,“鱼和熊掌不可兼得”,要么选择AP要么选择CP,很显然Redis是AP结构,而zk是属于CP架构,也导致了两者的数据同步本质上的区别。
其实设计Redis分布式锁有种RedLock的思想就是借鉴zk实现分布式锁的这个特点,这种Redis的加锁方式在Redison框架中也有提供api,具体使用也很简单,这里就不一一赘述了。其主要思想如下图所示:
这种实现方式,我认为生产上并不推荐使用。很简单原本只需要对一个Redis加锁,设置成功返回即可,但是现在需要对多个Redis进行加锁,无形之中增加了好几次网络IO,万一第一个Redis加锁成功后,后面几个Redis在加锁过程中出现了类似网络异常的这种情况,那第一个Redis的数据可能就需要做数据回滚操作了,那为了解决一个极低概率发生的问题又引入了多个可能产生的新问题,很显然得不偿失。并且这里还有可能出现更多乱七八糟的问题,所以我认为这种Redis分布式锁的实现方式极其不推荐生产使用。
退一万说如果真的需要这种强一致性的分布式锁的话,那为什么不直接用zk实现的分布式锁呢,性能肯定也比这个RedLock的性能要好。
3. 分布式锁使用场景
这里着重讲一下分布式锁的两种以下使用场景:
3.1 热点缓存key重建优化
一般情况下互联网公司基本都是使用“缓存”加过期时间的策略,这样不仅加快数据读写, 而且还能保证数据的定期更新,这种策略能够满足大部分需求,但是也会有一种特殊情况会有问题:原本就存在一个冷门的key,因为某个热点新闻的出现,突然这个冷门的key请求量暴增成了使其称为了一个热点key,此时缓存失效,并且又无法在很短时间内重新设置缓存,那么缓存失效的瞬间,就会有大量线程来访问到后端,造成数据库负载加大,从而可能会让应用崩溃。
例如:“Air Force one”原本就是一个冷门的key存在于缓存中,微博突然有个明星穿着“Air Force one”上了热搜,那么就会有很多明星的粉丝来得物app购买“Air Force one”,此时的“Air Force one”就直接成为了一个热点key,那么此时“Air Force one”这个key如果缓存恰好失效了之后,就会有大量的请求同时访问到db,会给后端造成很大的压力,甚至会让系统宕机。
要解决这个问题只需要用一个简单的分布式锁即可解决这个问题,只允许一个线程去重建缓存,其他线程等待重建缓存的线程执行完, 重新从缓存获取数据即可。可见下面的实例伪代码:
//分布式锁解决热点缓存,代码如下:
public String getCache(String key) {
//从缓存获取数据
String value = stringRedisTemplate.opsForValue().get(key);
//传入Redis的key
try {
if (Objects.isNull(value)) {
//这里只允许一个线程进入,重新设置缓存
String mutexKey = key;
//从db 获取数据
value = mysql.getDataFromMySQL();
//写回缓存
stringRedisTemplate.opsForValue().setIfPresent(mutexKey, "poizon", 60, TimeUnit.SECONDS);
//删除key
stringRedisTemplate.delete(mutexKey);
} else {
Thread.sleep(100);
getCache(key);
}
} catch (InterruptedException e) {
log.error("getCache is error", e);
}
return value;
}
3.2 解决缓存与数据库数据不一致问题
如果业务对数据的缓存与数据库需要强一致时,且并发量不是很高的情况下的情况下时,就可以直接加一个分布式读写锁就可以直接解决这个问题了。可以直接利用可以加分布式读写锁保证并发读写或写写的时候按顺序排好队,读读的时候相当于无锁。
并发量不是很高且业务对缓存与数据库有着强一致对要求时,通过这种方式实现最简单,且效果立竿见影。倘若在这种场景下,如果还监听binlog通过消息的方式延迟双删的方式去保证数据一致性的话,引入了新的中间件增加了系统的复杂度,得不偿失。
3.3超高并发场景下的分布式锁设计理论
与ConcurrentHashMap的设计思想有点类似,用分段锁来实现,这个是之前在网上看到的实现思路,本人并没有实际使用过,不知道水深不深,但是可以学习一下实现思路。
假如A商品的库存是2000个,现在可以将该A商品的2000个库存利用类似ConcurrentHashMap的原理将不同数量段位的库存的利用取模或者是hash算法让其扩容到不同的节点上去,这样这2000的库存就水平扩容到了多个Redis节点上,然后请求Redis拿库存的时候请求原本只能从一个Redis上取数据,现在可以从五个Redis上取数据,从而可以大大提高并发效率。
4. 总结与思考
综上可知,Redis分布式锁并不是绝对安全,Redis分布式锁在某种极端情况下是无法避免的,但是一般情况下生产上这种故障的概率极低,即使偶尔有问题也是可以接受。
CAP 原则指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个要素最多只能同时实现两点,不可能三者兼顾。鱼和熊掌不可兼得”,要么选择AP要么选择CP,选择Redis作为分布式锁的组件在于其单线程内存操作效率很高,且在高并发场景下也可以保持很好的性能。
如果一定要要求分布式锁百分百可靠,那可以选用Zookeeper或者MySQL作为分布式锁,就能完美的解决锁安全的问题,但是选择了一致性那就要失去可用性,所以Zookeeper或者MySQL实现的分布式锁的性能远不如Redis实现的分布式锁。
最后着重感谢组内同学对本篇blog的建议与支持,若有不足的地方烦请指出,大家可以一起探讨学习,共同进步。😁😁
*文/harmony
要是觉得文章对你有帮助的话,欢迎评论转发点赞~