阻塞队列
应用场景: pub/sub 模型
例:用户注册成功后,发送优惠券;
public class BlockingDemo {ArrayBlockingQueue<String> ab=new ArrayBlockingQueue(10);//FIFO的队列{init(); //构造块初始化}public void init(){new Thread(()->{while(true) {try {String data = ab.take(); //阻塞方式获得元素System.out.println("receive:" + data);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}public void addData(String data) throws InterruptedException {ab.add(data);System.out.println("sendData:"+data);Thread.sleep(1000);}public static void main(String[] args) throws InterruptedException {BlockingDemo blockingDemo=new BlockingDemo();for(int i=0;i<1000;i++){blockingDemo.addData("data:"+i);}}}
J.U.C中的阻塞队列
java阻塞队列
java8提供了,7中阻塞队列,常用3种如下
| 名称 | 描述 |
|---|---|
| ArrayBlockingQueue | 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序 |
| LinkedBlockingQueue | 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行 排序 |
| DelayQueue | 优先级队列实现的无界阻塞队列 |
阻塞队列常用方法
插入操作
boolean offer(e): 添加元素,返回插入成功的状态;唤醒不为null 获取队列;
void add(e) : 添加元素到队尾,如果满了,插入报错,throw new IllegalStateException("Queue full");
void put(e) : 当队列插满以后,会阻塞生产者线程, 直到有元素被移除;
boolean offer(e,timeout,unit): 当阻塞队列满了后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出;
移除操作
boolean remove(): 当队列为null,返回false, 如果队列移除成功,则返回true;
E poll(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先获取锁,然后响应中断
E take(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先响应中断,获取锁
E poll(time,unit):带超时机制的获取数据,如果队列为空,
则会等待指定的时间再去获取元素返回
ArrayBlockingQueue原理分析
构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair); // 重入锁,出入队列都用同一把锁notEmpty = lock.newCondition(); // 初始化非null等待队列notFull = lock.newCondition(); // 初始化非满等待队列}
takeIndex、putIndex 分别表示 取索引值、入索引值;takeIndex == count 后,takeIndex = 0,恢复成0,从0开始重新获取;
add()
add()方法会调用父类的add 方法,也就是AbstractQueue, 如果源码看的多的话,一般这种方式都是子类调用父类的模板方法来解决通用型问题;
public boolean add(E e) {return super.add(e);}
public boolean add(E e) { // 父类 abstractQueue 方法if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
offer()
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length) // 队列已满,返回return false;else {enqueue(e); // 入队操作return true;}} finally {lock.unlock();}}
enqueue()
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) // 添加角标达到最大值,至0重新开始putIndex = 0;count++;notEmpty.signal();}
put()
put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下它的实现逻辑
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();// 这个也是获取锁,但不同的是,这个方法允许 其他线程调用本线程的interrupt方法来中断等待,直接返回。 而lock方法是尝试获取锁后响应中断;try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}
take()
阻塞式获取队列中的方法,原理:有就删除返回,没有就阻塞;
如果队列中没有数据,就会把当前线程放入notEmpty 等待队列中,有数据后,put会执行notEmpty.signal() 唤醒take线程,执行take()操作,如下图;

take()方法:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
dequeue()方法:
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex]; // 默认获取0角标元素items[takeIndex] = null;if (++takeIndex == items.length) // 角标最大后置0takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued(); // 更新迭代器中的元素数据notFull.signal(); // 触发 因为队列满以后导致被阻塞的线程;return x;}
原子操作类
原子性这个概念,在多线程编程里是一个老生常谈的问题。所谓的原子性表示一个或者多个操作,要么全部执行完,要么一个也不执行。不能出现成功一部分失败一部分的情况。
在多线程中,如果多个线程同时更新一个共享变量,可能会得到一个意料之外的值。比如 i=1 。 A 线程更新 i+1 、B 线程也更新 i+1
AtomicInteger分析
automic 基于 volatile 和 cas 实现的 乐观锁,保证数据正确性;
incrementAndGet()
/*** Atomically increments by one the current value.** @return the updated value*/public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}
public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}// var1: 当前对象 var2:相对当前对象内存地址偏移量 var4:预期值 var5: 更新值public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
get()
private volatile int value; // 使用valatile 保证可见性/*** Gets the current value.** @return the current value*/public final int get() {return value;}
其他
lock()与lockInterruptibly()区别
public class InterruptDemo {public static void main(String[] args) throws InterruptedException {final Lock lock = new ReentrantLock();lock.lock();Thread.sleep(1000);Thread t1 = new Thread(new Runnable(){@Overridepublic void run() {// 1) lock.lock();try {// 2) lock.lockInterruptibly();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" interrupted.");6}});t1.start();Thread.sleep(1000);t1.interrupt();System.out.println("....................");}}
开放1),结果:t1线程持续阻塞;
开放2),结果:t1线程相应中断,抛出异常,执行结束;
