reids-分布式锁

1 分布式锁

在之前分析过,集群模式下的一人一单存在问题,其根源就在锁监视器上。因此需要一个分布式锁,满足分布式系统集群模式下多进程可见并且互斥的锁。

核心思想:让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行。

在这里插入图片描述

1.1 分布式要求

  • 可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

  • 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性

  • 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

  • 安全性:安全也是程序中必不可少的一环

1.2 常见的分布式锁

在这里插入图片描述

1.3 redis实现分布式锁

  1. 获取锁:要求确保当前只能有一个线程获取到锁。(set key value ex ttl nx )利用set nx的特性保证只能有一个获取线程
  2. 释放锁:要求可以手动释放,也要有超时释放做兜底。(del key)

1.4 加锁

增加一个接口:ILock.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.hmdp.lock;


public interface ILock {
/**
* @param
* @return boolean true获取锁成功,false获取锁失败
* @description //获取尝试锁
* @param: expireTime 锁持有的超时时间,过期后自动释放
**/
boolean tryLock(long expireTime);

/**
* @param
* @return void
* @description //释放锁
**/
void unLock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SimpleRedisLock implements ILock {
/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* 定义锁的名称
*/
private String lockName;

StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long expireTime) {
// 获取当前线程的标识
long threadId = Thread.currentThread().getId();
// set key "1" ex expireTime nx
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + lockName, threadId + "", expireTime, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

@Override
public void unLock() {
stringRedisTemplate.delete(KEY_PREFIX + lockName);
}
}

修改VoucherOrderServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Autowired
private ISeckillVoucherService iSeckillVoucherService;

@Autowired
private RedisWorker redisWorker;

@Resource
StringRedisTemplate stringRedisTemplate;

/**
* @param
* @return com.hmdp.dto.Result
* @description //秒杀优惠券
* @param: voucherId
* @date 2023/2/15 22:09
* @author wty
**/
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询秒杀优惠券信息
// select * from tb_seckill_voucher where voucher_id = ?
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);

// 2.判断秒杀是否开始
LocalDateTime beginTime = seckillVoucher.getBeginTime();
LocalDateTime endTime = seckillVoucher.getEndTime();
LocalDateTime now = LocalDateTime.now();

if (now.isBefore(beginTime)) {
// 当前时间早于秒杀开始时间,说明秒杀没有开始
return Result.fail("秒杀尚未开始,请耐心等待!秒杀开始时间:" + beginTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
// 3.判断秒杀是否已经结束
if (now.isAfter(endTime)) {
// 当前时间晚于秒杀结束时间,说明秒杀结束了
return Result.fail("秒杀已经结束,感谢支持!");
}


// 4.判断库存是否充足
Integer stock = seckillVoucher.getStock();
if (stock <= 0) {
return Result.fail("商品已经售罄!");
}

// 实现一人一单,获取user对象锁
Long userID = UserHolder.getUser().getId();
/*// 使用JDK提供的锁监视器synchronized来实现
synchronized (userID.toString().intern()) {
// 调用本类方法的时候,Spring事务是失效的,解决方案二:调用AopContext API
Object o = AopContext.currentProxy();
IVoucherOrderService proxy = (IVoucherOrderService) o;
return proxy.createVoucherOrder(voucherId);
}*/
// TODO:修改的地方 尝试自定义锁监视器
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userID.toString().intern(), stringRedisTemplate);
boolean flag = simpleRedisLock.tryLock(RedisConstants.LOCK_VOUVHER_ORDER_TTL);

if (!flag) {
// 获取锁失败,就直接返回错误信息即可
return Result.fail("[秒杀优惠券]不允许重复下单!本秒杀业务一切解释器归ty公司所有");
}

Result result;
try {
Object o = AopContext.currentProxy();
IVoucherOrderService proxy = (IVoucherOrderService) o;
return proxy.createVoucherOrder(voucherId);
} finally {
simpleRedisLock.unLock();
}
}

/**
* @param
* @return com.hmdp.dto.Result
* @description //根据优惠券id和用户id查询订单 减少库存生成订单
* @param: voucherId
* @date 2023/2/15 22:12
* @author wty
**/
@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userID = UserHolder.getUser().getId();

// 5.实现1人1单加入逻辑:根据优惠券id和用户id查询订单
// 5.1查询订单,并不用查询出具体的值,而是查询出数量即可
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();
// 5.2判断订单是否存在
if (count > 0) {
// 5.2.1 存在就返回异常结果
return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!");
}

// 5.2.2 不存在再减少库存

// 6.1扣减库存(会出现超卖问题)
// update tb_seckill_voucher set stock = stock -1 where voucher_id = ?
/*boolean result = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).update();*/

// 6.2扣减库存(针对超卖问题用乐观锁CAS解决)
// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock = ?
/*boolean result = iSeckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", stock)
.update();*/

// 6.3扣减库存(针对使用乐观锁CAS,没卖完解决)
// update tb_seckill_voucher set stock = stock -1 where voucher_id = ? and stock > 0
boolean result = iSeckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();

if (!result) {
return Result.fail("商品已经售罄!");
}

// 7.创建订单
/**
* 获取订单id
*/
long orderId = redisWorker.nextID("order");

VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userID);

// 将订单信息保存到数据库
// insert into tb_voucher_order values ()
save(voucherOrder);

//8.返回订单id
return Result.ok(orderId);
}
}

1.6 锁误删问题

1.6.1 介绍

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放。

这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除。

img

1.6.2 解决方案

在获取锁之前,增加一个标识,每次删除锁的时候,判断当前的锁是否属于自己,属于自己就删除,不属于自己就不删除。

在这里插入图片描述

步骤:

  1. 在获取锁时存入线程标示(可以用UUID表示)
  2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class SimpleRedisLock implements ILock {
/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* 线程ID前缀
* true是可以把UUID中的横线去掉
*/
private static final String THREAD_ID_PREFIX = UUID.randomUUID().toString(true) + "-";
/**
* 定义锁的名称
*/
private String lockName;

StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* @param
* @return boolean
* @description // 获取锁
* @param: expireTime
* @date 2023/2/16 12:24
* @author wty
**/
@Override
public boolean tryLock(long expireTime) {
// 获取当前线程的标识
//long threadId = Thread.currentThread().getId();
String threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();
// set key "1" ex expireTime nx
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + lockName, threadId + "", expireTime, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

/**
* @param
* @return void
* @description //释放锁
* @date 2023/2/16 12:24
* @author wty
**/
@Override
public void unLock() {
// 获取当前线程的ID
String threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();
String threadId_Redis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + lockName);
// 只有redis中的线程和当前的线程是同一个才允许释放锁
if (String.valueOf(threadId).equals(threadId_Redis)) {
stringRedisTemplate.delete(KEY_PREFIX + lockName);
}
}
}

1.7 分布式锁原子性操作问题

1.7.1 介绍

在上述部分绝大部分情况都能满足业务生产使用,但是在最后判断和删除的时候还是有可能出现阻塞,例如:判断完属于自己的锁,正准备删除的时候,此时应用出现full gc,这个时候就出现了阻塞,同理,其他线程此时由于线程1的阻塞导致的释放锁而拿到了锁,线程1等阻塞完成之后还是会删除锁

在这里插入图片描述

1.7.2 解决方案

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:菜鸟教程

1.7.2.1 更改释放锁逻辑

原逻辑:

  1. 获取锁中的线程标示
  2. 判断是否与指定的标示(当前线程标示)一致
  3. 如果一致则释放锁(删除)
  4. 如果不一致则什么都不做

使用Lua脚本更改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 锁的key
local key = KEYS[1]

-- 当前线程的标识
local threadID = ARGV[1]

-- 获取锁中的线程标识
local threadID_Redis = redis.call('get',KEYS[1])

-- 比较线程的标识与锁中的标识是否一致
if(ARGV[1] == threadID_Redis) then
-- 一致就释放锁
redis.call('del',KEYS[1])
end
return 0

java代码调用Lua脚本,更改分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SimpleRedisLock implements ILock {
/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";
/**
* 线程ID前缀
* true是可以把UUID中的横线去掉
*/
private static final String THREAD_ID_PREFIX = UUID.randomUUID().toString(true) + "-";
/**
* 定义锁的名称
*/
private String lockName;

StringRedisTemplate stringRedisTemplate;

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

// 在类第一次启动的时候就进行加载
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}


public SimpleRedisLock(String lockName, StringRedisTemplate stringRedisTemplate) {
this.lockName = lockName;
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* @param
* @return void
* @description //释放锁(基于Lua脚本)
* @date 2023/2/16 15:37
* @author wty
**/
@Override
public void unLock() {
// 调用Lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + lockName),
THREAD_ID_PREFIX + Thread.currentThread().getId()
);

}
}

1.8 Redisssion分布式锁

使用setnx实现分布式锁存在以下问题:

  1. 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
  2. 不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
  3. 超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患。
  4. 主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

1.8.1 关于Redission

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

官网:Redisson官网

1.8.2 引入pom依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

1.8.3 增加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址(虚拟机地址),也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

1.8.4 使用Redission解决一人一单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Autowired
private ISeckillVoucherService iSeckillVoucherService;

@Autowired
private RedisWorker redisWorker;

@Resource
StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询秒杀优惠券信息
// select * from tb_seckill_voucher where voucher_id = ?
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);

// 2.判断秒杀是否开始
LocalDateTime beginTime = seckillVoucher.getBeginTime();
LocalDateTime endTime = seckillVoucher.getEndTime();
LocalDateTime now = LocalDateTime.now();

if (now.isBefore(beginTime)) {
// 当前时间早于秒杀开始时间,说明秒杀没有开始
return Result.fail("秒杀尚未开始,请耐心等待!秒杀开始时间:" + beginTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
// 3.判断秒杀是否已经结束
if (now.isAfter(endTime)) {
// 当前时间晚于秒杀结束时间,说明秒杀结束了
return Result.fail("秒杀已经结束,感谢支持!");
}


// 4.判断库存是否充足
Integer stock = seckillVoucher.getStock();
if (stock <= 0) {
return Result.fail("商品已经售罄!");
}

// 实现一人一单,获取user对象锁
Long userID = UserHolder.getUser().getId();

//TODO: 用Redisson提供的可重入锁
RLock lock = redissonClient.getLock("lock:order:" + userID.toString().intern());
boolean flag = lock.tryLock();

if (!flag) {
// 获取锁失败,就直接返回错误信息即可
return Result.fail("[秒杀优惠券]不允许重复下单!本秒杀业务一切解释器归ty公司所有");
}

try {
Object o = AopContext.currentProxy();
IVoucherOrderService proxy = (IVoucherOrderService) o;
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}

}

1.8.5 Redission的可重入锁原理

在JDK中,lock锁和synchronized锁都是可重入锁,可重入锁就是当前一个线程在调就是一个线程不用释放,可以重复的获取一个锁n次,只是在释放的时候,也需要相应的释放n次。

那么猜想Redisssion可以实现可重入就一定有一个记录获取锁次数的变量。

1.8.5.1 Redis无法实现可重入锁

img

可以看到没有一个变量用来计数锁获取次数

1.8.5.2 Redisssion可实现可重入锁

img

1.8.5.3 Lua脚本实现获取锁(可重入)

img

1.8.5.4 Lua脚本实现释放锁(可重入)

img

1.8.5.5 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@SpringBootTest
@Slf4j
public class RedissonTest {

@Autowired
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setup(){
lock=redissonClient.getLock("order");
}

@Test
void method1(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败……1");
return;
}
try{
log.info("获取锁成功……1");
method2();
log.info("开始执行业务……1");
}finally{
log.warn("准备释放锁……1");
lock.unlock();
}
}

void method2(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败……2");
return;
}
try{
log.info("获取锁成功……2");
log.info("开始执行业务……2");
}finally{
log.warn("准备释放锁……2");
lock.unlock();
}
}
}

img

1.8.5.6 源码

tryLock():

img

unLock():

img

注:在最后删除锁之后,还进行了一个发布动作

1.8.6 Redission的锁重试和看门狗机制

1.8.6.1 获取锁重试

redis获取锁只尝试一次就返回false,没有更多的尝试机制。

整体流程:
img

image-20240225204126277

image-20240225204221016

image-20240225204321817

同时计算获取锁耗时,再使用等待时间-获取锁耗时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//将最大获取锁等待时间转化为毫秒
long time = unit.toMillis(waitTime);
//获取当前时间的毫秒值
long current = System.currentTimeMillis();
//获取线程标识
long threadId = Thread.currentThread().getId();
//尝试获取锁,如果获取锁失败ttl应该是一个具体值,而不是null
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//获取到锁,直接返回true
return true;
}

//计算最大等待时间减去第一次尝试获取锁的时间,得到剩余等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
//如果不存在剩余时间
acquireFailed(waitTime, unit, threadId);
return false;
}

//如果还存在剩余时间,接着获取当前时间
current = System.currentTimeMillis();
//订阅释放锁的信号(就是开头说的释放锁时会发布的那条信息),这里也是异步执行,因此返回类型为Future
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
//获取等待结果,如果超过了剩余最大等待时间会抛出异常,执行TimeOutException中的catch代码
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}

//走到这一步说明还存在剩余时间并获取到了锁释放信息
try {
//更新剩余时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

while (true) {
long currentTime = System.currentTimeMillis();
//再次尝试获取锁,如果失败获取到ttl存活时间
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//执行到这说明又没有获取到锁,更新剩余时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果锁剩余时间小于当前线程剩余等待时间,再次获取锁,最大等待时间为锁的释放时间
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//如果锁的剩余时间大于当前线程剩余等待时间,再次获取锁,最大等待时间为当前线程的剩余时间
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//如果还没有获取到锁,while循环执行上述代码
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}

1.8.6.2 看门狗机制(刷新有效期)

1.8.6.2.1 获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//返回的值赋值给该变量,该变量是一个Future,因为是异步执行lua脚本,因此无法立刻拿到返回值
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}

// 刷新有效期
//回调函数,当拿到返回值ttlRemaining
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
//说明获取锁成功
if (ttlRemaining == null) {
//如果我们配置了锁的过期时间,那么将其转化为毫秒后覆盖掉默认的锁释放时间(同时也会取消看门狗机制)
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//如果没有指定锁的过期施放时间,那么定时将锁的有效时间进行更新
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});

return new CompletableFutureWrapper<>(f);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected void scheduleExpirationRenewal(long threadId) {
//该对象主要存储了两个属性,线程标识,Timeout对象(一个定时任务),我们可以理解为一个锁对象
ExpirationEntry entry = new ExpirationEntry();
//MAP对象存储的是不同业务中的不同锁对象。getEntryName()实际上获取到的是getLock(name)方法中的name
//如果第一次获取,返回值为null,如果map中已经存在该业务类型的锁那么返回的是entry对象
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
//说明map中以及存在该业务类型的锁了,更新该业务锁的线程标识id
oldEntry.addThreadId(threadId);
} else {
//第一次向map中存放该业务类型的锁,更新该业务锁的线程标识id
entry.addThreadId(threadId);
try {
//续约方法
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void renewExpiration() {
//获取业务锁对象
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//这里有三个参数,第一个是定时任务需要执行的逻辑代码,第二个是延时执行时间,第三个延时执行时间单位
//延时执行时间是锁的过期释放时间的三分之一
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//获取锁对象
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
//获取锁对象中的线程标识
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//刷新锁的有效时间
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
//刷新锁的有效时间结束后,调用下面方法
future.whenComplete((res, e) -> {
//如果刷新锁的有效时间抛出异常,抛出日志并将锁对象从map中移除
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//如果刷新成功
if (res) {
// 递归调用本方法
renewExpiration();
} else {
//如果返回值为null,那么就取消定时任务
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}
1
2
3
4
5
6
7
8
9
10
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
1.8.6.2.2 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public RFuture<Void> unlockAsync(long threadId) {
RFuture<Boolean> future = unlockInnerAsync(threadId);
//当取消锁成功时,执行该回调方法
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//取消续约定时任务
cancelExpirationRenewal(threadId);

if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}

return null;
});

return new CompletableFutureWrapper<>(f);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void cancelExpirationRenewal(Long threadId) {
//根据key获取到锁对象
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
//如果不存在锁,那么直接返回
if (task == null) {
return;
}

if (threadId != null) {
//移除锁对象中的线程标识
task.removeThreadId(threadId);
}

if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
//如果锁对象中的定时任务不为空,那么就取消
timeout.cancel();
}
//移除map中的锁对象
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}

1.8.7 Redission的multiLock问题

1.8.7.1 背景

Redisson分布式锁主从一致性问题
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

在这里插入图片描述

1.8.7.2 解决

redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

在这里插入图片描述

1.8.7.3 使用示例

修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.183.145:6379")
.setPassword("112453");
// 创建RedissonClient对象
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.193.175:6380")
.setPassword("557724");
// 创建RedissonClient对象
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.177.145:6381")
.setPassword("5896");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Slf4j
@SpringBootTest
public class RedissonTest {
@Resource
private RedissonClient redissonClient;

@Resource
private RedissonClient redissonClient2;

@Resource
private RedissonClient redissonClient3;

RLock lock;

@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("lock");
RLock lock2 = redissonClient.getLock("lock");
RLock lock3 = redissonClient.getLock("lock");

// 创建连锁
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

@Test
void method1() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败1");
return;
}
try {
log.info("获取锁成功,1");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

@Test
void method2() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}
}

1.8.7.4 源码

getMultiLock()

1
2
3
4
5
6
7
8
9
10
11
12
// 这里相对简单,就是创建了一个RLock集合,为了后续分别去获取锁
final List<RLock> locks = new ArrayList<>();
@Override
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
}
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}

加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 假设传入的waitTime=2s leaseTime=2s
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 定义了一个新的释放时间newLeaseTime=-1
long newLeaseTime = -1;
// 如果传入了时间的tryLock,leaseTime就不等于-1,不传默认值为-1
if (leaseTime != -1) {
// 将新的锁释放时间设置为waitTime的2倍,重试耗时时间较久,预防重试没完就释放了
newLeaseTime = unit.toMillis(waitTime)*2;
}
// 获取当前时间(毫秒)
long time = System.currentTimeMillis();
// remain==保持,先翻译为保持时间,定义为-1
long remainTime = -1;
if (waitTime != -1) {
// 保持时间设置为waitTime,2000ms
remainTime = unit.toMillis(waitTime);
}
// calcLockWaitTime(remainTime);-->return Math.max(remainTime / locks.size(), 1);
// 300ms=lockWaitTime
long lockWaitTime = calcLockWaitTime(remainTime);
// failedLocksLimit -> 0
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 循环获取锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
// 获取到的redisson实例生成的锁
RLock lock = iterator.next();
// 锁获取标识
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
// 直接去获取锁,返回true or false
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果发生了RedisResponseTimeoutException,会先解锁。因为这个时候不确定是否加锁成功了,所以解锁设置标识为失败。
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 其他异常设置标识为false
lockAcquired = false;
}
if (lockAcquired) {
// 如果加锁成功 放入集合中
acquiredLocks.add(lock);
} else {
// 获取锁数量等于锁总数量
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}

if (failedLocksLimit == 0) {
// 会把获取到锁的一次性解锁
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
// 重置failedLocksLimit=0
failedLocksLimit = failedLocksLimit();
// 清空获取到锁的集合
acquiredLocks.clear();
// 重置迭代器的指针
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// RedissonRedLock才会进入这个逻辑
failedLocksLimit--;
}
}
// 如果remainTime不为-1
// remainTime=2000ms
if (remainTime != -1) {
// 查看remainTime的剩余时间
remainTime -= System.currentTimeMillis() - time;
// 重置time
time = System.currentTimeMillis();
// 如果保持时间也就是之前的waitTime小于0,也就是说超过了尝试获取锁时最长的等待时间,释放所有已获得的锁,并返回false,加锁失败
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}

// 如果自己手动传入了锁释放时间,释放时间为-1的时候触发看门狗机制
if (leaseTime != -1) {
// 创建了一个RFuture集合
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
// 由于每个锁获取到手的时间不同,等所有锁都拿到手之后,重新给所有锁重新设置有效期
for (RLock rLock : acquiredLocks) {
//为每个锁设置过期时间,是一个异步的操作
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

for (RFuture<Boolean> rFuture : futures) {
// 阻塞当前线程,同步等待每个异步操作的结果
rFuture.syncUninterruptibly();
}
}

return true;
}

reids-分布式锁
https://baijianglai.cn/reids-分布式锁/62a2e5871819/
作者
Lai Baijiang
发布于
2024年2月25日
许可协议