队列
阻塞队列
- 阻塞队列有没有好的一面?
- 不得不阻塞,你如何管理?
定义
阻塞队列,首先是一个队列,而一个阻塞队列在数据结构中所起的作用大概如图所示:
- 当阻塞队列是空的时候,从队列中获取元素的操作将会被阻塞
- 当阻塞队列是满的时候,往队列中添加元素的操作将会被阻塞
- 试图从空的阻塞队列中获取元素将会被阻塞,直到其他线程往空的队列插入显得元素,同比插入元素。
什么好处?
在多线程领域,所谓阻塞,在某些情况下会挂起线程,即阻塞,一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要阻塞队列?不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切被BlockingQueue一手包办了。也不需要考虑效率和线程安全。
架构梳理和种类分析
- 红色重点关注 | 分类 | 说明 | | —- | —- | | ArrayBlockingQueue | 是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行操作。 | | LinkedBlockingQueue | 基于链表结构的有界阻塞队列,按照FIFO排序元素,吞吐量通常高于ArrayBlockingQueue.但是大小是Integer.MAX_VALUE | | PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 | | DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 | | LinkedTransferQueue | 由链表结构组成的无界阻塞队列 | | SynchrobousBlockingQueue | 一种不存储元素的阻塞队列,每个插入操作必须等待另外一个线程调用移除操作,否则插入操作一直处于阻塞状态 | | LinkedBlockingDueue | 由链表结构组成的双向阻塞队列。 |
核心方法


element检查队首元素是谁。队列先进先出,队首元素。
package com.interview.demo;import java.util.concurrent.BlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;/*** @Author leijs* @date 2022/3/29*/public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> blockingQueue = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "生产a");blockingQueue.put("a");System.out.println(Thread.currentThread().getName() + "生产b");blockingQueue.put("b");System.out.println(Thread.currentThread().getName() + "生产c");blockingQueue.put("c");} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "拿走a");blockingQueue.take();TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + "拿走b");blockingQueue.take();TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + "拿走c");blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}}, "B").start();}}
生产者消费者模式
传统版本
package com.interview.demo.producer_consumer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 一个初始值为0的变量,两个线程交替操作,一个加1,一个减1,来5轮** @Author leijs* @date 2022/3/29*/public class ProdConsumerTraditionDemo {public static void main(String[] args) {ShareData shareData = new ShareData();new Thread(() -> {for (int i = 0; i < 5; i++) {shareData.increment();}}, "t1").start();new Thread(() -> {for (int i = 0; i < 5; i++) {shareData.decrement();}}, "t2").start();}}class ShareData {private int num = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void increment() {lock.lock();try {while (num != 0) {// 等待,不能生产condition.await();}num++;System.out.println(Thread.currentThread().getName() + ":" + num);condition.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() {lock.lock();try {// 注意多线程的判断不能用if只能使用whilewhile (num == 0) {// 等待,不能生产condition.await();}num--;System.out.println(Thread.currentThread().getName() + ":" + num);condition.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}
BlockingQueue实现
package com.interview.demo.producer_consumer;import org.apache.commons.lang3.StringUtils;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/*** @Author leijs* @date 2022/3/29*/public class ProdConsumerBlockingQueueDemo {public static void main(String[] args) throws InterruptedException {MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));new Thread(() -> {try {myResource.produce();} catch (Exception e) {e.printStackTrace();}}, "producer").start();new Thread(() -> {try {myResource.consumer();} catch (Exception e) {e.printStackTrace();}}, "consumer").start();TimeUnit.SECONDS.sleep(5);System.out.println("主线程叫停了");myResource.stop();}}class MyResource {/*** 默认开启进行生产+消费*/private volatile boolean FLAG = true;private AtomicInteger atomicInteger = new AtomicInteger();BlockingQueue<String> blockingQueue = null;public MyResource(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;System.out.println(blockingQueue.getClass().getName());}public void produce() throws Exception {String data;boolean returnValue;while (FLAG) {data = atomicInteger.incrementAndGet() + "";returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);if (returnValue) {System.out.println(Thread.currentThread().getName() + "插入队列成功" + data);} else {System.out.println(Thread.currentThread().getName() + "插入队列失败" + data);}TimeUnit.SECONDS.sleep(1);}System.out.println("生产动作结束,flag=false");}public void consumer() throws Exception {String data;while (FLAG) {data = blockingQueue.poll(2L, TimeUnit.SECONDS);if (StringUtils.isEmpty(data)) {FLAG = false;System.out.println(Thread.currentThread().getName() + "超过两秒钟没有获取数据,退出");return;}System.out.println(Thread.currentThread().getName() + "消费队列成功:" + data);}System.out.println("消费动作结束");}public void stop() {this.FLAG = false;}}
扩展
Synchronized和Lock的区别
synchronized是关键字,属于JVM层面。
monitorenter(底层是通过Monitor对象来完成的,其实wait/notify)等方法也依赖monitor对象,只有在同步代表快中才能调用wait/notify等方法<br /> monitorexit<br /> Lock是具体类,是API层面的
使用方法
synchronized 不需要手动改释放锁,代码执行完系统会自动让线程释放对锁的占用。<br /> ReentrantLock则需要用户去手动释放锁,如果没有主动释放锁,就可能出现死锁现象<br /> lock和unlock方法配合try...finally来完成
等待是否可中断
synchronized 是不可中断的,除非抛出异常或者正常运行完成<br /> lock可中断。可以设置超时方法或者lockInterruptibly
加锁是否公平
synchronized 只能是非公平锁<br /> lock两者都可以,构造函数传入true/false
锁绑定多个对象Condition
Synchronized没有<br /> Reentrant用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像Synchronized那样随机唤醒一个线程或者唤醒全部线程。
多个线程按照顺序打印
package com.interview.demo;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** A 打印1次,B打印2次,C打印3次* 循环5次** @Author leijs* @date 2022/3/29*/public class SyncAndReentrantLockDemo {public static void main(String[] args) {ShareData shareData = new ShareData();new Thread(() -> {for (int i = 0; i < 5; i++) {shareData.print1();}}, "A").start();new Thread(() -> {for (int i = 0; i < 5; i++) {shareData.print2();}}, "B").start();new Thread(() -> {for (int i = 0; i < 5; i++) {shareData.print3();}}, "C").start();}}class ShareData {// A:1, B:2, C:3private int num = 1;// 判断 -> 干活 -> 通知唤醒private Lock lock = new ReentrantLock();private Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();private Condition c3 = lock.newCondition();public void print1() {lock.lock();try {while (num != 1) {c1.await();}printByTimes(1);num = 2;c2.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void print2() {lock.lock();try {while (num != 2) {c2.await();}printByTimes(2);num = 3;c3.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void print3() {lock.lock();try {while (num != 3) {c3.await();}printByTimes(3);num = 1;c1.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}private void printByTimes(int times) {for (int i = 0; i < times; i++) {System.out.println(Thread.currentThread().getName() + "打印" + i);}}}
callable
package com.interview.demo;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;/*** @Author leijs* @date 2022/3/29*/public class CallableDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());Thread thread = new Thread(futureTask, "AA");thread.start();System.out.println("*********** result:" + futureTask.get());}}class MyThread implements Callable<Integer> {@Overridepublic Integer call() throws Exception {return 1024;}}
futureTask.get()建议放在最后。如果没有计算完成就要去强求,就会导致阻塞,值得计算完成。
