8.1 是什么
阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
- 线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素,当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
- 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
- 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增
8.2 有什么好处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒
BlockingQueue 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。8.3 种类分析
ArrayBlockingQueue:由数组结构组成的有界阴塞队列。
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为 Integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue:由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:由链表结构组成的双向阻塞队列。8.4 核心方法
| 方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 | | —- | —- | —- | —- | —- | | 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) | | 移除 | remove() | poll() | take() | poll(time,unit) | | 检查 | element() | peek() | 不可用 | 不可用 |
抛出异常 | 当阻塞队列满时,再往队列里 add 插入元素会抛 IllegalStateException: Queue full 当阻塞队列空时,再往队列里 remove 移除元素会抛 NoSuchElementException |
---|---|
特殊值 | 插入方法,成功 ture 失败 false 移除方法,成功返回出队列的元素,队列里面没有就返回 null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产线程直到 put 数据 or 响应中断退出。 当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列可用。 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程会退出 |
8.4.1 抛出异常 api
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("x"));
}
}
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
}
}
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
}
}
8.4.2 特殊值 api
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("x"));
}
}
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("x"));
System.out.println(blockingQueue.peek());
}
}
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("x"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
8.4.3 阻塞 api
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
System.out.println("=========================");
blockingQueue.put("x");
}
}
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
System.out.println("=========================");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
}
}
8.4.4 超时 api
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
}
}
8.5 SynchronousQueue
SynchronousQueue 没有容量。
与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue。
每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然。
package s02.e08;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
// 暂停一会儿线程
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}
8.6 用在哪里
- 生产者消费者模式
- 线程池
- 消息中间件
8.6.1 生产者消费者模式——传统版
一个初始值为零的变量,两个线程对其交替操作,一个加 1 一个减 1 ,来 5 轮 ```java package s02.e08;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
// 资源类 class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
// 1 判断
while (number != 0) {
// 等待,不能生产
condition.await();
}
// 2 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
// 1 判断
while (number == 0) {
// 等待,不能生产
condition.await();
}
// 2 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "BB").start();
}
}
![image.png](https://cdn.nlark.com/yuque/0/2022/png/390086/1644769968129-3644d405-d7c9-4ee2-af70-a840d903bb03.png#clientId=ube101e58-d1ee-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=216&id=u80484c9d&margin=%5Bobject%20Object%5D&name=image.png&originHeight=216&originWidth=69&originalType=binary&ratio=1&rotation=0&showTitle=false&size=1667&status=done&style=none&taskId=u24fadda7-1d29-48e1-8c75-03990b8396e&title=&width=69)<br />多线程的判断一定要用 while 判断,不能用 if 判断,否则会产生虚假唤醒<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/390086/1644770372605-773e4676-558f-454a-b0e0-263a86167c5e.png#clientId=ube101e58-d1ee-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=202&id=ucc0bf973&margin=%5Bobject%20Object%5D&name=image.png&originHeight=202&originWidth=753&originalType=binary&ratio=1&rotation=0&showTitle=false&size=79637&status=done&style=none&taskId=ue9c80a50-8059-47d5-9c38-23f81e7d6ef&title=&width=753)
```java
package s02.e08;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// 资源类
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
// 1 判断
if (number != 0) {
// 等待,不能生产
condition.await();
}
// 2 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
// 1 判断
if (number == 0) {
// 等待,不能生产
condition.await();
}
// 2 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "CC").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "DD").start();
}
}
8.6.2 Synchronized 和 Lock 有什么区别
- 原始构成
synchronized 是关键字属于 JVM 层面
monitorenter(底层是通过 monitor 对象来完成,其实 wait/notify 等方法也依赖于 monitor 对象只有在同步块或方法中才能调 wait/notify 等方法)
monitorexit
Lock 是具体类(java.util.concurrent.Locks.Lock)是 api 层面的锁
package s02.e08;
import java.util.concurrent.locks.ReentrantLock;
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
synchronized (new Object()) {
}
new ReentrantLock();
}
}
出现两个 monitorexit 是为了保证不会死锁
- 使用方法
synchronized 不需要用户去手动释放锁,当 synchronized 代码执行完后系统会自动让线程释放对锁的占用
ReentrantLock 则需要用户去手动释放锁若没有主动释放锁,就有可能导致出现死锁现象
需要 lock( ) 和 unlock() 方法配合 try/finally 语句块来完成
- 等待是否可中断
synchronized 不可中断,除非抛出异常或者正常运行完成
ReentrantLock 可中断
- 设置超时方法 tryLock(Long timeout,TimeUnit unit)
- LockInterruptibly() 放代码块中,调用 interrupt() 方法可中断
- 加锁是否公平
synchronized 非公平锁
ReentrantLock 两者都可以,默认非公平锁,构造方法可以传入 boolean 值,true 为公平锁,false 为非公平锁
- 锁绑定多个条件 condition
synchronized 没有
ReentrantLock 用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
题目:多线程之间按顺序调用,实现 A->B->C 三个线程启动,要求如下
AA 打印 5 次,BB 打印 10 次,CC 打印 15 次,紧接着 AA 打印 5 次,BB 打印 10次,CC 打印 15 次,来10轮
package s02.e08;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource {
private int number = 1; //A:1 B:2 c:3
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print5() {
lock.lock();
try {
// 1. 判断
while (number != 1) {
c1.await();
}
// 2. 干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
// 1. 判断
while (number != 2) {
c2.await();
}
// 2. 干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
// 1. 判断
while (number != 3) {
c3.await();
}
// 2. 干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3. 通知
number = 1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareResource.print15();
}
}, "C").start();
}
}
8.6.3 线程通信之生产者消费者阻塞队列版
package s02.e08;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource {
private volatile boolean FLAG = true; // 默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myprod() throws Exception {
String data = null;
boolean retvalue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retvalue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retvalue) {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,表示FLAG=false,生产动作结束");
}
public void myConsumer() throws Exception {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t超过2秒钟没有取到蛋糕,消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列蛋糕" + result + "成功");
}
}
public void stop() throws Exception {
this.FLAG = false;
}
}
/**
* volatile/cAs/atomicInteger/BLockQueue/线程交互/原子引用
*/
public class Prodconsumer_BlockQueueDemo {
public static void main(String[] args) throws Exception {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myprod();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
System.out.println();
System.out.println();
try {
myResource.myConsumer();
System.out.println();
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
// 暂停一会儿线程
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
System.out.println();
System.out.println("5秒钟时间到,大老板main线程叫停,活动结束");
myResource.stop();
}
}