队列
阻塞队列
- 阻塞队列有没有好的一面?
- 不得不阻塞,你如何管理?
定义
阻塞队列,首先是一个队列,而一个阻塞队列在数据结构中所起的作用大概如图所示:
- 当阻塞队列是空的时候,从队列中获取元素的操作将会被阻塞
- 当阻塞队列是满的时候,往队列中添加元素的操作将会被阻塞
- 试图从空的阻塞队列中获取元素将会被阻塞,直到其他线程往空的队列插入显得元素,同比插入元素。
什么好处?
在多线程领域,所谓阻塞,在某些情况下会挂起线程,即阻塞,一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要阻塞队列?不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切被BlockingQueue一手包办了。也不需要考虑效率和线程安全。
架构梳理和种类分析
- 红色重点关注 | 分类 | 说明 | | —- | —- | | ArrayBlockingQueue | 是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行操作。 | | LinkedBlockingQueue | 基于链表结构的有界阻塞队列,按照FIFO排序元素,吞吐量通常高于ArrayBlockingQueue.但是大小是Integer.MAX_VALUE | | PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 | | DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 | | LinkedTransferQueue | 由链表结构组成的无界阻塞队列 | | SynchrobousBlockingQueue | 一种不存储元素的阻塞队列,每个插入操作必须等待另外一个线程调用移除操作,否则插入操作一直处于阻塞状态 | | LinkedBlockingDueue | 由链表结构组成的双向阻塞队列。 |
核心方法
element检查队首元素是谁。队列先进先出,队首元素。
package com.interview.demo;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @Author leijs
* @date 2022/3/29
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "生产a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + "生产b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + "生产c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "拿走a");
blockingQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "拿走b");
blockingQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "拿走c");
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
}
}
生产者消费者模式
传统版本
package com.interview.demo.producer_consumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 一个初始值为0的变量,两个线程交替操作,一个加1,一个减1,来5轮
*
* @Author leijs
* @date 2022/3/29
*/
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "t1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "t2").start();
}
}
class ShareData {
private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (num != 0) {
// 等待,不能生产
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + ":" + num);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
// 注意多线程的判断不能用if只能使用while
while (num == 0) {
// 等待,不能生产
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + ":" + num);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
BlockingQueue实现
package com.interview.demo.producer_consumer;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author leijs
* @date 2022/3/29
*/
public class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
try {
myResource.produce();
} catch (Exception e) {
e.printStackTrace();
}
}, "producer").start();
new Thread(() -> {
try {
myResource.consumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
TimeUnit.SECONDS.sleep(5);
System.out.println("主线程叫停了");
myResource.stop();
}
}
class MyResource {
/**
* 默认开启进行生产+消费
*/
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void produce() throws Exception {
String data;
boolean returnValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (returnValue) {
System.out.println(Thread.currentThread().getName() + "插入队列成功" + data);
} else {
System.out.println(Thread.currentThread().getName() + "插入队列失败" + data);
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println("生产动作结束,flag=false");
}
public void consumer() throws Exception {
String data;
while (FLAG) {
data = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (StringUtils.isEmpty(data)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "超过两秒钟没有获取数据,退出");
return;
}
System.out.println(Thread.currentThread().getName() + "消费队列成功:" + data);
}
System.out.println("消费动作结束");
}
public void stop() {
this.FLAG = false;
}
}
扩展
Synchronized和Lock的区别
synchronized是关键字,属于JVM层面。
monitorenter(底层是通过Monitor对象来完成的,其实wait/notify)等方法也依赖monitor对象,只有在同步代表快中才能调用wait/notify等方法<br /> monitorexit<br /> Lock是具体类,是API层面的
使用方法
synchronized 不需要手动改释放锁,代码执行完系统会自动让线程释放对锁的占用。<br /> ReentrantLock则需要用户去手动释放锁,如果没有主动释放锁,就可能出现死锁现象<br /> lock和unlock方法配合try...finally来完成
等待是否可中断
synchronized 是不可中断的,除非抛出异常或者正常运行完成<br /> lock可中断。可以设置超时方法或者lockInterruptibly
加锁是否公平
synchronized 只能是非公平锁<br /> lock两者都可以,构造函数传入true/false
锁绑定多个对象Condition
Synchronized没有<br /> Reentrant用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像Synchronized那样随机唤醒一个线程或者唤醒全部线程。
多个线程按照顺序打印
package com.interview.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A 打印1次,B打印2次,C打印3次
* 循环5次
*
* @Author leijs
* @date 2022/3/29
*/
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.print1();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.print2();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.print3();
}
}, "C").start();
}
}
class ShareData {
// A:1, B:2, C:3
private int num = 1;
// 判断 -> 干活 -> 通知唤醒
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print1() {
lock.lock();
try {
while (num != 1) {
c1.await();
}
printByTimes(1);
num = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print2() {
lock.lock();
try {
while (num != 2) {
c2.await();
}
printByTimes(2);
num = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print3() {
lock.lock();
try {
while (num != 3) {
c3.await();
}
printByTimes(3);
num = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void printByTimes(int times) {
for (int i = 0; i < times; i++) {
System.out.println(Thread.currentThread().getName() + "打印" + i);
}
}
}
callable
package com.interview.demo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @Author leijs
* @date 2022/3/29
*/
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
Thread thread = new Thread(futureTask, "AA");
thread.start();
System.out.println("*********** result:" + futureTask.get());
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1024;
}
}
futureTask.get()建议放在最后。如果没有计算完成就要去强求,就会导致阻塞,值得计算完成。