vlambda博客
学习文章列表

基于Redis+Lua实现分布式锁模拟秒杀扣减库存业务

最近和几个小伙伴聊了聊基于Redis的分布式锁实现秒杀扣减库存业务的一些技术细节,刚好最近钻研了一段时间,本篇内容通过1个详细的案例,把这个实现方案作个记录,当做自己对知识的总结积累。

本案例我们通过以下6个部分来讲解基于Redis+Lua实现分布式锁的详细过程,案例背景是模拟秒杀扣减库存的经典业务:

1、什么是分布式锁 ?

要说起分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。

线程锁:主要用来给方法、代码块加锁。当某个方法或代码块使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是不同线程之间对于同一JVM共享其内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。

进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。

分布式锁:当多个进程(多个应用程序)不在同一个系统中,比如微服务部署多节点,就要用分布式锁控制多个进程对资源的访问。

2、什么场景下需要分布式锁 ?

随着互联网的高速发展,一些电商系统承载的业务体量也在大大增加,那么企业为了应对巨大的业务体量,很多时候会用到微服务,单个服务会部署多个节点,以此来处理瞬间激增的用户请求,比如秒杀促销的时候,某个界面一下子有10000人同时抢购,那么这10000个人的请求会按照特定的负载均衡策略,把10000个用户请求路由到不同的多个节点去处理,大大减轻了单台应用处理业务的能力,提升了效率,提升了用户的体验度。那么这个时候就要用到分布式锁去满足技术方案。

3、用了分布式锁会带来什么好处 ?

接着上面的举例,如果没用到分布式锁,那么很可能出出现,用户1下单的代码过程中,还没等用户1处理完订单数据,用户2的请求进入到用户1下单的线程中了,这2个线程在同1个应用中执行,是共享的同1个JVM内存,那么可能会出现订单数据错乱,最终导致订单业务完全发生重大严重错误。但是如果采用了分布式锁,并且使用得当的情况下完全可以避免这个问题。这时就凸显分布式锁的重要作用了。

4、实现分布式锁可以采用哪些方案实现 ?

目前行业中分布式锁实现众所周知的主要有3种方案:
1.采用数据库的事务锁

2.采用Zookeeper框架

3.基于redis实现

目前主流的比较成熟并且比较大众化的方案就是基于redis实现,当然再组合上Lua,实现分布式锁实现过程会更简单,功能更强大。本案例笔者就以springboot+Jedis+redis+Lua通过具体的例子详细讲解分布式锁的实现过程和思路。

5、分布式锁需要满足的4个的必要条件

  1. 互斥性。在任意时刻,只有一个客户端线程能持有锁。

  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也要保证后续其他客户端能加锁。

  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

6、代码实现

(1)、引入依赖

首先我们要通过Maven引入Jedis开源组件,在pom.xml文件加入下面的代码。

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>

(2)、正确加锁方式

@Slf4j@Componentpublic class RedisPool {private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";    private static final String SET_WITH_EXPIRE_TIME = "PX";private static final Long RELEASE_SUCCESS = 1L;
// 锁的过期时间private static int EXPIRE_TIME = 500;
private static JedisPool pool;//jedis连接池对象
private static int maxTotal = 20;//最大连接数
private static int maxIdle = 10;//最大空闲连接数
private static int minIdle = 5;//最小空闲连接数
private static boolean testOnBorrow = true;//在取连接时测试连接的可用性
private static boolean testOnReturn = false;//再还连接时不测试连接的可用性
static { initPool();//初始化连接池 }public static Jedis getJedis(){return pool.getResource();    }public static void close(Jedis jedis){ jedis.close();    }@Autowiredpublic static JedisPool getPool(String host,int port) { JedisPoolConfig config = new JedisPoolConfig();//连接池配置类 config.setMaxTotal(maxTotal); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setTestOnBorrow(testOnBorrow); config.setTestOnReturn(testOnReturn); config.setBlockWhenExhausted(true);return new JedisPool(config, host, port); }private static void initPool(){ JedisPoolConfig config = new JedisPoolConfig();//连接池配置类 config.setMaxTotal(maxTotal); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setTestOnBorrow(testOnBorrow); config.setTestOnReturn(testOnReturn); config.setBlockWhenExhausted(true); pool = new JedisPool(config, "XXXXXXXX", 6379, 5000, "*******");    }//加锁方法//加锁之后返回锁的持有者(锁的value使用唯一时间戳标志每个客户端,保证只有锁的持有者才可以释放锁)public static String lock(Jedis jedis, String key,Long waitEnd,String requestId) {try {// 1秒内数次加锁如果失败,则不断请求重新获取锁,超过1秒还没能加锁,就加锁失败(为了每个线程拥有公平的机会获取锁)while (System.currentTimeMillis() < waitEnd) {// 1秒类不断尝试加锁(加锁之后返回锁的持有者) String result = jedis.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, EXPIRE_TIME);
if (LOCK_SUCCESS.equals(result)) {return requestId; } } } catch (Exception ex) { log.error("lock error", ex); }return null; }}

(3)、加锁方法代码解读

我们加锁关键的1行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  • 第一个为key,我们一般使用某1业务的唯一属性来当做作为key加锁,因为key是唯一的。

  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用时间戳,UUID等等生成。

  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

  • 第五个为time,与第四个参数相呼应,代表key的过期时间,单位是毫秒

      总的来说,执行上面的set()方法就只会导致两种结果:

     1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,返回锁的持有者客户端。

     2. 对于当前key已有锁存在,不做任何操作。

      心细的朋友就会发现,我们的加锁代码满足我们描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动释放锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识(锁的持有者),那么在客户端在解锁的时候就可以进行校验是否是同一个客户端,同1个客户端只能释放自己的锁,而不能释放别的客户端的锁。由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

(4)、错误的加锁代码解析

比较常见的错误示例就是使用jedis.setnx()jedis.expire()这2个操作组合实现加锁,代码如下:

public static void tryLock1(Jedis jedis, String lockKey, String requestId, int expireTime) { Long result = jedis.setnx(lockKey, requestId);if (result == 1) {// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁 jedis.expire(lockKey, expireTime); }}

setnx()方法作用就是SET IF NOT EXIST,expire()方法就是给锁加一个过期时间。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。

(5)、正确的释放锁

private static final Long RELEASE_SUCCESS = 1L;
public static boolean unLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {return true; }return false; }

可以看到,我们解锁只需要两行代码就搞定了!第一行代码,我们写了一个简单的Lua脚本代码,第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述多个操作是原子性的。简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个整体命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。是不是瞬间感觉高大上。

(6)、错误的释放锁代码解析

最常见的解锁代码就是直接使用jedis.del()方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致不管客户端有没有加锁,任何客户端都可以随时进行解锁,即使这把锁不是它的,或者即使没有加锁。

public static void wrongReleaseLock(Jedis jedis, String lockKey) { jedis.del(lockKey);}

(7)、实战案例代码

public static void main(String[] args) {  //使用固定线程数为4 的线程池处理并发请求 ExecutorService pool1 = new ThreadPoolExecutor(4, 10, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory()); //10个人抢购编码为101的共计5个商品 for(int i=0;i<50;i++) {//模拟100个人同时抢购5件商品 String userCode = 10*(i+1)+""; //System.out.println("用户"+(i+1)+"的编码:"+userCode); pool1.execute(new ThreadTask("101",userCode,1L)); }  }

这里模拟50个用户的秒杀请求,抢购5件商品,该商品的sku编码为sku-101,提前把这个数量放到redis里,基于这个sku的数量作为库存进行扣减。如下如所示:

/**  * @author yk * @date 2020年9月7日 上午10:28:46  * @version 1.0  * @since jdk1.8 * @description <> */ public class ThreadTask implements Runnable,Comparable<ThreadTask>{  private String userCode;  private Long num;  private String key;  private static int WAIT_TIME = 2 * 1000; private int priority;  public int getPriority() { return priority; }  public void setPriority(int priority) { this.priority = priority; }  public ThreadTask() {  }  public ThreadTask(int priority) { this.priority = priority; }  //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高 public int compareTo(ThreadTask o) { return this.priority>o.priority?-1:1; }  public ThreadTask(String key,String userCode,Long num){ this.userCode = userCode; this.num = num; this.key = key; }  private static String STOCK_KEY_PREFIX = "sku-";  @Override public void run() {  //让线程阻塞,使后续任务进入缓存队列 System.out.println("当前系统线程:"+Thread.currentThread().getName()); Jedis jedis = RedisPool.getJedis(); Long waitEnd = System.currentTimeMillis() + WAIT_TIME; String bb = jedis.get("sku-101"); String value = RedisPool.lock(jedis,key,waitEnd,userCode);//加锁成功,获取锁的持有者 String lockKey = null; if(!StringUtils.isBlank(value) && value.equals(userCode)){//加锁成功后查询库存 try { //synchronized (key){ lockKey = key; Long stock = Long.valueOf(jedis.get(STOCK_KEY_PREFIX+lockKey)); if( stock >= num){ String script = "return redis.call('incrby', KEYS[1], ARGV[1])"; Object eval = jedis.eval(script,Lists.newArrayList(STOCK_KEY_PREFIX+lockKey), Lists.newArrayList(String.valueOf(0-num))); if(null != eval && Integer.valueOf(String.valueOf(eval)) >= 0){ System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】成功,库存剩余:"+String.valueOf(eval)); }  }else{ System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】失败,库存剩余:"+stock.toString()); } Thread.sleep(100);//这里为了更真实的模拟多线程并发,这里线程获取到锁后,线程休眠100ms //} } catch (Exception e) { // TODO: handle exception } finally { System.out.println("用户【"+userCode+"】秒杀商品【"+lockKey+"】的线程释放锁"); RedisPool.unLock(lockKey,userCode);//处理完扣减库存业务释放锁,把抢购这件商品的机会留给其余用户 }  }else{ System.out.println("当前用户【"+userCode+"】抢购商品【"+key+"】的线程加锁失败,未抢购到,请再试"); } }}

执行上面main方法,无论执行几次,打印出如下日志,可见都不会出现超卖现象,并且加锁和加锁的都是同1个客户端,每个用户加锁并且只能释放自己的锁。

当前系统线程:pool-1-thread-1当前系统线程:pool-1-thread-3当前系统线程:pool-1-thread-4当前系统线程:pool-1-thread-2用户【20】秒杀商品【101】成功,库存剩余:4用户【20】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-2用户【10】秒杀商品【101】成功,库存剩余:3用户【10】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-1用户【50】秒杀商品【101】成功,库存剩余:2用户【50】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-2用户【40】秒杀商品【101】成功,库存剩余:1用户【40】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-4当前用户【30】抢购商品【101】的线程加锁失败,未抢购到,请再试当前系统线程:pool-1-thread-3用户【70】秒杀商品【101】成功,库存剩余:0用户【70】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-1用户【110】秒杀商品【101】失败,库存剩余:0用户【110】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-2用户【190】秒杀商品【101】失败,库存剩余:0用户【190】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-4用户【490】秒杀商品【101】失败,库存剩余:0用户【490】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-2当前用户【350】抢购商品【101】的线程加锁失败,未抢购到,请再试当前系统线程:pool-1-thread-3用户【480】秒杀商品【101】失败,库存剩余:0用户【480】秒杀商品【101】的线程释放锁当前用户【500】抢购商品【101】的线程加锁失败,未抢购到,请再试当前系统线程:pool-1-thread-4当前系统线程:pool-1-thread-1用户【460】秒杀商品【101】失败,库存剩余:0用户【460】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-3用户【450】秒杀商品【101】失败,库存剩余:0用户【450】秒杀商品【101】的线程释放锁当前系统线程:pool-1-thread-4用户【420】秒杀商品【101】失败,库存剩余:0用户【420】秒杀商品【101】的线程释放锁当前用户【470】抢购商品【101】的线程加锁失败,未抢购到,请再试当前系统线程:pool-1-thread-2当前系统线程:pool-1-thread-4用户【410】秒杀商品【101】失败,库存剩余:0用户【410】秒杀商品【101】的线程释放锁当前用户【440】抢购商品【101】的线程加锁失败,未抢购到,请再试当前系统线程:pool-1-thread-1用户【430】秒杀商品【101】失败,库存剩余:0用户【430】秒杀商品【101】的线程释放锁用户【400】秒杀商品【101】失败,库存剩余:0用户【400】秒杀商品【101】的线程释放锁

总结

本文主要介绍了使用Java代码正确实现Redis分布式锁,对于加锁和解锁也分别给出了两个比较经典的错误示例。其实想要通过Redis实现分布式锁并不难,只要保证能满足上面可靠性里的四个必要条件。本案例讲解到这来,欢迎大家交流。