阻塞队列
应用场景: pub/sub 模型
例:用户注册成功后,发送优惠券;
public class BlockingDemo {
ArrayBlockingQueue<String> ab=new ArrayBlockingQueue(10);//FIFO的队列
{
init(); //构造块初始化
}
public void init(){
new Thread(()->{
while(true) {
try {
String data = ab.take(); //阻塞方式获得元素
System.out.println("receive:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public void addData(String data) throws InterruptedException {
ab.add(data);
System.out.println("sendData:"+data);
Thread.sleep(1000);
}
public static void main(String[] args) throws InterruptedException {
BlockingDemo blockingDemo=new BlockingDemo();
for(int i=0;i<1000;i++){
blockingDemo.addData("data:"+i);
}
}
}
J.U.C中的阻塞队列
java阻塞队列
java8提供了,7中阻塞队列,常用3种如下
名称 | 描述 |
---|---|
ArrayBlockingQueue | 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序 |
LinkedBlockingQueue | 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行 排序 |
DelayQueue | 优先级队列实现的无界阻塞队列 |
阻塞队列常用方法
插入操作
boolean offer(e): 添加元素,返回插入成功的状态;唤醒不为null 获取队列;
void add(e) : 添加元素到队尾,如果满了,插入报错,throw new IllegalStateException("Queue full")
;
void put(e) : 当队列插满以后,会阻塞
生产者线程, 直到有元素被移除;
boolean offer(e,timeout,unit): 当阻塞队列满了后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出;
移除操作
boolean remove(): 当队列为null,返回false, 如果队列移除成功,则返回true;
E poll(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先获取锁,然后响应中断
E take(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先响应中断,获取锁
E poll(time,unit):带超时机制的获取数据,如果队列为空,
则会等待指定的时间再去获取元素返回
ArrayBlockingQueue原理分析
构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 重入锁,出入队列都用同一把锁
notEmpty = lock.newCondition(); // 初始化非null等待队列
notFull = lock.newCondition(); // 初始化非满等待队列
}
takeIndex、putIndex 分别表示 取索引值、入索引值;takeIndex == count 后,takeIndex = 0,恢复成0,从0开始重新获取;
add()
add()方法会调用父类的add 方法,也就是AbstractQueue, 如果源码看的多的话,一般这种方式都是子类调用父类
的模板方法
来解决通用型问题
;
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) { // 父类 abstractQueue 方法
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer()
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) // 队列已满,返回
return false;
else {
enqueue(e); // 入队操作
return true;
}
} finally {
lock.unlock();
}
}
enqueue()
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 添加角标达到最大值,至0重新开始
putIndex = 0;
count++;
notEmpty.signal();
}
put()
put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下它的实现逻辑
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 这个也是获取锁,但不同的是,这个方法允许 其他线程调用本线程的interrupt方法来中断等待,直接返回。 而lock方法是尝试获取锁后响应中断;
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
take()
阻塞式获取队列中的方法,原理:有就删除返回,没有就阻塞;
如果队列中没有数据,就会把当前线程放入notEmpty 等待队列中,有数据后,put会执行notEmpty.signal() 唤醒take线程,执行take()操作,如下图;
take()方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
dequeue()方法:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 默认获取0角标元素
items[takeIndex] = null;
if (++takeIndex == items.length) // 角标最大后置0
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued(); // 更新迭代器中的元素数据
notFull.signal(); // 触发 因为队列满以后导致被阻塞的线程;
return x;
}
原子操作类
原子性这个概念,在多线程编程里是一个老生常谈的问题。所谓的原子性表示一个或者多个操作,要么全部执行完,要么一个也不执行。不能出现成功一部分失败一部分的情况。
在多线程中,如果多个线程同时更新一个共享变量,可能会得到一个意料之外的值。比如 i=1 。 A 线程更新 i+1 、B 线程也更新 i+1
AtomicInteger分析
automic 基于 volatile 和 cas 实现的 乐观锁,保证数据正确性;
incrementAndGet()
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
// var1: 当前对象 var2:相对当前对象内存地址偏移量 var4:预期值 var5: 更新值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
get()
private volatile int value; // 使用valatile 保证可见性
/**
* Gets the current value.
*
* @return the current value
*/
public final int get() {
return value;
}
其他
lock()与lockInterruptibly()区别
public class InterruptDemo {
public static void main(String[] args) throws InterruptedException {
final Lock lock = new ReentrantLock();
lock.lock();
Thread.sleep(1000);
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
// 1) lock.lock();
try {
// 2) lock.lockInterruptibly();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" interrupted.");6
}
});
t1.start();
Thread.sleep(1000);
t1.interrupt();
System.out.println("....................");
}
}
开放1),结果:t1线程持续阻塞;
开放2),结果:t1线程相应中断,抛出异常,执行结束;