1 Redis实现秒杀优化
1.1 秒杀流程
1、查询优惠卷
2、判断秒杀库存是否足够
3、查询订单
4、校验是否是一人一单
5、扣减库存
6、创建订单

在流程上,这是同步操作,即:会按照顺序进行执行,但是这样一个一个执行会有一个很大的缺陷:效率很低。那么是否可以提高效率呢?
可以:使用异步进行优化。
将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池。

但是此时会有两个问题:
- 怎么在redis中去快速校验一人一单,还有库存判断。
- 由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
针对问题1:
当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作。
针对问题2:
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
1.2 流程优化

1.2.1 优惠券添加至Redis
在添加秒杀优惠券的时候,将优惠券id和个数添加至redis中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Service public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
@Resource private ISeckillVoucherService seckillVoucherService;
@Resource private StringRedisTemplate stringRedisTemplate;
@Override public Result queryVoucherOfShop(Long shopId) { List<Voucher> vouchers = getBaseMapper().queryVoucherOfShop(shopId); return Result.ok(vouchers); }
@Override @Transactional(rollbackFor = {}) public void addSeckillVoucher(Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), seckillVoucher.getStock().toString()); } }
|
1.2.2 Lua脚本
Lua脚本用来保证下单的时候不会超卖和一人一单,原子化操作放在同一个脚本中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if (tonumber(redis.call('get', stockKey)) <= 0) then return 1 end
if (redis.call('sismember', orderKey, userId) == 1) then return 2 end
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId) return 0
|
1.2.3 添加异步阻塞队列

| @Slf4j @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired private ISeckillVoucherService iSeckillVoucherService;
@Autowired private RedisWorker redisWorker;
@Resource StringRedisTemplate stringRedisTemplate;
@Resource private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
private BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
ExecutorService fixedThreadPool = CacheClient.newFixedThreadPool(1);
private IVoucherOrderService proxy;
@PostConstruct private void init() { fixedThreadPool.submit(new Runnable() { @Override public void run() { while (true) { try { VoucherOrder voucherOrder = blockingQueue.take(); handleVoucherOrder(voucherOrder);
} catch (Exception e) { log.error("处理订单异常:" + e.getMessage()); }
} }
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_VOUVHER_ORDER_KEY + userId); boolean flag = lock.tryLock(); if (!flag) { log.error("不允许重复下单!"); return; } try { proxy.createVoucherOrderThread(voucherOrder); } finally { lock.unlock(); } } }); }
@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() );
if (result.intValue() != 0) { return Result.fail(result == 1 ? "优惠券已售罄,感谢参与!" : "您已经购买过该优惠券,优惠券限制每人仅购买一次!"); } long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY); VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userId); voucherOrder.setId(orderId);
Object o = AopContext.currentProxy(); IVoucherOrderService proxy = (IVoucherOrderService) o;
blockingQueue.add(voucherOrder);
return Result.ok(orderId); }
@Override @Transactional(rollbackFor = {}) public void createVoucherOrderThread(VoucherOrder voucherOrder) { Long userID = voucherOrder.getId();
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherOrder.getVoucherId()).count(); if (count > 0) { log.error("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!"); }
boolean result = iSeckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherOrder.getVoucherId()) .gt("stock", 0) .update();
if (!result) { log.error("商品已经售罄!"); }
long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY);
save(voucherOrder);
} }
|
1.3 基于阻塞队列存在的问题
- 内存限制问题
- 数据安全问题
2 秒杀的异步优化
2.1 Redis的消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

2.1.1 基于List结构的消息队列
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
2.1.2 基于PubSub的消息队列
PubSub(发布publish订阅subscribe)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
[注]pattern:通配符
? 一个字符
*0个或者多个
[ab]a或者b

优点:
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
2.1.3 基于stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
2.1.3.1 发送消息

XADD
key – 键值
NOMKSTREAM – 如果队列不存在,是否开启创建队列,默认是创建队列
MAXLEN – 最大消息数量
ID – 消息的唯一id,如果设置*代表Redis自动生成,格式是”时间戳-递增数字”
field – 消息体
value – 消息值
2.1.3.2 读取消息

XREAD
count – 每次读取消息的最大数量
block – 当前没有消息时,是否阻塞,阻塞时长,0是永久等待
streams keys – 要从哪个队列中读取消息,key是队列名
ID – 起始id,只返回大于该ID的消息,其中0代表第一条纤细,$代表最新的消息。
阻塞读取

STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
2.1.3.3 stream消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

2.1.3.3.1 创建消费者组

key:队列名称。
groupName:消费者组名称。
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息。
MKSTREAM:队列不存在时自动创建队列。
2.1.3.3.2 消费者组读取消息
1
| XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] >
|
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第1个消息开始。
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
2.1.4 对比

2.2 使用Stream消息队列优化异步秒杀
2.2.1 创建消息队列
1
| XGROUP CREATE stream.orders g1 0 MKSTREAM
|
2.2.2 修改lua脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| -- 1.参数列表 -- 1.1优惠券id local voucherId = ARGV[1] -- 1.2用户id local userId = ARGV[2] -- 1.3订单id local orderId = ARGV[3]
-- 2.数据key -- 2.1 库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2 订单key local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务 -- 3.1 判断库存是否充足 get stockKet if (tonumber(redis.call('get', stockKey)) <= 0) then -- 库存不足返回1 return 1 end -- 3.2 判断用户是否下单 SISMEMBER orderKey userId if (redis.call('sismember', orderKey, userId) == 1) then -- 3.3 存在,说明重复下单 return 2 end -- 3.4 扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5 下单 (保存用户) sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6 发送到消息队列中 VoucherOrder实体类中主键就叫id,所以这里也是id,不要叫orderId redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'id', orderId, 'voucherId', voucherId) return 0
|
即:每次下单保存了用户之后,还需要将消息发送至消息队列中

| @Slf4j @Service @SuppressWarnings({"all"}) public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired private ISeckillVoucherService iSeckillVoucherService;
@Autowired private RedisWorker redisWorker;
@Resource StringRedisTemplate stringRedisTemplate;
@Resource private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
private BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
private IVoucherOrderService proxy;
private ExecutorService executorService = CacheClient.newFixedThreadPool(1);
@PostConstruct private void init() { executorService.submit(new VoucherOrderHandler()); }
private class VoucherOrderHandler implements Runnable { String queueName = RedisConstants.STREAM_QUEUE_NAME;
@Override public void run() { while (true) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2L)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) );
if (null == list || list.isEmpty()) { continue; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> map = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) { log.error("处理订单异常:" + e); handlePendingList(); }
} } }
private void handlePendingList() { String queueName = RedisConstants.STREAM_QUEUE_NAME; while (true) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) );
if (null == list || list.isEmpty()) { break; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> map = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) { log.error("处理PendingList订单异常:" + e); try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } }
} }
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_VOUVHER_ORDER_KEY + userId); boolean flag = lock.tryLock(); if (!flag) { log.error("不允许重复下单!"); return; } try { proxy.createVoucherOrderThread(voucherOrder); } finally { lock.unlock(); } }
@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) );
if (result.intValue() != 0) { return Result.fail(result == 1 ? "优惠券已售罄,感谢参与!" : "您已经购买过该优惠券,优惠券限制每人仅购买一次!"); }
Object o = AopContext.currentProxy(); proxy = (IVoucherOrderService) o;
return Result.ok(orderId); }
@Override @Transactional public Result createVoucherOrder(Long voucherId) { Long userID = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("秒杀优惠券每人限购1张,感谢配合,本优惠券最终解释权归ty公司所有!"); }
boolean result = iSeckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId) .gt("stock", 0) .update();
if (!result) { return Result.fail("商品已经售罄!"); }
long orderId = redisWorker.nextID(RedisConstants.VOUVHER_ORDER_KEY);
VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setVoucherId(voucherId); voucherOrder.setUserId(userID);
save(voucherOrder);
return Result.ok(orderId); } }
|