阻塞队列

应用场景: pub/sub 模型

例:用户注册成功后,发送优惠券;

  1. public class BlockingDemo {
  2. ArrayBlockingQueue<String> ab=new ArrayBlockingQueue(10);//FIFO的队列
  3. {
  4. init(); //构造块初始化
  5. }
  6. public void init(){
  7. new Thread(()->{
  8. while(true) {
  9. try {
  10. String data = ab.take(); //阻塞方式获得元素
  11. System.out.println("receive:" + data);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }).start();
  17. }
  18. public void addData(String data) throws InterruptedException {
  19. ab.add(data);
  20. System.out.println("sendData:"+data);
  21. Thread.sleep(1000);
  22. }
  23. public static void main(String[] args) throws InterruptedException {
  24. BlockingDemo blockingDemo=new BlockingDemo();
  25. for(int i=0;i<1000;i++){
  26. blockingDemo.addData("data:"+i);
  27. }
  28. }
  29. }

J.U.C中的阻塞队列

java阻塞队列

java8提供了,7中阻塞队列,常用3种如下

名称 描述
ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行 排序
DelayQueue 优先级队列实现的无界阻塞队列

阻塞队列常用方法

插入操作

boolean offer(e): 添加元素,返回插入成功的状态;唤醒不为null 获取队列;

void add(e) : 添加元素到队尾,如果满了,插入报错,throw new IllegalStateException("Queue full")

void put(e) : 当队列插满以后,会阻塞生产者线程, 直到有元素被移除;

boolean offer(e,timeout,unit): 当阻塞队列满了后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出;

移除操作

boolean remove(): 当队列为null,返回false, 如果队列移除成功,则返回true;

E poll(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先获取锁,然后响应中断

E take(): 返回takeIndex 元素, 唤醒notFull中的 插入队列, 先响应中断,获取锁

E poll(time,unit):带超时机制的获取数据,如果队列为空,
则会等待指定的时间再去获取元素返回

ArrayBlockingQueue原理分析

构造方法

  1. public ArrayBlockingQueue(int capacity, boolean fair) {
  2. if (capacity <= 0)
  3. throw new IllegalArgumentException();
  4. this.items = new Object[capacity];
  5. lock = new ReentrantLock(fair); // 重入锁,出入队列都用同一把锁
  6. notEmpty = lock.newCondition(); // 初始化非null等待队列
  7. notFull = lock.newCondition(); // 初始化非满等待队列
  8. }

takeIndex、putIndex 分别表示 取索引值、入索引值;takeIndex == count 后,takeIndex = 0,恢复成0,从0开始重新获取;

add()

add()方法会调用父类的add 方法,也就是AbstractQueue, 如果源码看的多的话,一般这种方式都是子类调用父类模板方法解决通用型问题

  1. public boolean add(E e) {
  2. return super.add(e);
  3. }
  1. public boolean add(E e) { // 父类 abstractQueue 方法
  2. if (offer(e))
  3. return true;
  4. else
  5. throw new IllegalStateException("Queue full");
  6. }

offer()

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. if (count == items.length) // 队列已满,返回
  7. return false;
  8. else {
  9. enqueue(e); // 入队操作
  10. return true;
  11. }
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

enqueue()

  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. items[putIndex] = x;
  6. if (++putIndex == items.length) // 添加角标达到最大值,至0重新开始
  7. putIndex = 0;
  8. count++;
  9. notEmpty.signal();
  10. }

put()

put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下它的实现逻辑

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();// 这个也是获取锁,但不同的是,这个方法允许 其他线程调用本线程的interrupt方法来中断等待,直接返回。 而lock方法是尝试获取锁后响应中断;
  5. try {
  6. while (count == items.length)
  7. notFull.await();
  8. enqueue(e);
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

take()

阻塞式获取队列中的方法,原理:有就删除返回,没有就阻塞;

如果队列中没有数据,就会把当前线程放入notEmpty 等待队列中,有数据后,put会执行notEmpty.signal() 唤醒take线程,执行take()操作,如下图;

阻塞队列与原子操作 - 图1

take()方法:

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }

dequeue()方法:

  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. E x = (E) items[takeIndex]; // 默认获取0角标元素
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length) // 角标最大后置0
  9. takeIndex = 0;
  10. count--;
  11. if (itrs != null)
  12. itrs.elementDequeued(); // 更新迭代器中的元素数据
  13. notFull.signal(); // 触发 因为队列满以后导致被阻塞的线程;
  14. return x;
  15. }

原子操作类

原子性这个概念,在多线程编程里是一个老生常谈的问题。所谓的原子性表示一个或者多个操作,要么全部执行完,要么一个也不执行。不能出现成功一部分失败一部分的情况。

在多线程中,如果多个线程同时更新一个共享变量,可能会得到一个意料之外的值。比如 i=1 。 A 线程更新 i+1 、B 线程也更新 i+1

AtomicInteger分析

automic 基于 volatile 和 cas 实现的 乐观锁,保证数据正确性;

incrementAndGet()

  1. /**
  2. * Atomically increments by one the current value.
  3. *
  4. * @return the updated value
  5. */
  6. public final int incrementAndGet() {
  7. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  8. }
  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }
  8. // var1: 当前对象 var2:相对当前对象内存地址偏移量 var4:预期值 var5: 更新值
  9. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

get()

  1. private volatile int value; // 使用valatile 保证可见性
  2. /**
  3. * Gets the current value.
  4. *
  5. * @return the current value
  6. */
  7. public final int get() {
  8. return value;
  9. }

其他

lock()与lockInterruptibly()区别

  1. public class InterruptDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. final Lock lock = new ReentrantLock();
  4. lock.lock();
  5. Thread.sleep(1000);
  6. Thread t1 = new Thread(new Runnable(){
  7. @Override
  8. public void run() {
  9. // 1) lock.lock();
  10. try {
  11. // 2) lock.lockInterruptibly();
  12. } catch (InterruptedException e) {
  13. // TODO Auto-generated catch block
  14. e.printStackTrace();
  15. }
  16. System.out.println(Thread.currentThread().getName()+" interrupted.");6
  17. }
  18. });
  19. t1.start();
  20. Thread.sleep(1000);
  21. t1.interrupt();
  22. System.out.println("....................");
  23. }
  24. }

开放1),结果:t1线程持续阻塞;

开放2),结果:t1线程相应中断,抛出异常,执行结束;

参见