1 Redis实现秒杀优化
1.1 秒杀流程
1、查询优惠卷
2、判断秒杀库存是否足够
3、查询订单
4、校验是否是一人一单
5、扣减库存
6、创建订单

在流程上,这是同步操作,即:会按照顺序进行执行,但是这样一个一个执行会有一个很大的缺陷:效率很低。那么是否可以提高效率呢?
可以:使用异步进行优化。
将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池。

但是此时会有两个问题:
- 怎么在redis中去快速校验一人一单,还有库存判断。
- 由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
针对问题1:
当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作。
针对问题2:
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
1.2 流程优化

1.2.1 优惠券添加至Redis
在添加秒杀优惠券的时候,将优惠券id和个数添加至redis中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Service public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
@Resource private ISeckillVoucherService seckillVoucherService;
@Resource private StringRedisTemplate stringRedisTemplate;
@Override public Result queryVoucherOfShop(Long shopId) { List<Voucher> vouchers = getBaseMapper().queryVoucherOfShop(shopId); return Result.ok(vouchers); }
@Override @Transactional(rollbackFor = {}) public void addSeckillVoucher(Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), seckillVoucher.getStock().toString()); } }
|
1.2.2 Lua脚本
Lua脚本用来保证下单的时候不会超卖和一人一单,原子化操作放在同一个脚本中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if (tonumber(redis.call('get', stockKey)) <= 0) then return 1 end
if (redis.call('sismember', orderKey, userId) == 1) then return 2 end
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId) return 0
|
1.2.3 添加异步阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
| @Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired private ISeckillVoucherService iSeckillVoucherService;
@Autowired private RedisWorker redisWorker;
@Resource StringRedisTemplate stringRedisTemplate;
@Resource private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
private BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
ExecutorService fixedThreadPool = CacheClient.newFixedThreadPool(1);
private IVoucherOrderService proxy;
@PostConstruct private void init() { fixedThreadPool.submit(new Runnable() { @Override public void run() { while (true) { try { VoucherOrder voucherOrder = blockingQueue.take(); handleVoucherOrder(voucherOrder);
} catch (Exception e) { log.error("处理订单异常:" + e.getMessage()); }
} }
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_VOUVHER_ORDER_KEY + userId); boolean flag = lock.tryLock(); if (!flag) { log.error("不允许重复下单!"); return; } try { proxy.createVoucherOrderThread(voucherOrder); } finally { lock.unlock(); } } }); }
@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() );
if (result.intValue() != 0) { return Result.fail(result == 1 ? "优惠券已售罄,感谢参与!" : "您已经购买过该优惠券,优惠券限制每人仅购买一次!"); } long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY); VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userId); voucherOrder.setId(orderId);
Object o = AopContext.currentProxy(); IVoucherOrderService proxy = (IVoucherOrderService) o;
blockingQueue.add(voucherOrder);
return Result.ok(orderId); }
@Override @Transactional(rollbackFor = {}) public void createVoucherOrderThread(VoucherOrder voucherOrder) { Long userID = voucherOrder.getId();
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherOrder.getVoucherId()).count(); if (count > 0) { log.error("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!"); }
boolean result = iSeckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherOrder.getVoucherId()) .gt("stock", 0) .update();
if (!result) { log.error("商品已经售罄!"); }
long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY);
save(voucherOrder);
} }
|
1.3 基于阻塞队列存在的问题
- 内存限制问题
- 数据安全问题
2 秒杀的异步优化
2.1 Redis的消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

2.1.1 基于List结构的消息队列
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
2.1.2 基于PubSub的消息队列
PubSub(发布publish订阅subscribe)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
[注]pattern:通配符
? 一个字符
*0个或者多个
[ab]a或者b

优点:
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
2.1.3 基于stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
2.1.3.1 发送消息

XADD
key – 键值
NOMKSTREAM – 如果队列不存在,是否开启创建队列,默认是创建队列
MAXLEN – 最大消息数量
ID – 消息的唯一id,如果设置*代表Redis自动生成,格式是”时间戳-递增数字”
field – 消息体
value – 消息值
2.1.3.2 读取消息

XREAD
count – 每次读取消息的最大数量
block – 当前没有消息时,是否阻塞,阻塞时长,0是永久等待
streams keys – 要从哪个队列中读取消息,key是队列名
ID – 起始id,只返回大于该ID的消息,其中0代表第一条纤细,$代表最新的消息。
阻塞读取

STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
2.1.3.3 stream消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

2.1.3.3.1 创建消费者组

key:队列名称。
groupName:消费者组名称。
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。
MKSTREAM:队列不存在时自动创建队列。
2.1.3.3.2 消费者组读取消息
1
| XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] >
|
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第1个消息开始。
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
2.1.4 对比

2.2 使用Stream消息队列优化异步秒杀
2.2.1 创建消息队列
1
| XGROUP CREATE stream.orders g1 0 MKSTREAM
|
2.2.2 修改lua脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| -- 1.参数列表 -- 1.1优惠券id local voucherId = ARGV[1] -- 1.2用户id local userId = ARGV[2] -- 1.3订单id local orderId = ARGV[3]
-- 2.数据key -- 2.1 库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2 订单key local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务 -- 3.1 判断库存是否充足 get stockKet if (tonumber(redis.call('get', stockKey)) <= 0) then -- 库存不足返回1 return 1 end -- 3.2 判断用户是否下单 SISMEMBER orderKey userId if (redis.call('sismember', orderKey, userId) == 1) then -- 3.3 存在,说明重复下单 return 2 end -- 3.4 扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5 下单 (保存用户) sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6 发送到消息队列中 VoucherOrder实体类中主键就叫id,所以这里也是id,不要叫orderId redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'id', orderId, 'voucherId', voucherId) return 0
|
即:每次下单保存了用户之后,还需要将消息发送至消息队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
| @Slf4j @Service @SuppressWarnings({"all"}) public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired private ISeckillVoucherService iSeckillVoucherService;
@Autowired private RedisWorker redisWorker;
@Resource StringRedisTemplate stringRedisTemplate;
@Resource private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
private BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
private IVoucherOrderService proxy;
private ExecutorService executorService = CacheClient.newFixedThreadPool(1);
@PostConstruct private void init() { executorService.submit(new VoucherOrderHandler()); }
private class VoucherOrderHandler implements Runnable { String queueName = RedisConstants.STREAM_QUEUE_NAME;
@Override public void run() { while (true) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2L)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) );
if (null == list || list.isEmpty()) { continue; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> map = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) { log.error("处理订单异常:" + e); handlePendingList(); }
} } }
private void handlePendingList() { String queueName = RedisConstants.STREAM_QUEUE_NAME; while (true) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) );
if (null == list || list.isEmpty()) { break; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> map = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) { log.error("处理PendingList订单异常:" + e); try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } }
} }
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_VOUVHER_ORDER_KEY + userId); boolean flag = lock.tryLock(); if (!flag) { log.error("不允许重复下单!"); return; } try { proxy.createVoucherOrderThread(voucherOrder); } finally { lock.unlock(); } }
@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) );
if (result.intValue() != 0) { return Result.fail(result == 1 ? "优惠券已售罄,感谢参与!" : "您已经购买过该优惠券,优惠券限制每人仅购买一次!"); }
Object o = AopContext.currentProxy(); proxy = (IVoucherOrderService) o;
return Result.ok(orderId); }
@Override @Transactional public Result createVoucherOrder(Long voucherId) { Long userID = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!"); }
boolean result = iSeckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId) .gt("stock", 0) .update();
if (!result) { return Result.fail("商品已经售罄!"); }
long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY);
VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userID);
save(voucherOrder);
return Result.ok(orderId); } }
|