为什么Zookeeper天生就是一副分布式锁的胚子?
什么是分布式锁?分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。
图片来自 Pexels
如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
为什么要使用分布式锁
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行。
在传统单体应用单机部署的情况下,可以使用 Java 并发处理相关的 API(如 ReentrantLock 或 Synchronized)进行互斥控制;在单机环境中,Java 中提供了很多并发处理相关的 API。
但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。
为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
举个例子:机器 A,机器 B 是一个集群。A,B 两台机器上的程序都是一样的,具备高可用性能。
A,B 机器都有一个定时任务,每天晚上凌晨 2 点需要执行一个定时任务,但是这个定时任务只能执行一遍,否则的话就会报错。
那 A,B 两台机器在执行的时候,就需要抢锁,谁抢到锁,谁执行,谁抢不到,就不用执行了!
锁的处理
单个应用中使用锁:(单进程多线程)Synchronize。
分布式锁控制分布式系统之间同步访问资源的一种方式。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
分布式锁的实现
分布式锁的实现方式如下:
基于数据的乐观锁实现分布式锁
基于 Zookeeper 临时节点的分布式锁
基于 Redis 的分布式锁
Redis 的分布式锁
获取锁
redis 127.0.0.1:6379>SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
- EX seconds 设置指定的到期时间(单位为秒)
- PX milliseconds 设置指定的到期时间(单位毫秒)
- NX: 仅在键不存在时设置键
- XX: 只有在键已存在时设置
方式 1:推介
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
public static boolean getLock(JedisCluster jedisCluster, String lockKey, String requestId, int expireTime) {
// NX: 保证互斥性
String result = jedisCluster.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
public static boolean getLock(String lockKey,String requestId,int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if(result == 1) {
jedis.expire(lockKey, expireTime);
return true;
}
return false;
}
释放锁
public static void releaseLock(String lockKey,String requestId) {
if (requestId.equals(jedis.get(lockKey))) {
jedis.del(lockKey);
}
}
方式 2:Redis+Lua 脚本实现(推荐)
public static boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
if (result.equals(1L)) {
return true;
}
return false;
}
Zookeeper 的分布式锁
Zookeeper 分布式锁实现原理
Zookeeper 分布式锁实现示例
Zookeeper 是通过临时节点来实现分布式锁:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Before;
import org.junit.Test;
/**
* @ClassName ZookeeperLock
* @Description TODO
* @Author lingxiangxiang
* @Date 2:57 PM
* @Version 1.0
**/
public class ZookeeperLock {
// 定义共享资源
private static int NUMBER = 10;
private static void printNumber() {
// 业务逻辑: 秒杀
System.out.println("*********业务方法开始************\n");
System.out.println("当前的值: " + NUMBER);
NUMBER--;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("*********业务方法结束************\n");
}
// 这里使用@Test会报错
public static void main(String[] args) {
// 定义重试的侧策略 1000 等待的时间(毫秒) 10 重试的次数
RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);
// 定义zookeeper的客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.231.128.95:2181,10.231.128.96:2181,10.231.128.97:2181")
.retryPolicy(policy)
.build();
// 启动客户端
client.start();
// 在zookeeper中定义一把锁
final InterProcessMutex lock = new InterProcessMutex(client, "/mylock");
//启动是个线程
for (int i = 0; i <10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 请求得到的锁
lock.acquire();
printNumber();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}
基于数据的分布式锁
基于表记录
乐观锁
悲观锁
基于表记录
为了更好的演示,我们先创建一张数据库表,参考如下:
CREATE TABLE `database_lock` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`resource` int NOT NULL COMMENT '锁定的资源',
`description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述',
PRIMARY KEY (`id`),
UNIQUE KEY `uiq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';
①获得锁
我们可以插入一条数据:
INSERT INTO database_lock(resource, description) VALUES (1, 'lock');
②删除锁
INSERT INTO database_lock(resource, description) VALUES (1, 'lock');
乐观锁
数据库乐观锁也能保证线程安全,通常代码层面我们都会这样做:
select goods_num from goods where goods_name = "小本子";
update goods set goods_num = goods_num -1 where goods_name = "小本子";
首先定义一个 version 字段用来当作一个版本号,每次的操作就会变成这样:
select goods_num,version from goods where goods_name = "小本子";
update goods set goods_num = goods_num -1,version =查询的version值自增 where goods_name ="小本子" and version=查询出来的version;
悲观锁
mysql> SET AUTOCOMMIT = 0;
Query OK, 0 rows affected (0.00 sec)
那么它的具体步骤如下:
STEP1 - 获取锁:SELECT * FROM database_lock WHERE id = 1 FOR UPDATE;。
STEP2 - 执行业务逻辑。
STEP3 - 释放锁:COMMIT。
编辑:陶家龙
出处:https://blog.51cto.com/lingjing/2474793
精彩文章推荐: