把多线程环境比作是分布式的话,那么线程与线程之间是不是也可以使用这种消息队列的方式进行数据通信和解耦呢?

一,阻塞队列使用案例

1.注册成功后增加积分

假如模拟一个场景,就是用户注册的时候,在注册成功以后发放积分。这个场景在一般来说,会这么去实现:
image.png
但是实际上,我们需要考虑两个问题:

  1. 性能,在注册这个环节里面,假如添加用户需要花费 1 秒钟,增加积分需要花费 1 秒钟,那么整个注册结果的返回就可能需要大于 2 秒,虽然影响不是很大,但是在量比较大的时候,我们也需要做一些优化。

  2. 耦合,添加用户和增加积分,可以认为是两个领域,也就是说,增加积分并不是注册必须要具备的功能,但是一旦增加积分这个逻辑出现异常,就会导致注册失败。这种耦合在程序设计的时候是一定要规避的。

因此我们可以通过异步的方式来实现。

2.改造之前的代码逻辑

  1. public class UserService {
  2. public boolean register() {
  3. User user = new User();
  4. user.setName("Mic");
  5. addUser(user);
  6. sendPoints(user);
  7. return true;
  8. }
  9. public static void main(String[] args) {
  10. new UserService().register();
  11. }
  12. private void addUser(User user) {
  13. System.out.println(" 添加用户:" + user);
  14. try {
  15. Thread.sleep(1000);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. private void sendPoints(User user) {
  21. System.out.println(" 发 送 积 分 给 指 定 用 户:" + user);
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. @Data
  30. class User {
  31. private String name;
  32. }

3.改造之后的逻辑

  1. public class UserService {
  2. private final ExecutorService single = Executors.newSingleThreadExecutor();
  3. private volatile boolean isRunning = true;
  4. ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);
  5. {
  6. init();
  7. }
  8. public void init() {
  9. single.execute(() -> {
  10. while (isRunning) {
  11. try {
  12. User user = (User) arrayBlockingQueue.take();// 阻塞的方式获取队列中的数据
  13. sendPoints(user);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. });
  19. }
  20. public boolean register() {
  21. User user = new User();
  22. user.setName("Mic");
  23. addUser(user);
  24. arrayBlockingQueue.add(user);// 添加到异步队列
  25. return true;
  26. }
  27. public static void main(String[] args) {
  28. new UserService().register();
  29. }
  30. private void addUser(User user) {
  31. System.out.println(" 添加用户:" + user);
  32. try {
  33. Thread.sleep(1000);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. private void sendPoints(User user) {
  39. System.out.println(" 发 送 积 分 给 指 定 用 户:" + user);
  40. try {
  41. Thread.sleep(1000);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }
  47. @Data
  48. class User {
  49. private String name;
  50. }

优化以后,整个流程就变成了这样
image.png
我们使用了 ArrayBlockingQueue 基于数组的阻塞队列,来优化代码的执行逻辑。

4.阻塞队列的应用场景

阻塞队列这块的应用场景,比较多的仍然是对于生产者消费者场景的应用,但是由于分布式架构的普及,更多的关注在分布式消息队列上。所以其实如果把阻塞队列比作成分布式消息队列的话,那么所谓的生产者和消费者其实就是基于阻塞队列的解耦。

另外,阻塞队列是一个 fifo 的队列,所以对于希望在线程级别需要实现对目标服务的顺序访问的场景中,也可以使用。

二,J.U.C 中的阻塞队列

1.JUC中提供的阻塞队列

在 Java8 中,提供了 7 个阻塞队列。

ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
DelayQueue 优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列

2.阻塞队列的操作方法

在阻塞队列中,提供了四种处理方式

2.1插入操作

add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。

offer(e) : 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true。

put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,直到队列可用。

offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出。

2.2移除操作

remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true。

poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null。

take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费。

poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回。

三,ArrayBlockingQueue源码

1.构造方法

  • ArrayBlockingQueue 提供了三个构造方法,分别如下。
  • capacity: 表示数组的长度,也就是队列的长度。
  • fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。
  1. public ArrayBlockingQueue(int capacity) {
  2. this(capacity, false);
  3. }
  4. public ArrayBlockingQueue(int capacity, boolean fair) {
  5. if (capacity <= 0)
  6. throw new IllegalArgumentException();
  7. this.items = new Object[capacity];
  8. lock = new ReentrantLock(fair);//重入锁,出队和入队持有这一把锁
  9. notEmpty = lock.newCondition();//初始化非空等待队列
  10. notFull = lock.newCondition();//初始化非满等待队列
  11. }
  12. public ArrayBlockingQueue(int capacity, boolean fair,
  13. Collection<? extends E> c) {
  14. this(capacity, fair);
  15. final ReentrantLock lock = this.lock;
  16. lock.lock(); // Lock only for visibility, not mutual exclusion
  17. try {
  18. int i = 0;
  19. try {
  20. for (E e : c) {
  21. checkNotNull(e);
  22. items[i++] = e;
  23. }
  24. } catch (ArrayIndexOutOfBoundsException ex) {
  25. throw new IllegalArgumentException();
  26. }
  27. count = i;
  28. putIndex = (i == capacity) ? 0 : i;
  29. } finally {
  30. lock.unlock();
  31. }
  32. }

items 构造以后,大概是一个这样的数组结构:

2.add

以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.

  1. public boolean add(E e) {
  2. return super.add(e);
  3. }
  4. ======================================================
  5. public boolean add(E e) {
  6. if (offer(e))
  7. return true;
  8. else
  9. throw new IllegalStateException("Queue full");
  10. }

从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常。

3.offer

  1. public boolean offer(E e) {
  2. //校验放入队列的元素如果为null,抛出空指针异常
  3. checkNotNull(e);
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. //如果队列已经满了,返回false
  8. if (count == items.length)
  9. return false;
  10. else {
  11. //否则,执行入队逻辑
  12. enqueue(e);
  13. return true;
  14. }
  15. } finally {
  16. lock.unlock();
  17. }
  18. }

4.checkNotNull

  1. private static void checkNotNull(Object v) {
  2. if (v == null)
  3. throw new NullPointerException();
  4. }

5.enqueue

  1. private void enqueue(E x) {
  2. //当前队列的引用
  3. final Object[] items = this.items;
  4. //元素入队
  5. items[putIndex] = x;
  6. //如果下一个元素存放位置大于队列长度,把队列长度置为0
  7. if (++putIndex == items.length)
  8. putIndex = 0;
  9. //元素个数+1
  10. count++;
  11. //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
  12. notEmpty.signal();
  13. }

putIndex 为什么会在等于数组长度的时候重新设置为 0?

因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了。

下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。

  1. 当元素满了以后是无法继续添加的,因为会报错。

  2. 队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除。

image.png

6.put

put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. //元素个数如果等于数组的长度,阻塞当前线程
  7. while (count == items.length)
  8. notFull.await();
  9. //否则,入队逻辑
  10. enqueue(e);
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

image.png

7.take

take 方法是一种阻塞获取队列中元素的方法。

它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. //队列元素个数==0 阻塞
  6. while (count == 0)
  7. notEmpty.await();
  8. //否则执行出队逻辑
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

image.png
如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素
image.png

8.dequeue

这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端。

  1. private E dequeue() {
  2. //获取队列引用
  3. final Object[] items = this.items;
  4. //拿出来一个元素
  5. E x = (E) items[takeIndex];
  6. //将拿出的元素的索引位置设置为null
  7. items[takeIndex] = null;
  8. //如果到了数组长度 从零开始
  9. if (++takeIndex == items.length)
  10. takeIndex = 0;
  11. //队列元素个数-1
  12. count--;
  13. if (itrs != null)
  14. itrs.elementDequeued();//更新迭代器中的元素个数
  15. //触发 因为队列满了以后导致的被阻塞的线程
  16. notFull.signal();
  17. return x;
  18. }

9.elementDequeued

ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素。

所以 itrs.elementDequeued() 是用来更新迭代器中的元素数据的。

takeIndex 的索引变化图如下,同时随着数据的移除,会唤醒处于 put 阻塞状态下的线程来继续添加数据。
image.png

10.remove

remove 方法是移除一个指定元素。看看它的实现代码:

  1. public boolean remove(Object o) {
  2. //判空
  3. if (o == null) return false;
  4. //获取队列的引用
  5. final Object[] items = this.items;
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. //如果队列中元素个数大于0
  10. if (count > 0) {
  11. //获取上一次放入的元素的索引位置
  12. final int putIndex = this.putIndex;
  13. //获取上一次取出的元素的索引的位置
  14. int i = takeIndex;
  15. do {
  16. //遍历每一个元素,如果找到了,就移除元素
  17. if (o.equals(items[i])) {
  18. removeAt(i);
  19. return true;
  20. }
  21. //这个的逻辑有啥用?
  22. //可能存在这样一种情况:
  23. // 1 null 2 3 4
  24. if (++i == items.length)
  25. i = 0;
  26. } while (i != putIndex);
  27. }
  28. return false;
  29. } finally {
  30. lock.unlock();
  31. }
  32. }

11.removeAt

  1. void removeAt(final int removeIndex) {
  2. //获取队列的引用
  3. final Object[] items = this.items;
  4. //如果要删除的节点恰好是下一个获取元素要拿的索引位置,直接干掉就中
  5. if (removeIndex == takeIndex) {
  6. // removing front item; just advance
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length)
  9. takeIndex = 0;
  10. count--;
  11. if (itrs != null)
  12. itrs.elementDequeued();
  13. } else {
  14. //这个时候就需要轮训删除
  15. final int putIndex = this.putIndex;
  16. //自旋
  17. for (int i = removeIndex;;) {
  18. //删除索引的下一个索引
  19. int next = i + 1;
  20. //如果到头了 ,意思就是 , 那就得 从头再来
  21. if (next == items.length)
  22. next = 0;
  23. // 断流了 得跳过去
  24. if (next != putIndex) {
  25. items[i] = items[next];
  26. i = next;
  27. } else {
  28. //找到了 删除 退出
  29. items[i] = null;
  30. this.putIndex = i;
  31. break;
  32. }
  33. }
  34. //元素个数--
  35. count--;
  36. //处理迭代器的逻辑
  37. if (itrs != null)
  38. itrs.removedAt(removeIndex);
  39. }
  40. //将往队列放元素,但是因为队列满了阻塞的线程唤醒一个,当然了,也可能队列没有线程
  41. notFull.signal();
  42. }