vlambda博客
学习文章列表

基于redission的分布式锁

在分布式环境中,很多场景,如:秒杀、ID生成... 都需要分布式锁。分布式锁的实现,可以基于redis的setnx,zk的临时节点。。。今天我们介绍一种redis官方推荐的方法——redission。

1、pom.xml


   
     
     
   
  1. <dependency>

  2. <groupId>org.redisson</groupId>

  3. <artifactId>redisson</artifactId>

  4. <version>3.2.3</version>

  5. </dependency>

2、redissionUtils


   
     
     
   
  1. public class RedissionUtils {

  2. private static Logger logger = LoggerFactory.getLogger(RedissionUtils.class);


  3. private static RedissionUtils redisUtils;


  4. private RedissionUtils() {

  5. }


  6. /**

  7. * 提供单例模式

  8. *

  9. * @return

  10. */

  11. public static RedissionUtils getInstance() {

  12. if (redisUtils == null)

  13. synchronized (RedisUtils.class) {

  14. if (redisUtils == null)

  15. redisUtils = new RedissionUtils();

  16. }

  17. return redisUtils;

  18. }


  19. /**

  20. * 使用config创建Redisson Redisson是用于连接Redis Server的基础类

  21. *

  22. * @param config

  23. * @return

  24. */

  25. public RedissonClient getRedisson(Config config) {

  26. RedissonClient redisson = Redisson.create(config);

  27. logger.info("成功连接Redis Server");

  28. return redisson;

  29. }


  30. /**

  31. *

  32. * @param ip

  33. * @param port

  34. * @return

  35. */

  36. public RedissonClient getRedisson(String ip, String port) {

  37. Config config = new Config();

  38. config.useSingleServer().setAddress(ip + ":" + port);

  39. RedissonClient redisson = Redisson.create(config);

  40. logger.info("成功连接Redis Server" + "\t" + "连接" + ip + ":" + port + "服务器");

  41. return redisson;

  42. }


  43. /**

  44. * 关闭Redisson客户端连接

  45. *

  46. * @param redisson

  47. */

  48. public void closeRedisson(RedissonClient redisson) {

  49. redisson.shutdown();

  50. logger.info("成功关闭Redis Client连接");

  51. }


  52. /**

  53. * 获取字符串对象

  54. *

  55. * @param redisson

  56. * @param objectName

  57. * @return

  58. */

  59. public <T> RBucket<T> getRBucket(RedissonClient redisson, String objectName) {

  60. RBucket<T> bucket = redisson.getBucket(objectName);

  61. return bucket;

  62. }


  63. /**

  64. * 获取Map对象

  65. *

  66. * @param redisson

  67. * @param objectName

  68. * @return

  69. */

  70. public <K, V> RMap<K, V> getRMap(RedissonClient redisson, String objectName) {

  71. RMap<K, V> map = redisson.getMap(objectName);

  72. return map;

  73. }


  74. /**

  75. * 获取有序集合

  76. *

  77. * @param redisson

  78. * @param objectName

  79. * @return

  80. */

  81. public <V> RSortedSet<V> getRSortedSet(RedissonClient redisson,

  82. String objectName) {

  83. RSortedSet<V> sortedSet = redisson.getSortedSet(objectName);

  84. return sortedSet;

  85. }


  86. /**

  87. * 获取集合

  88. *

  89. * @param redisson

  90. * @param objectName

  91. * @return

  92. */

  93. public <V> RSet<V> getRSet(RedissonClient redisson, String objectName) {

  94. RSet<V> rSet = redisson.getSet(objectName);

  95. return rSet;

  96. }


  97. /**

  98. * 获取列表

  99. *

  100. * @param redisson

  101. * @param objectName

  102. * @return

  103. */

  104. public <V> RList<V> getRList(RedissonClient redisson, String objectName) {

  105. RList<V> rList = redisson.getList(objectName);

  106. return rList;

  107. }


  108. /**

  109. * 获取队列

  110. *

  111. * @param redisson

  112. * @param objectName

  113. * @return

  114. */

  115. public <V> RQueue<V> getRQueue(RedissonClient redisson, String objectName) {

  116. RQueue<V> rQueue = redisson.getQueue(objectName);

  117. return rQueue;

  118. }


  119. /**

  120. * 获取双端队列

  121. *

  122. * @param redisson

  123. * @param objectName

  124. * @return

  125. */

  126. public <V> RDeque<V> getRDeque(RedissonClient redisson, String objectName) {

  127. RDeque<V> rDeque = redisson.getDeque(objectName);

  128. return rDeque;

  129. }


  130. /**

  131. * 此方法不可用在Redisson 1.2 中 在1.2.2版本中 可用

  132. *

  133. * @param redisson

  134. * @param objectName

  135. * @return

  136. */

  137. /**

  138. * public <V> RBlockingQueue<V> getRBlockingQueue(RedissonClient

  139. * redisson,String objectName){ RBlockingQueue

  140. * rb=redisson.getBlockingQueue(objectName); return rb; }

  141. */


  142. /**

  143. * 获取锁

  144. *

  145. * @param redisson

  146. * @param objectName

  147. * @return

  148. */

  149. public RLock getRLock(RedissonClient redisson, String objectName) {

  150. RLock rLock = redisson.getLock(objectName);

  151. return rLock;

  152. }


  153. /**

  154. * 获取原子数

  155. *

  156. * @param redisson

  157. * @param objectName

  158. * @return

  159. */

  160. public RAtomicLong getRAtomicLong(RedissonClient redisson, String objectName) {

  161. RAtomicLong rAtomicLong = redisson.getAtomicLong(objectName);

  162. return rAtomicLong;

  163. }


  164. /**

  165. * 获取记数锁

  166. *

  167. * @param redisson

  168. * @param objectName

  169. * @return

  170. */

  171. public RCountDownLatch getRCountDownLatch(RedissonClient redisson,

  172. String objectName) {

  173. RCountDownLatch rCountDownLatch = redisson

  174. .getCountDownLatch(objectName);

  175. return rCountDownLatch;

  176. }


  177. /**

  178. * 获取消息的Topic

  179. *

  180. * @param redisson

  181. * @param objectName

  182. * @return

  183. */

  184. public <M> RTopic<M> getRTopic(RedissonClient redisson, String objectName) {

  185. RTopic<M> rTopic = redisson.getTopic(objectName);

  186. return rTopic;

  187. }


  188. }

注:redission支持多种连接模式:


   
     
     
   
  1. //单机

  2. RedissonClient redisson = Redisson.create();

  3. Config config = new Config();

  4. config.useSingleServer().setAddress("myredisserver:6379");

  5. RedissonClient redisson = Redisson.create(config);



  6. //主从


  7. Config config = new Config();

  8. config.useMasterSlaveServers()

  9. .setMasterAddress("127.0.0.1:6379")

  10. .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")

  11. .addSlaveAddress("127.0.0.1:6399");

  12. RedissonClient redisson = Redisson.create(config);



  13. //哨兵

  14. Config config = new Config();

  15. config.useSentinelServers()

  16. .setMasterName("mymaster")

  17. .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")

  18. .addSentinelAddress("127.0.0.1:26319");

  19. RedissonClient redisson = Redisson.create(config);



  20. //集群

  21. Config config = new Config();

  22. config.useClusterServers()

  23. .setScanInterval(2000) // cluster state scan interval in milliseconds

  24. .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")

  25. .addNodeAddress("127.0.0.1:7002");

  26. RedissonClient redisson = Redisson.create(config);

3、基于redission的各种分布式“锁”:

1)可重入锁:

Redisson的分布式可重入锁RLock,实现了java.util.concurrent.locks.Lock接口,以及支持自动过期解锁。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。


   
     
     
   
  1. // 最常见的使用方法

  2. RLock lock = redisson.getLock("anyLock");

  3. lock.lock();

  4. //...

  5. lock.unlock();


  6. //另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


  7. // 加锁以后10秒钟自动解锁

  8. // 无需调用unlock方法手动解锁

  9. lock.lock(10, TimeUnit.SECONDS);


  10. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁

  11. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

  12. if (res) {

  13. try {

  14. ...

  15. } finally {

  16. lock.unlock();

  17. }

  18. }

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

Redisson同时还为分布式锁提供了异步执行的相关方法:


   
     
     
   
  1. RLock lock = redisson.getLock("anyLock");

  2. lock.lockAsync();

  3. lock.lockAsync(10, TimeUnit.SECONDS);

  4. Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。

2)公平锁(Fair Lock):

它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。使用方式同上,获取的时候使用如下方法:

RLock fairLock = redisson.getFairLock("anyLock");

3)联锁(MultiLock):

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。


   
     
     
   
  1. RLock lock1 = redissonInstance1.getLock("lock1");

  2. RLock lock2 = redissonInstance2.getLock("lock2");

  3. RLock lock3 = redissonInstance3.getLock("lock3");


  4. RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);

  5. // 同时加锁:lock1 lock2 lock3

  6. // 所有的锁都上锁成功才算成功。

  7. lock.lock();

  8. ...

  9. lock.unlock();


  10. //另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


  11. RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);

  12. // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开

  13. lock.lock(10, TimeUnit.SECONDS);


  14. // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开

  15. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

  16. ...

  17. lock.unlock();

4)红锁(RedLock):

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。


   
     
     
   
  1. RLock lock1 = redissonInstance1.getLock("lock1");

  2. RLock lock2 = redissonInstance2.getLock("lock2");

  3. RLock lock3 = redissonInstance3.getLock("lock3");


  4. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);

  5. // 同时加锁:lock1 lock2 lock3

  6. // 红锁在大部分节点上加锁成功就算成功。

  7. lock.lock();

  8. ...

  9. lock.unlock();


  10. //另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


  11. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);

  12. // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开

  13. lock.lock(10, TimeUnit.SECONDS);


  14. // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开

  15. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

  16. ...

  17. lock.unlock();

5)读写锁(ReadWriteLock):

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。


   
     
     
   
  1. RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");

  2. // 最常见的使用方法

  3. rwlock.readLock().lock();

  4. // 或

  5. rwlock.writeLock().lock();


  6. //另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


  7. // 10秒钟以后自动解锁

  8. // 无需调用unlock方法手动解锁

  9. rwlock.readLock().lock(10, TimeUnit.SECONDS);

  10. // 或

  11. rwlock.writeLock().lock(10, TimeUnit.SECONDS);


  12. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁

  13. boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

  14. // 或

  15. boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);

  16. ...

  17. lock.unlock();

6)信号量(Semaphore):

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。


   
     
     
   
  1. RSemaphore semaphore = redisson.getSemaphore("semaphore");

  2. semaphore.acquire();

  3. //

  4. semaphore.acquireAsync();

  5. semaphore.acquire(23);

  6. semaphore.tryAcquire();

  7. //

  8. semaphore.tryAcquireAsync();

  9. semaphore.tryAcquire(23, TimeUnit.SECONDS);

  10. //

  11. semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);

  12. semaphore.release(10);

  13. semaphore.release();

  14. //

  15. semaphore.releaseAsync();

7)可过期性信号量(PermitExpirableSemaphore):

基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。


   
     
     
   
  1. RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");

  2. String permitId = semaphore.acquire();

  3. // 获取一个信号,有效期只有2秒钟。

  4. String permitId = semaphore.acquire(2, TimeUnit.SECONDS);

  5. // ...

  6. semaphore.release(permitId);

8)门闩:

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。


   
     
     
   
  1. RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");

  2. latch.trySetCount(1);

  3. latch.await();


  4. // 在其他线程或其他JVM里

  5. RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");

  6. latch.countDown();

9)分布式AtomicLong:


   
     
     
   
  1. RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");

  2. atomicLong.set(3);

  3. atomicLong.incrementAndGet();

  4. atomicLong.get();

10)分布式BitSet:


   
     
     
   

  1. RBitSet set = redisson.getBitSet("simpleBitset");

  2. set.set(0, true);

  3. set.set(1812, false);

  4. set.clear(0);

  5. set.addAsync("e");

  6. set.xor("anotherBitset");

11)分布式Object:


   
     
     
   
  1. RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

  2. bucket.set(new AnyObject(1));

  3. AnyObject obj = bucket.get();

  4. bucket.trySet(new AnyObject(3));

  5. bucket.compareAndSet(new AnyObject(4), new AnyObject(5));

  6. bucket.getAndSet(new AnyObject(6));

12)分布式Set:


   
     
     
   
  1. RSet<SomeObject> set = redisson.getSet("anySet");

  2. set.add(new SomeObject());

  3. set.remove(new SomeObject());

13)分布式List:


   
     
     
   
  1. RList<SomeObject> list = redisson.getList("anyList");

  2. list.add(new SomeObject());

  3. list.get(0);

  4. list.remove(new SomeObject());

14)分布式Blocking Queue:


   
     
     
   
  1. RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");

  2. queue.offer(new SomeObject());

  3. SomeObject obj = queue.peek();

  4. SomeObject someObj = queue.poll();

  5. SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

15)分布式Map:


   
     
     
   
  1. RMap<String, SomeObject> map = redisson.getMap("anyMap");

  2. SomeObject prevObject = map.put("123", new SomeObject());

  3. SomeObject currentObject = map.putIfAbsent("323", new SomeObject());

  4. SomeObject obj = map.remove("123");

  5. map.fastPut("321", new SomeObject());

  6. map.fastRemove("321");

  7. Future<SomeObject> putAsyncFuture = map.putAsync("321");

  8. Future<Void> fastPutAsyncFuture = map.fastPutAsync("321");

  9. map.fastPutAsync("321", new SomeObject());

  10. map.fastRemoveAsync("321");

除此之外,还支持Multimap。

16)Map eviction:

现在Redis没有过期清空Map中的某个entry的功能,只能是清空Map所有的entry。Redission提供了这种功能。


   
     
     
   
  1. RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");

  2. // ttl = 10 minutes,

  3. map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);

  4. // ttl = 10 minutes, maxIdleTime = 10 seconds

  5. map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

  6. // ttl = 3 seconds

  7. map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);

  8. // ttl = 40 seconds, maxIdleTime = 10 seconds

  9. map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);