29讲⽣产者消费者模式:电商库存设计优化
你好,我是刘超。
⽣产者消费者模式,在之前的⼀些案例中,我们是有使⽤过的,相信你有⼀定的了解。这个模式是⼀个⼗分经典的多线程并发协作模式,⽣产者与消费者是通过⼀个中间容器来解决强耦合关系,并以此来实现不同的⽣产与消费速度,从⽽达到缓冲的效果。
使⽤⽣产者消费者模式,可以提⾼系统的性能和吞吐量,今天我们就来看看该模式的⼏种实现⽅式,还有其在电商库存中的应
⽤。
Object的wait/notify/notifyAll实现⽣产者消费者
在第16讲中,我就曾介绍过使⽤Object的wait/notify/notifyAll实现⽣产者消费者模式,这种⽅式是基于Object的
wait/notify/notifyAll与对象监视器(Monitor)实现线程间的等待和通知。
还有,在第12讲中我也详细讲解过Monitor的⼯作原理,借此我们可以得知,这种⽅式实现的⽣产者消费者模式是基于内核来实现的,有可能会导致⼤量的上下⽂切换,所以性能并不是最理想的。
Lock中Condition的await/signal/signalAll实现⽣产者消费者
相对Object类提供的wait/notify/notifyAll⽅法实现的⽣产者消费者模式,我更推荐使⽤java.util.concurrent包提供的Lock &&
Condition实现的⽣产者消费者模式。
在接⼝Condition类中定义了await/signal/signalAll ⽅法,其作⽤与Object的wait/notify/notifyAll⽅法类似,该接⼝类与显示锁
Lock配合,实现对线程的阻塞和唤醒操作。
我在第13讲中详细讲到了显示锁,显示锁ReentrantLock或ReentrantReadWriteLock都是基于AQS实现的,⽽在AQS中有⼀个内部类ConditionObject实现了Condition接⼝。
我们知道AQS中存在⼀个同步队列(CLH队列),当⼀个线程没有获取到锁时就会进⼊到同步队列中进⾏阻塞,如果被唤醒
后获取到锁,则移除同步队列。
除此之外,AQS中还存在⼀个条件队列,通过addWaiter⽅法,可以将await()⽅法调⽤的线程放⼊到条件队列中,线程进⼊等待状态。当调⽤signal以及signalAll ⽅法后,线程将会被唤醒,并从条件队列中删除,之后进⼊到同步队列中。条件队列是通过⼀个单向链表实现的,所以Condition⽀持多个等待队列。
由上可知,Lock中Condition的await/signal/signalAll实现的⽣产者消费者模式,是基于Java代码层实现的,所以在性能和扩展性⽅⾯都更有优势。
下⾯来看⼀个案例,我们通过⼀段代码来实现⼀个商品库存的⽣产和消费。
public class LockConditionTest {
private LinkedList
private int maxInventory = 10; // 最⼤库存
private Lock lock = new ReentrantLock();// 资源锁
private Condition condition = lock.newCondition();// 库存⾮满和⾮空条件
/**
- 新增商品库存
- @param e
*/
public void produce(String e) { lock.lock();
try {
while (product.size() == maxInventory) { condition.await();
}
product.add(e);
System.out.println(“放⼊⼀个商品库存,总库存为:” + product.size()); condition.signalAll();
} catch (Exception ex) { ex.printStackTrace();
} finally { lock.unlock();
}
}
/**
- 消费商品
- @return
*/
public String consume() { String result = null; lock.lock();
try {
while (product.size() == 0) { condition.await();
}
result = product.removeLast();
System.out.println(“消费⼀个商品,总库存为:” + product.size()); condition.signalAll();
} catch (Exception e) { e.printStackTrace();
} finally { lock.unlock();
}
return result;
}
/**
- ⽣产者
- @author admin
/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(“商品” + i);
}
}
}
/**
- 消费者
- @author admin
/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) { consume();
}
}
}
public static void main(String[] args) {
LockConditionTest lc = new LockConditionTest(); new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start(); new Thread(lc.new Producer()).start(); new Thread(lc.new Customer()).start();
}
}
看完案例,请你思考下,我们对此还有优化的空间吗?
从代码中应该不难发现,⽣产者和消费者都在竞争同⼀把锁,⽽实际上两者没有同步关系,由于Condition能够⽀持多个等待队列以及不响应中断, 所以我们可以将⽣产者和消费者的等待条件和锁资源分离,从⽽进⼀步优化系统并发性能,代码如下:
private LinkedList
private int maxInventory = 10; // 最⼤库存
private Lock consumerLock = new ReentrantLock();// 资源锁
private Lock productLock = new ReentrantLock();// 资源锁
private Condition notEmptyCondition = consumerLock.newCondition();// 库存满和空条件
private Condition notFullCondition = productLock.newCondition();// 库存满和空条件
/**
- 新增商品库存
- @param e
*/
public void produce(String e) { productLock.lock();
try {
while (inventory.get() == maxInventory) { notFullCondition.await();
}
product.add(e);
System.out.println(“放⼊⼀个商品库存,总库存为:” + inventory.incrementAndGet()); if(inventory.get()
}
} catch (Exception ex) { ex.printStackTrace();
} finally { productLock.unlock();
}
if(inventory.get()>0) { try {
consumerLock.lockInterruptibly(); notEmptyCondition.signalAll();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block e1.printStackTrace();
}finally { consumerLock.unlock();
}
}
}
/**
- 消费商品
- @return
*/
public String consume() { String result = null; consumerLock.lock(); try {
while (inventory.get() == 0) { notEmptyCondition.await();
}
result = product.removeLast();
System.out.println(“消费⼀个商品,总库存为:” + inventory.decrementAndGet());
if(inventory.get()>0) { notEmptyCondition.signalAll();
}
} catch (Exception e) { e.printStackTrace();
} finally { consumerLock.unlock();
}
if(inventory.get()<maxInventory) {
try { productLock.lockInterruptibly(); notFullCondition.signalAll();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block e1.printStackTrace();
}finally { productLock.unlock();
}
}
return result;
}
/**
- ⽣产者
- @author admin
/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(“商品” + i);
}
}
}
/**
- 消费者
- @author admin
/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) { consume();
}
}
}
public static void main(String[] args) {
LockConditionTest2 lc = new LockConditionTest2(); new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start();
}
}
我们分别创建 productLock 以及 consumerLock 两个锁资源,前者控制⽣产者线程并⾏操作,后者控制消费者线程并发运
⾏;同时也设置两个条件变量,⼀个是notEmptyCondition,负责控制消费者线程状态,⼀个是notFullCondition,负责控制⽣产者线程状态。这样优化后,可以减少消费者与⽣产者的竞争,实现两者并发执⾏。
我们这⾥是基于LinkedList来存取库存的,虽然LinkedList是⾮线程安全,但我们新增是操作头部,⽽消费是操作队列的尾部, 理论上来说没有线程安全问题。⽽库存的实际数量inventory是基于AtomicInteger(CAS锁)线程安全类实现的,既可以保证 原⼦性,也可以保证消费者和⽣产者之间是可⻅的。
BlockingQueue实现⽣产者消费者
相对前两种实现⽅式,BlockingQueue实现是最简单明了的,也是最容易理解的。
因为BlockingQueue是线程安全的,且从队列中获取或者移除元素时,如果队列为空,获取或移除操作则需要等待,直到队列不为空;同时,如果向队列中添加元素,假设此时队列⽆可⽤空间,添加操作也需要等待。所以BlockingQueue⾮常适合⽤来实现⽣产者消费者模式。还是以⼀个案例来看下它的优化,代码如下:
public class BlockingQueueTest {
private int maxInventory = 10; // 最⼤库存
private BlockingQueue
/**
- 新增商品库存
- @param e
*/
public void produce(String e) { try {
product.put(e);
System.out.println(“放⼊⼀个商品库存,总库存为:” + product.size());
} catch (InterruptedException e1) {
// TODO Auto-generated catch block e1.printStackTrace();
}
}
/**
- 消费商品
- @return
*/
public String consume() { String result = null; try {
result = product.take();
System.out.println(“消费⼀个商品,总库存为:” + product.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block e.printStackTrace();
}
return result;
}
/**
- ⽣产者
- @author admin
/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(“商品” + i);
}
}
}
/**
- 消费者
- @author admin
/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) { consume();
}
}
}
public static void main(String[] args) {
BlockingQueueTest lc = new BlockingQueueTest(); new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start(); new Thread(lc.new Producer()).start(); new Thread(lc.new Customer()).start();
}
}
在这个案例中,我们创建了⼀个LinkedBlockingQueue,并设置队列⼤⼩。之后我们创建⼀个消费⽅法consume(),⽅法⾥⾯
调⽤LinkedBlockingQueue中的take()⽅法,消费者通过该⽅法获取商品,当队列中商品数量为零时,消费者将进⼊等待状
态;我们再创建⼀个⽣产⽅法produce(),⽅法⾥⾯调⽤LinkedBlockingQueue中的put()⽅法,⽣产⽅通过该⽅法往队列中放商品,如果队列满了,⽣产者就将进⼊等待状态。
⽣产者消费者优化电商库存设计
了解完⽣产者消费者模式的⼏种常⻅实现⽅式,接下来我们就具体看看该模式是如何优化电商库存设计的。
电商系统中经常会有抢购活动,在这类促销活动中,抢购商品的库存实际是存在库存表中的。为了提⾼抢购性能,我们通常会将库存存放在缓存中,通过缓存中的库存来实现库存的精确扣减。在提交订单并付款之后,我们还需要再去扣除数据库中的库存。如果遇到瞬时⾼并发,我们还都去操作数据库的话,那么在单表单库的情况下,数据库就很可能会出现性能瓶颈。
⽽我们库存表如果要实现分库分表,势必会增加业务的复杂度。试想⼀个商品的库存分别在不同库的表中,我们在扣除库存时,⼜该如何判断去哪个库中扣除呢?
如果随意扣除表中库存,那么就会出现有些表已经扣完了,有些表中还有库存的情况,这样的操作显然是不合理的,此时就需要额外增加逻辑判断来解决问题。
在不分库分表的情况下,为了提⾼订单中扣除库存业务的性能以及吞吐量,我们就可以采⽤⽣产者消费者模式来实现系统的性能优化。
创建订单等于⽣产者,存放订单的队列则是缓冲容器,⽽从队列中消费订单则是数据库扣除库存操作。其中存放订单的队列可
以极⼤限度地缓冲⾼并发给数据库带来的压⼒。
我们还可以基于消息队列来实现⽣产者消费者模式,如今RabbitMQ、RocketMQ都实现了事务,我们只需要将订单通过事务提交到MQ中,扣除库存的消费⽅只需要通过消费MQ来逐步操作数据库即可。
总结
使⽤⽣产者消费者模式来缓冲⾼并发数据库扣除库存压⼒,类似这样的例⼦其实还有很多。
例如,我们平时使⽤消息队列来做⾼并发流量削峰,也是基于这个原理。抢购商品时,如果所有的抢购请求都直接进⼊判断是否有库存和冻结缓存库存等逻辑业务中,由于这些逻辑业务操作会增加资源消耗,就可能会压垮应⽤服务。此时,为了保证系统资源使⽤的合理性,我们可以通过⼀个消息队列来缓冲瞬时的⾼并发请求。
⽣产者消费者模式除了可以做缓冲优化系统性能之外,它还可以应⽤在处理⼀些执⾏任务时间⽐较⻓的场景中。
例如导出报表业务,⽤户在导出⼀种⽐较⼤的报表时,通常需要等待很⻓时间,这样的⽤户体验是⾮常差的。通常我们可以固定⼀些报表内容,⽐如⽤户经常需要在今天导出昨天的销量报表,或者在⽉初导出上个⽉的报表,我们就可以提前将报表导出到本地或内存中,这样⽤户就可以在很短的时间内直接下载报表了。
思考题
我们可以⽤⽣产者消费者模式来实现瞬时⾼并发的流量削峰,然⽽这样做虽然缓解了消费⽅的压⼒,但⽣产⽅则会因为瞬时⾼并发,⽽发⽣⼤量线程阻塞。⾯对这样的情况,你知道有什么⽅式可以优化线程阻塞所带来的性能问题吗?
期待在留⾔区看到你的⻅解。也欢迎你点击“请朋友读”,把今天的内容分享给身边的朋友,邀请他⼀起讨论。
精选留⾔ <br />![](https://cdn.nlark.com/yuque/0/2022/png/1852637/1646315754642-d6e34503-0caa-4cee-8bc6-1c3a8858da11.png#)杨俊<br />我的理解是库存放缓存,⽤户提交订单在缓存扣减库存,⽤户端能够快速返回显示订单提交成功并⽀付,然后只有⽀付成功之
后才会利⽤队列实际的扣减数据库库存是吗?要是不⽀付会在缓存补回库存吧,应该会限时⽀付吧
2019-07-30 08:39
作者回复
对的
2019-07-31 11:02
QQ怪
在⽹关层中把请求放⼊到mq中,后端服务从消费队列中消费消息并处理;或者⽤有固定容量的消费队列的令牌桶,令牌发⽣器预估预计的处理能⼒,匀速⽣产放⼊令牌队列中,满了直接丢弃,⽹关收到请求之后消费⼀个令牌,获得令牌的请求才能进⾏后端秒杀请求,反之直接返回秒杀失败
2019-07-30 21:07
作者回复
⼤家都⼀致想到了限流,限流是⾮常必要的,⽆论我们的程序优化的如何,还是有上限的,限流则是⼀种兜底策略。
除了这个,我们还可以使⽤协程来优化线程由于阻塞等待带来的上下⽂切换性能问题,可以回顾第19讲,我们也⽤协程实现过
⽣产者消费者模式。
2019-07-31 10:45
撒旦的堕落
⽹关与服务之间增加令牌桶 或者mq 以保护秒杀服务不会被⼤的流量压垮 可以么
2019-07-30 09:28
作者回复
可以的,很通⽤的⼀种解决⽅案
2019-07-31 10:54
Geek_844248
⽼师,优化ReentrantLock那⾥是不是有问题呢,product的修改放在两个不同的锁下,就是说可能会同时有两个线程会修改pro
duct这个list,这样是否违反了有序性。
⽽且我尝试⽆限循环运⾏⽣产者消费者线程,发现运⾏久了会出错的,希望⽼师讲解⼀下。
2019-08-12 21:08
K
⽼师好,我有个问题,就是实际的inventory会不会超过maxInventory啊?
productLock.lock(); try {
while (inventory.get() == maxInventory){ //3 notFullCondition.await();
}
product.add(e); //1
//producer被唤醒了以后,执⾏完1,还没执⾏2,这个时候时间⽚⽤完了,所以先停⽌了。
//然后另外的线程被唤醒了,在3处的判断逻辑,(上⼀个线程并没有inventory.incr(),所以while条件不满⾜,不循环)
//线程2号执⾏完代码1,2。
//当之前⼀个线程1号醒过来,他也会继续执⾏代码2。这不是相当于,实际的inventory 肯定超过了maxInventory吗?
System.out.println(“ 放⼊⼀个商品库存,总库存为:” + inventory.incrementAndGet()); //2
//后⾯的逻辑
…
2019-08-11 17:20
作者回复
这有⼀个lock锁,不会同时进来两个线程。
2019-08-12 09:29
怎☞劰☜叻
⽼师,我看到你上⾯说⽤协程来优化!我们这边有个服务属于业务⽹关,要聚合多个下有的数据,涉及⼤量的⽹络io,之前是
使⽤多线程并⾏调⽤多个下有,现在发现线程越来越多,遇到了瓶颈!希望⽤协程来改进⽅案~但是我在⽹上找到的⼀些java
协程开源组件,⽂档和⽣态都不是很健全,希望⽼师能给出⼀些建议~ ⾮常感谢
2019-08-09 09:45
Aaron
商品从数据库压⼊redis缓存。
同时库存压⼊redis,⽤商品ID作为key,⽤list模拟队列【1001,1001,1001】⽤商品 做队列元素,100件库存,那就存1
00个 在队列中,扣库存的时候,判断队列⼤⼩,可以防⽌超卖。所有都是靠redis的单线程原⼦操作保证,可⾏不
2019-08-03 08:45
晓杰
请问⽼师在分布式架构中,使⽤lock和blockqueue实现⽣产者消费者是不是不适⽤了
2019-07-31 21:09
作者回复
是的,可以基于消息队列或redis缓存来实现。
2019-08-02 11:02
JasonK
你好,刘⽼师,最近⽣产上⼀个服务,⽼是半夜cpu飙升,导致服务死掉,排查问题看了下,都是GC task thread#15 (Parallel
GC) 线程占⽤CPU资源,这是为什么?⽽且同样的服务我布了两台机器,⼀台服务会死掉,⼀台不会。请⽼师解惑。
2019-07-31 14:42
作者回复
导致CPU飙升只是⼀个性能的直接表现,是不是有对象⼀直在创建,所以导致⼀直在GC。建议打开dump⽇志查看具体的内存使⽤情况以及对象的创建分布情况。
2019-08-06 10:01
Jxin
1.⽣产消费模式⽤信号量也能玩。
2.⽣产者这边的优化思路应该是提⾼响应速度和增加资源。提⾼响应速度就是尽量降低⽣产逻辑的耗时,增加资源就是根据业务量为该⽣产者单独线程池并调整线程数。⾄于限流和令牌桶感觉都是降级处理,属于规避阻塞场景⽽⾮解决阻塞场景,应该不在答案范围内吧。
3.对于进程内⽣产消费模式,⼤规模,量⼤的数据本身就不适合,毕竟内存空间有限,消息堆积有限,所以量级达到⼀定指标就采⽤跨进程⽅式,⽐如kafka和rocketmq。同时,进程内⽣产消费模式,异常要处理好,不然可能会出现消息堆积和脏数据
,毕竟mq的消费确认和重试机制都是开箱即⽤,⽽我们得⾃⼰实现和把关。
2019-07-31 09:46
作者回复
看来有实战经验
2019-08-02 10:30
我已经设置了昵称
kafka也有事务消息
2019-07-31 08:34
作者回复
是的
2019-08-02 10:28
nightmare
可以在⽹关通过令牌桶算法限流,真正执⾏的⽣产者⼀⽅使⽤线程池来优化
2019-07-30 21:25
作者回复
限流是⼀种⽅式,线程池其实也是⼀种限流⼿段。我们在之前协程这⼀讲中,其实也⽤协程代替线程实现了⽣产者消费者模式
,这也不乏是⼀种优化⽅式。
2019-07-31 10:41
明翼
⽼师,⽣产者和消费者的锁分开没问题吗?都是⽤的同⼀个队列?
2019-07-30 21:23
作者回复
这⾥同步更新下,新增了以下代码作为实时库存:
private AtomicInteger inventory = new AtomicInteger(0);
我们这⾥是基于LinkedList来存取库存的,虽然LinkedList是⾮线程安全,但我们新增是操作头部,⽽消费则是操作队列的尾部
,理论上来说没有线程安全问题。⽽库存的实际数量inventory是基于AtomicInteger(CAS锁)线程安全类实现,即可以保证原
⼦性,也可以保证消费者和⽣产者之间是可⻅的。
2019-07-31 10:36
正在减肥的胖籽。
1.如果把库存放到缓存中,下订单去缓存扣减库存数量,如何是保证数据⼀致性?希望⽼师能详细讲解下?
2019-07-30 12:29
作者回复
⽐较常⽤的⼿段就是使⽤分布式锁来实现,在40讲中我们会详细介绍。
2019-07-31 10:47
刘章周
对于⽂中的案例,可以⽤tryLock⽅法,给定⼀个超时时间,超过时间还未获取到锁,就返回⼀个错误提示信息。
2019-07-30 09:30
作者回复
嗯,可以通过超时来避免⻓时间阻塞等待。
2019-07-31 10:53
-W.LI-
课后习题:⽣产者也⽤MQ缓冲,在接⼊层做限流控制流量。客户端增加验证码等操作,防刷。服务器的计算能⼒,最⼤吞吐量是有限的。真要⼀直那么⼤量就只能加服务器了,只是瞬时就⽤MQ做流量削峰,或者提⾼⽤户⻔槛客户端限制减少⽆效请求( 各种纬度进⾏控制)。只能想到这么点了
⽣产者消费者模型作⽤:
1.⽣产者和消费者解耦
2.通过缓冲,削峰,
3.⽣产者和消费吞吐量分别调控(⽣产者少只加⽣产者就好)
4….
2019-07-30 08:32
作者回复
思考的很全⾯
2019-07-31 11:07
⽊⽊匠
⽣产⽅的⾼并发优化,我们可以参考下tomcat的设计,tomcat设计了线程池来进⾏请求接收,有最⼩线程数,最⼤线程数,同时还有⼀个有界的⼯作队列,来接收超过线程数的请求,当⼯作队列满了后可以选择拒绝或者丢弃处理。
2019-07-30 08:28
作者回复
赞,很好的参考例⼦
2019-07-31 11:03
undifined
⽣产⽅可以使⽤异步的⽅式,收到请求后直接发出 MQ,同时应当添加限流熔断等保护措施
⽼师我还有⼀个问题,我们需要向第三⽅平台同步库存,但是第三⽅有限流,⽽且 API 收费,⽬前是将商品 ID放到⼀个延时队列,如果有相同的 ID,⼀分钟内只会同步⼀次,⽼师有更好的办法吗,谢谢⽼师
2019-07-30 08:24
作者回复
我理解的这个应该是库存的对实时性要求不⾼吧。如果对库存的实时性要求不⾼,可以建议第三⽅平台开放批量更新库存接⼝
,并且固定⼀个更新周期。
2019-08-06 09:55
-W.LI-
⽼师好,⽣产者消费者模型⾥⾯,分开两个锁的时候 贡献变量private LinkedList
需要+volatile保证可⻅性么?,条件判断等值判断好还是范围判断好?
2019-07-30 08:17
作者回复
这⾥同步更新下,新增了以下代码作为实时库存:
private AtomicInteger inventory = new AtomicInteger(0);
我们这⾥是基于LinkedList来存取库存的,虽然LinkedList是⾮线程安全,但我们新增是操作头部,⽽消费则是操作队列的尾部
,理论上来说没有线程安全问题。⽽库存的实际数量inventory是基于AtomicInteger(CAS锁)线程安全类实现,即可以保证原
⼦性,也可以保证消费者和⽣产者之间是可⻅的。
2019-07-31 10:35
Zed
我觉得可以通过线程池控制限流。
2019-07-30 07:37
作者回复
是的,限流是⼀种优化⽅式
2019-07-31 09:26