- 分布式锁
- 原理与使用
- Redisson搭建
- Redisson-lock锁
- Redisson-读写锁
- Redisson-闭锁
- Redisson-信号量
我们可以在redis命令行通过setnx命令来完成分布式锁,加上NX后只有当key不存在时才会set
127.0.0.1:6379> set a a OK 127.0.0.1:6379> set a b NX (nil)
将分布式锁的简单逻辑转换为java业务代码如下
public void testLock(){ // java代码中使用setIfAbsent()方法代理redis命令行中的setNX Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock","111"); if(lock){ // 加锁成功 执行业务方法 function(); // 执行完成后,释放锁 redisTemplate.delete("lock"); } else { // 加锁失败,每隔一段时间后重试 Thread.sleep(500); // 使用自旋的方式 重试 testLock() } }
问题:没有删除锁逻辑
可能在执行业务方法时出现了异常,或者是执行过程中宕机了,没有删除锁,这就会造成死锁
解决方法是:设置锁的自动过期时间,如果没有删除就自动删除。
public void testLock(){ Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock","111"); if(lock){ // 设置自动过期时间 30秒 redisTemplate.expire("lock", 30, TimeUnit.SECONDS) function("业务方法"); redisTemplate.delete("lock"); } else { Thread.sleep(500); testLock() } }
问题:现在往redis中存值和设置过期时间不是原子操作,有可能在if判断执行完后服务器断电或者宕机,那就还是会造成锁一直存在。
解决方法是:存值和设置过期时间变为原子操作,要么都成功,要么都不成功。
# redis命令行中 set命令的格式如下,其中可以加EX和PX都是设置过期时间,EX单位是秒 PX单位是毫秒 set key value [expiration EX seconds|PX milliseconds] [NX|XX] # 如下所示 127.0.0.1:6379> set a aa EX 30 NX
现在java代码中的写法如下,在setIfAbsent()方法中还传入过期时间与过期单位
public void testLock(){ // 设置自动过期时间 30秒 Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock","111", 30, TimeUnit.SECONDS); if(lock){ function("业务方法"); redisTemplate.delete("lock"); } else { Thread.sleep(500); testLock() } }
问题:可能业务方法执行耗时较长,锁自己过期了,我们直接删除,可能把别人正在持有的锁删除了
解决方法是:value之前是随便写的一个值,现在不随便写了,先生成一个uuid作为锁的值存入redis,删除锁之前先查询判断redis中存储的uuid和自己生成的uuid相同后才去删除锁。
public void testLock(){ // 生成uuid String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock",uuid, 30, TimeUnit.SECONDS); if(lock){ function("业务方法"); // 先判断uuid是否相同 再删除 String uuidRedis = redisTemplate.opsForValue.get("lock"); if(uuid.equals(uuidRedis)){ redisTemplate.delete("lock"); } } else { Thread.sleep(500); testLock() } }
问题:可能先从redis中进行查询,得到uuid后,redis中的锁过期了,然后别人又加锁了,我这里进行if判断两个uuid相等,然后又把别人的锁删除了
解决方法:我们需要让获取数据+比较+删除 也是一个原子操作。 使用redis+Lua脚本来进行
lua脚本官方网站
public void testLock(){ // 生成uuid String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock",uuid, 30, TimeUnit.SECONDS); if(lock){ function("业务方法"); // lua脚本 解锁,下面script是从官网中复制的 // 可以使用DefaultRedisScript来执行这个lua脚本,泛型是返回值,形参是脚本和脚本的返回值 // execute() 方法的第二个参数是一个集合类型的key 对应下面脚本中的KEYS[1] // execute() 方法的第三个参数就是我们的uuid,对应下面脚本中的ARGV[1] String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; redisTemplate.execute(new DefaultRedisScript(script, Integer.class), Array.asList("lock"), uuid); } else { Thread.sleep(500); testLock() } }
所以使用redis来实现分布式锁时,需要注意的就是:
- 加锁时,存入的值不要随便写
- 过期时间与加锁需要保证原子操作,set NX EX
- 删除锁时使用lua脚本来进行删除
难点是锁的自动续期,业务方法还没有执行完,锁就过期了,最简单的方法就是把锁的过期时间设置很长,现在的java代码如下
public void testLock(){ // 加锁 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue.setIfAbsent("lock",uuid, 300, TimeUnit.SECONDS); if(lock){ try{ function("业务方法"); } finally { // 解锁 String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; redisTemplate.execute(new DefaultRedisScriptRedisson搭建(script, Long.class), Array.asList("lock"), uuid); } } else { // 获取分布式锁失败 重试 自旋 Thread.sleep(500); testLock() } }
在日常的开发中,并不太推荐使用上面我们自己实现加锁与解锁的逻辑,我们可以使用redisson来实现。
官网
我们可以通过点击这里查看文档
首先导入redisson的依赖
org.redisson redisson 3.17.5
或者是导入redisson的场景启动器
org.redisson redisson-spring-boot-starter
第二步,创建一个redisson的配置类,往容器中添加一个RedissonClient对象
具体参考官网配置方式
单节点模式:
package com.hs.springcloud.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Configuration; import java.io.IOException; @Configuration public class RedissonConfig { // 我们可以通过destroyMethod来指定服务停止后就调用shutdown()方法 @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() throws IOException{ // 默认连接地址 127.0.0.1:6379 // RedissonClient redisson = Redisson.create(); Config config = new Config(); config.useSingleServer() //可以用 "rediss://" 来启用SSL安全连接 .setAddress("redis://82.156.9.191:6379") .setPassword("deimkf"); RedissonClient redisson = Redisson.create(config); return redisson; } }
集群模式
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒 //可以用"rediss://" 来启用SSL连接 .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
还有其他哨兵模式和主从模式请参考上面提供的官方文档网站进行查看
Redisson-lock锁官方文档
redisson的锁,他们有一个好处就是都实现了JUC中的锁。就比较符合我们的实现习惯。
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; @RequestMapping("anyLock") public String anyLock(){ // 获取锁,多个系统获取锁的名字一样,那就是获取的同一把锁 RLock lock = redissonClient.getLock("any-lock"); // 加锁 lock.lock(); try{ // 执行业务方法 System.out.println("执行业务方法"); Thread.sleep(3000); }catch (Exception e){ }finally { // 释放锁 lock.unlock(); } return "ok" + Thread.currentThread().getId(); } }
加锁的几种方式:
-
lock.lock();是阻塞的方法,如果没有加锁成功是会一直阻塞。
锁默认的过期时间是30秒,还有看门狗 每隔默认时间/3也就是10秒 进行锁的自动续期,又重新设置过期时间为30秒。通过定时任务来实现
如果服务器宕机了,看门狗的进程也会停止掉,也就不会进行锁的续期,这个锁也会因为过期时间而过期。
-
lock.lock(20, TimeUnit.SECONDS); 手动设置锁的过期时间,这种方式到期就直接删除锁,不会有看门狗进行锁的续期。
这种方式过期时间尽量设置长一点,需要比业务方法执行的时间长。
推荐使用这种方式进行加锁
官方文档
- 如果已经加了写锁,那么其他进程就不能在加读锁或写锁
- 如果已经加了读锁,那么其他进程就只能再加读锁
@RequestMapping("/write") public String writeValue(){ // 获取读写锁,必须保证多个线程获取的读写锁是同一个 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); // 获取写锁 RLock lock = readWriteLock.writeLock(); // 加写锁 lock.lock(); try { System.out.println("执行业务方法,进行写入操作" + Thread.currentThread().getId()); Thread.sleep(7000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("释放写锁" + Thread.currentThread().getId()); lock.unlock(); } return "ok"; } @RequestMapping("/read") public String readValue(){ // 获取读锁 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); // 获取读锁 RLock lock = readWriteLock.readLock(); lock.lock(); try{ System.out.println("加了读锁,进行读取操作" + Thread.currentThread().getId()); }finally { System.out.println("释放读锁" + Thread.currentThread().getId()); lock.unlock(); } return "ok"; }Redisson-闭锁
官方文档
闭锁 CountDownLatch
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他线程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
案例:
@RequestMapping("/closeDoor") public String closeDoor() throws InterruptedException { // 当5个班级都放学后就关闭校门 RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("closeDoor-lock"); countDownLatch.trySetCount(5); // 阻塞方法,只有当数量变为了0就放行 countDownLatch.await(); return "放学了,关闭校门"; } @RequestMapping("/goHome") public String goHome() throws InterruptedException { RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("closeDoor-lock"); // 班级离校 数量-1 countDownLatch.countDown(); return "放学了,一个班级离开了学习"; }Redisson-信号量
官方文档
案例:模拟停车场只有5个停车位,最多只能同时停5量车
我们需要实现往redis中存一个key为park-lock,value为5的键值对。
@RequestMapping("/park") public String park() throws InterruptedException { RSemaphore semaphore = redissonClient.getSemaphore("park-lock"); // 获取一个信号量 semaphore.acquire(); return "进行停车,占用了一个停车位"; } @RequestMapping("/leave") public String leave() throws InterruptedException { RSemaphore semaphore = redissonClient.getSemaphore("park-lock"); // 释放一个信号量 semaphore.release(); return "离开了,释放了一个停车位"; }
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);