基于Redis+Lua实现分布式锁模拟秒杀扣减库存业务
最近和几个小伙伴聊了聊基于Redis的分布式锁实现秒杀扣减库存业务的一些技术细节,刚好最近钻研了一段时间,本篇内容通过1个详细的案例,把这个实现方案作个记录,当做自己对知识的总结积累。
本案例我们通过以下6个部分来讲解基于Redis+Lua实现分布式锁的详细过程,案例背景是模拟秒杀扣减库存的经典业务:
1、什么是分布式锁 ?
要说起分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。
线程锁:主要用来给方法、代码块加锁。当某个方法或代码块使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是不同线程之间对于同一JVM共享其内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
分布式锁:当多个进程(多个应用程序)不在同一个系统中,比如微服务部署多节点,就要用分布式锁控制多个进程对资源的访问。
2、什么场景下需要分布式锁 ?
随着互联网的高速发展,一些电商系统承载的业务体量也在大大增加,那么企业为了应对巨大的业务体量,很多时候会用到微服务,单个服务会部署多个节点,以此来处理瞬间激增的用户请求,比如秒杀促销的时候,某个界面一下子有10000人同时抢购,那么这10000个人的请求会按照特定的负载均衡策略,把10000个用户请求路由到不同的多个节点去处理,大大减轻了单台应用处理业务的能力,提升了效率,提升了用户的体验度。那么这个时候就要用到分布式锁去满足技术方案。
3、用了分布式锁会带来什么好处 ?
接着上面的举例,如果没用到分布式锁,那么很可能出出现,用户1下单的代码过程中,还没等用户1处理完订单数据,用户2的请求进入到用户1下单的线程中了,这2个线程在同1个应用中执行,是共享的同1个JVM内存,那么可能会出现订单数据错乱,最终导致订单业务完全发生重大严重错误。但是如果采用了分布式锁,并且使用得当的情况下完全可以避免这个问题。这时就凸显分布式锁的重要作用了。
4、实现分布式锁可以采用哪些方案实现 ?
目前行业中分布式锁实现众所周知的主要有3种方案:
1.采用数据库的事务锁
2.采用Zookeeper框架
3.基于redis实现
目前主流的比较成熟并且比较大众化的方案就是基于redis实现,当然再组合上Lua,实现分布式锁实现过程会更简单,功能更强大。本案例笔者就以springboot+Jedis+redis+Lua通过具体的例子详细讲解分布式锁的实现过程和思路。
5、分布式锁需要满足的4个的必要条件
互斥性。在任意时刻,只有一个客户端线程能持有锁。
不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也要保证后续其他客户端能加锁。
具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
6、代码实现
(1)、引入依赖
首先我们要通过Maven引入Jedis
开源组件,在pom.xml
文件加入下面的代码。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
(2)、正确加锁方式
4j
public class RedisPool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
// 锁的过期时间
private static int EXPIRE_TIME = 500;
private static JedisPool pool;//jedis连接池对象
private static int maxTotal = 20;//最大连接数
private static int maxIdle = 10;//最大空闲连接数
private static int minIdle = 5;//最小空闲连接数
private static boolean testOnBorrow = true;//在取连接时测试连接的可用性
private static boolean testOnReturn = false;//再还连接时不测试连接的可用性
static {
initPool();//初始化连接池
}
public static Jedis getJedis(){
return pool.getResource();
}
public static void close(Jedis jedis){
jedis.close();
}
public static JedisPool getPool(String host,int port) {
JedisPoolConfig config = new JedisPoolConfig();//连接池配置类
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
return new JedisPool(config, host, port);
}
private static void initPool(){
JedisPoolConfig config = new JedisPoolConfig();//连接池配置类
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
pool = new JedisPool(config, "XXXXXXXX", 6379, 5000, "*******");
}
//加锁方法
//加锁之后返回锁的持有者(锁的value使用唯一时间戳标志每个客户端,保证只有锁的持有者才可以释放锁)
public static String lock(Jedis jedis, String key,Long waitEnd,String requestId) {
try {
// 1秒内数次加锁如果失败,则不断请求重新获取锁,超过1秒还没能加锁,就加锁失败(为了每个线程拥有公平的机会获取锁)
while (System.currentTimeMillis() < waitEnd) {// 1秒类不断尝试加锁(加锁之后返回锁的持有者)
String result = jedis.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, EXPIRE_TIME);
if (LOCK_SUCCESS.equals(result)) {
return requestId;
}
}
} catch (Exception ex) {
log.error("lock error", ex);
}
return null;
}
}
(3)、加锁方法代码解读
我们加锁关键的1行代码:jedis.set(String key, String value, String nxxx, String expx, int time)
,这个set()方法一共有五个形参:
第一个为key,我们一般使用某1业务的唯一属性来当做作为key加锁,因为key是唯一的。
第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用时间戳,UUID等等生成。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,代表key的过期时间,单位是毫秒
总的来说,执行上面的set()方法就只会导致两种结果:
1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,返回锁的持有者客户端。
2. 对于当前key已有锁存在,不做任何操作。
心细的朋友就会发现,我们的加锁代码满足我们描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动释放锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识(锁的持有者),那么在客户端在解锁的时候就可以进行校验是否是同一个客户端,同1个客户端只能释放自己的锁,而不能释放别的客户端的锁。由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。
(4)、错误的加锁代码解析
比较常见的错误示例就是使用jedis.setnx()
和jedis.expire()这2个操作
组合实现加锁,代码如下:
public static void tryLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
}
setnx()方法作用就是SET IF NOT EXIST,expire()方法就是给锁加一个过期时间。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。
(5)、正确的释放锁
private static final Long RELEASE_SUCCESS = 1L;
public static boolean unLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
可以看到,我们解锁只需要两行代码就搞定了!第一行代码,我们写了一个简单的Lua脚本代码,第二行代码,我们将Lua代码传到jedis.eval()
方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。
那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述多个操作是原子性的。简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个整体命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。是不是瞬间感觉高大上。
(6)、错误的释放锁代码解析
最常见的解锁代码就是直接使用jedis.del()
方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致不管客户端有没有加锁,任何客户端都可以随时进行解锁,即使这把锁不是它的,或者即使没有加锁。
public static void wrongReleaseLock(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}
(7)、实战案例代码
public static void main(String[] args) {
//使用固定线程数为4 的线程池处理并发请求
ExecutorService pool1 = new ThreadPoolExecutor(4, 10, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory());
//10个人抢购编码为101的共计5个商品
for(int i=0;i<50;i++) {//模拟100个人同时抢购5件商品
String userCode = 10*(i+1)+"";
//System.out.println("用户"+(i+1)+"的编码:"+userCode);
pool1.execute(new ThreadTask("101",userCode,1L));
}
}
这里模拟50个用户的秒杀请求,抢购5件商品,该商品的sku编码为sku-101,提前把这个数量放到redis里,基于这个sku的数量作为库存进行扣减。如下如所示:
/**
* @author yk
* @date 2020年9月7日 上午10:28:46
* @version 1.0
* @since jdk1.8
* @description <>
*/
public class ThreadTask implements Runnable,Comparable<ThreadTask>{
private String userCode;
private Long num;
private String key;
private static int WAIT_TIME = 2 * 1000;
private int priority;
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public ThreadTask() {
}
public ThreadTask(int priority) {
this.priority = priority;
}
//当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
public int compareTo(ThreadTask o) {
return this.priority>o.priority?-1:1;
}
public ThreadTask(String key,String userCode,Long num){
this.userCode = userCode;
this.num = num;
this.key = key;
}
private static String STOCK_KEY_PREFIX = "sku-";
public void run() {
//让线程阻塞,使后续任务进入缓存队列
System.out.println("当前系统线程:"+Thread.currentThread().getName());
Jedis jedis = RedisPool.getJedis();
Long waitEnd = System.currentTimeMillis() + WAIT_TIME;
String bb = jedis.get("sku-101");
String value = RedisPool.lock(jedis,key,waitEnd,userCode);//加锁成功,获取锁的持有者
String lockKey = null;
if(!StringUtils.isBlank(value) && value.equals(userCode)){//加锁成功后查询库存
try {
//synchronized (key){
lockKey = key;
Long stock = Long.valueOf(jedis.get(STOCK_KEY_PREFIX+lockKey));
if( stock >= num){
String script = "return redis.call('incrby', KEYS[1], ARGV[1])";
Object eval = jedis.eval(script,Lists.newArrayList(STOCK_KEY_PREFIX+lockKey), Lists.newArrayList(String.valueOf(0-num)));
if(null != eval && Integer.valueOf(String.valueOf(eval)) >= 0){
System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】成功,库存剩余:"+String.valueOf(eval));
}
}else{
System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】失败,库存剩余:"+stock.toString());
}
Thread.sleep(100);//这里为了更真实的模拟多线程并发,这里线程获取到锁后,线程休眠100ms
//}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】的线程释放锁");
RedisPool.unLock(lockKey,userCode);//处理完扣减库存业务释放锁,把抢购这件商品的机会留给其余用户
}
}else{
System.out.println("当前用户【"+userCode+"】抢购商品【"+key+"】的线程加锁失败,未抢购到,请再试");
}
}
}
执行上面main方法,无论执行几次,打印出如下日志,可见都不会出现超卖现象,并且加锁和加锁的都是同1个客户端,每个用户加锁并且只能释放自己的锁。
当前系统线程:pool-1-thread-1
当前系统线程:pool-1-thread-3
当前系统线程:pool-1-thread-4
当前系统线程:pool-1-thread-2
用户【20】秒杀商品【101】成功,库存剩余:4
用户【20】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-2
用户【10】秒杀商品【101】成功,库存剩余:3
用户【10】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-1
用户【50】秒杀商品【101】成功,库存剩余:2
用户【50】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-2
用户【40】秒杀商品【101】成功,库存剩余:1
用户【40】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-4
当前用户【30】抢购商品【101】的线程加锁失败,未抢购到,请再试
当前系统线程:pool-1-thread-3
用户【70】秒杀商品【101】成功,库存剩余:0
用户【70】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-1
用户【110】秒杀商品【101】失败,库存剩余:0
用户【110】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-2
用户【190】秒杀商品【101】失败,库存剩余:0
用户【190】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-4
用户【490】秒杀商品【101】失败,库存剩余:0
用户【490】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-2
当前用户【350】抢购商品【101】的线程加锁失败,未抢购到,请再试
当前系统线程:pool-1-thread-3
用户【480】秒杀商品【101】失败,库存剩余:0
用户【480】秒杀商品【101】的线程释放锁
当前用户【500】抢购商品【101】的线程加锁失败,未抢购到,请再试
当前系统线程:pool-1-thread-4
当前系统线程:pool-1-thread-1
用户【460】秒杀商品【101】失败,库存剩余:0
用户【460】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-3
用户【450】秒杀商品【101】失败,库存剩余:0
用户【450】秒杀商品【101】的线程释放锁
当前系统线程:pool-1-thread-4
用户【420】秒杀商品【101】失败,库存剩余:0
用户【420】秒杀商品【101】的线程释放锁
当前用户【470】抢购商品【101】的线程加锁失败,未抢购到,请再试
当前系统线程:pool-1-thread-2
当前系统线程:pool-1-thread-4
用户【410】秒杀商品【101】失败,库存剩余:0
用户【410】秒杀商品【101】的线程释放锁
当前用户【440】抢购商品【101】的线程加锁失败,未抢购到,请再试
当前系统线程:pool-1-thread-1
用户【430】秒杀商品【101】失败,库存剩余:0
用户【430】秒杀商品【101】的线程释放锁
用户【400】秒杀商品【101】失败,库存剩余:0
用户【400】秒杀商品【101】的线程释放锁
总结
本文主要介绍了使用Java代码正确实现Redis分布式锁,对于加锁和解锁也分别给出了两个比较经典的错误示例。其实想要通过Redis实现分布式锁并不难,只要保证能满足上面可靠性里的四个必要条件。本案例讲解到这来,欢迎大家交流。