生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:

  1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
  2. 如果共享数据区为空的话,阻塞消费者继续消费数据;

在实现生产者消费者问题时,可以采用三种方式:
1.使用Object的wait/notify的消息通知机制;
2.使用Lock的Condition的await/signal的消息通知机制;
3.使用BlockingQueue实现。本文主要将这三种实现方式进行总结归纳。

1. wait/notify的消息通知机制

1.1 预备知识

Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。在线程中调用 wait() 方法,将阻塞当前线程,直至等到其他线程调用了调用 notify() 方法或 notifyAll() 方法进行通知之后,当前线程才能从wait()方法出返回,继续执行下面的操作。

  1. wait

该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。

  1. notify

该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException。 该方法任意从WAITTING状态的线程中挑选一个进行通知,使得调用wait()方法的线程从等待队列移入到同步队列中,等待有机会再一次获取到锁,从而使得调用wait()方法的线程能够从wait()方法处退出。调用notify后,当前线程不会马上释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。

  1. notifyAll 该方法与 notify ()方法的工作方式相同,重要的一点差异是: notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。

1.2 wait/notify消息通知潜在的一些问题

1.notify早期通知

notify 通知的遗漏很容易理解,即 threadA 还没开始 wait 的时候,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。比如在下面的示例代码中,就模拟出notify早期通知带来的问题:

  1. public class EarlyNotify {
  2. private static String lockObject = "";
  3. public static void main(String[] args) {
  4. WaitThread waitThread = new WaitThread(lockObject);
  5. NotifyThread notifyThread = new NotifyThread(lockObject);
  6. notifyThread.start();
  7. try {
  8. Thread.sleep(3000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. waitThread.start();
  13. }
  14. static class WaitThread extends Thread {
  15. private String lock;
  16. public WaitThread(String lock) {
  17. this.lock = lock;
  18. }
  19. @Override
  20. public void run() {
  21. synchronized (lock) {
  22. try {
  23. System.out.println(Thread.currentThread().getName() + " 进去代码块");
  24. System.out.println(Thread.currentThread().getName() + " 开始wait");
  25. lock.wait();
  26. System.out.println(Thread.currentThread().getName() + " 结束wait");
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. }
  33. static class NotifyThread extends Thread {
  34. private String lock;
  35. public NotifyThread(String lock) {
  36. this.lock = lock;
  37. }
  38. @Override
  39. public void run() {
  40. synchronized (lock) {
  41. System.out.println(Thread.currentThread().getName() + " 进去代码块");
  42. System.out.println(Thread.currentThread().getName() + " 开始notify");
  43. lock.notify();
  44. System.out.println(Thread.currentThread().getName() + " 结束开始notify");
  45. }
  46. }
  47. }
  48. }

示例中开启了两个线程,一个是WaitThread,另一个是NotifyThread。NotifyThread会先启动,先调用notify方法。然后WaitThread线程才启动,调用wait方法,但是由于通知过了,wait方法就无法再获取到相应的通知,因此WaitThread会一直在wait方法出阻塞,这种现象就是通知过早的现象。针对这种现象,解决方法,一般是添加一个状态标志,让waitThread调用wait方法前先判断状态是否已经改变了没,如果通知早已发出的话,WaitThread就不再去wait。对上面的代码进行更正:

  1. public class EarlyNotify {
  2. private static String lockObject = "";
  3. private static boolean isWait = true;
  4. public static void main(String[] args) {
  5. WaitThread waitThread = new WaitThread(lockObject);
  6. NotifyThread notifyThread = new NotifyThread(lockObject);
  7. notifyThread.start();
  8. try {
  9. Thread.sleep(3000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. waitThread.start();
  14. }
  15. static class WaitThread extends Thread {
  16. private String lock;
  17. public WaitThread(String lock) {
  18. this.lock = lock;
  19. }
  20. @Override
  21. public void run() {
  22. synchronized (lock) {
  23. try {
  24. while (isWait) {
  25. System.out.println(Thread.currentThread().getName() + " 进去代码块");
  26. System.out.println(Thread.currentThread().getName() + " 开始wait");
  27. lock.wait();
  28. System.out.println(Thread.currentThread().getName() + " 结束wait");
  29. }
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }
  36. static class NotifyThread extends Thread {
  37. private String lock;
  38. public NotifyThread(String lock) {
  39. this.lock = lock;
  40. }
  41. @Override
  42. public void run() {
  43. synchronized (lock) {
  44. System.out.println(Thread.currentThread().getName() + " 进去代码块");
  45. System.out.println(Thread.currentThread().getName() + " 开始notify");
  46. lock.notifyAll();
  47. isWait = false;
  48. System.out.println(Thread.currentThread().getName() + " 结束开始notify");
  49. }
  50. }
  51. }
  52. }

这段代码只是增加了一个isWait状态变量,NotifyThread调用notify方法后会对状态变量进行更新,在WaitThread中调用wait方法之前会先对状态变量进行判断,在该示例中,调用notify后将状态变量isWait改变为false,因此,在WaitThread中while对isWait判断后就不会执行wait方法,从而避免了Notify过早通知造成遗漏的情况。

总结:在使用线程的等待/通知机制时,一般都要配合一个 boolean 变量值(或者其他能够判断真假的条件),在 notify 之前改变该 boolean 变量的值,让 wait 返回后能够退出 while 循环(一般都要在 wait 方法外围加一层 while 循环,以防止早期通知),或在通知被遗漏后,不会被阻塞在 wait 方法处。这样便保证了程序的正确性。

2.等待wait的条件发生变化

如果线程在等待时接受到了通知,但是之后等待的条件发生了变化,并没有再次对等待条件进行判断,也会导致程序出现错误。

下面用一个例子来说明这种情况

  1. public class ConditionChange {
  2. private static List<String> lockObject = new ArrayList();
  3. public static void main(String[] args) {
  4. Consumer consumer1 = new Consumer(lockObject);
  5. Consumer consumer2 = new Consumer(lockObject);
  6. Productor productor = new Productor(lockObject);
  7. consumer1.start();
  8. consumer2.start();
  9. productor.start();
  10. }
  11. static class Consumer extends Thread {
  12. private List<String> lock;
  13. public Consumer(List lock) {
  14. this.lock = lock;
  15. }
  16. @Override
  17. public void run() {
  18. synchronized (lock) {
  19. try {
  20. //这里使用if的话,就会存在wait条件变化造成程序错误的问题
  21. if (lock.isEmpty()) {
  22. System.out.println(Thread.currentThread().getName() + " list为空");
  23. System.out.println(Thread.currentThread().getName() + " 调用wait方法");
  24. lock.wait();
  25. System.out.println(Thread.currentThread().getName() + " wait方法结束");
  26. }
  27. String element = lock.remove(0);
  28. System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. static class Productor extends Thread {
  36. private List<String> lock;
  37. public Productor(List lock) {
  38. this.lock = lock;
  39. }
  40. @Override
  41. public void run() {
  42. synchronized (lock) {
  43. System.out.println(Thread.currentThread().getName() + " 开始添加元素");
  44. lock.add(Thread.currentThread().getName());
  45. lock.notifyAll();
  46. }
  47. }
  48. }
  49. }
  50. 会报异常:
  51. Exception in thread "Thread-1" Thread-0 list为空
  52. Thread-0 调用wait方法
  53. Thread-1 list为空
  54. Thread-1 调用wait方法
  55. Thread-2 开始添加元素
  56. Thread-1 wait方法结束
  57. java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

异常原因分析:在这个例子中一共开启了3个线程,Consumer1,Consumer2以及Productor。首先Consumer1调用了wait方法后,线程处于了WAITTING状态,并且将对象锁释放出来。因此,Consumer2能够获取对象锁,从而进入到同步代块中,当执行到wait方法时,同样的也会释放对象锁。因此,productor能够获取到对象锁,进入到同步代码块中,向list中插入数据后,通过notifyAll方法通知处于WAITING状态的Consumer1和Consumer2线程。consumer1得到对象锁后,从wait方法出退出,删除了一个元素让List为空,方法执行结束,退出同步块,释放掉对象锁。这个时候Consumer2获取到对象锁后,从wait方法退出,继续往下执行,这个时候Consumer2再执行lock.remove(0);就会出错,因为List由于Consumer1删除一个元素之后已经为空了。

解决方案:通过上面的分析,可以看出Consumer2报异常是因为线程从wait方法退出之后没有再次对wait条件进行判断,因此,此时的wait条件已经发生了变化。解决办法就是,在wait退出之后再对条件进行判断即可。

  1. public class ConditionChange {
  2. private static List<String> lockObject = new ArrayList();
  3. public static void main(String[] args) {
  4. Consumer consumer1 = new Consumer(lockObject);
  5. Consumer consumer2 = new Consumer(lockObject);
  6. Productor productor = new Productor(lockObject);
  7. consumer1.start();
  8. consumer2.start();
  9. productor.start();
  10. }
  11. static class Consumer extends Thread {
  12. private List<String> lock;
  13. public Consumer(List lock) {
  14. this.lock = lock;
  15. }
  16. @Override
  17. public void run() {
  18. synchronized (lock) {
  19. try {
  20. //这里使用if的话,就会存在wait条件变化造成程序错误的问题
  21. while (lock.isEmpty()) {
  22. System.out.println(Thread.currentThread().getName() + " list为空");
  23. System.out.println(Thread.currentThread().getName() + " 调用wait方法");
  24. lock.wait();
  25. System.out.println(Thread.currentThread().getName() + " wait方法结束");
  26. }
  27. String element = lock.remove(0);
  28. System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. static class Productor extends Thread {
  36. private List<String> lock;
  37. public Productor(List lock) {
  38. this.lock = lock;
  39. }
  40. @Override
  41. public void run() {
  42. synchronized (lock) {
  43. System.out.println(Thread.currentThread().getName() + " 开始添加元素");
  44. lock.add(Thread.currentThread().getName());
  45. lock.notifyAll();
  46. }
  47. }
  48. }
  49. }

上面的代码与之前的代码仅仅只是将 wait 外围的 if 语句改为 while 循环即可,这样当 list 为空时,线程便会继续等待,而不会继续去执行删除 list 中元素的代码。

总结:在使用线程的等待/通知机制时,一般都要在 while 循环中调用 wait()方法,因此xuy配合使用一个 boolean 变量(或其他能判断真假的条件,如本文中的 list.isEmpty()),满足 while 循环的条件时,进入 while 循环,执行 wait()方法,不满足 while 循环的条件时,跳出循环,执行后面的代码。

3. “假死”状态

现象:如果是多消费者和多生产者情况,如果使用notify方法可能会出现“假死”的情况,即唤醒的是同类线程。

原因分析:假设当前多个生产者线程会调用wait方法阻塞等待,当其中的生产者线程获取到对象锁之后使用notify通知处于WAITTING状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的生产者线程都处于等待状态。

解决办法:将notify方法替换成notifyAll方法,如果使用的是lock的话,就将signal方法替换成signalAll方法。

总结

在Object提供的消息通知机制应该遵循如下这些条件:

  1. 永远在while循环中对条件进行判断而不是if语句中进行wait条件的判断;
  2. 使用NotifyAll而不是使用notify。

基本的使用范式如下:

  1. // The standard idiom for calling the wait method in Java
  2. synchronized (sharedObject) {
  3. while (condition) {
  4. sharedObject.wait();
  5. // (Releases lock, and reacquires on wakeup)
  6. }
  7. // do action based upon condition e.g. take or put into queue
  8. }

1.3 wait/notifyAll实现生产者-消费者

利用wait/notifyAll实现生产者和消费者代码如下:

  1. public class ProductorConsumer {
  2. public static void main(String[] args) {
  3. LinkedList linkedList = new LinkedList();
  4. ExecutorService service = Executors.newFixedThreadPool(15);
  5. for (int i = 0; i < 5; i++) {
  6. service.submit(new Productor(linkedList, 8));
  7. }
  8. for (int i = 0; i < 10; i++) {
  9. service.submit(new Consumer(linkedList));
  10. }
  11. }
  12. static class Productor implements Runnable {
  13. private List<Integer> list;
  14. private int maxLength;
  15. public Productor(List list, int maxLength) {
  16. this.list = list;
  17. this.maxLength = maxLength;
  18. }
  19. @Override
  20. public void run() {
  21. while (true) {
  22. synchronized (list) {
  23. try {
  24. while (list.size() == maxLength) {
  25. System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
  26. list.wait();
  27. System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait");
  28. }
  29. Random random = new Random();
  30. int i = random.nextInt();
  31. System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
  32. list.add(i);
  33. list.notifyAll();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. }
  40. }
  41. static class Consumer implements Runnable {
  42. private List<Integer> list;
  43. public Consumer(List list) {
  44. this.list = list;
  45. }
  46. @Override
  47. public void run() {
  48. while (true) {
  49. synchronized (list) {
  50. try {
  51. while (list.isEmpty()) {
  52. System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
  53. list.wait();
  54. System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait");
  55. }
  56. Integer element = list.remove(0);
  57. System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element);
  58. list.notifyAll();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }
  64. }
  65. }
  66. }
  67. 输出结果:
  68. 生产者pool-1-thread-1 生产数据-232820990
  69. 生产者pool-1-thread-1 生产数据1432164130
  70. 生产者pool-1-thread-1 生产数据1057090222
  71. 生产者pool-1-thread-1 生产数据1201395916
  72. 生产者pool-1-thread-1 生产数据482766516
  73. 生产者pool-1-thread-1 list以达到最大容量,进行wait
  74. 消费者pool-1-thread-15 退出wait
  75. 消费者pool-1-thread-15 消费数据:1237535349
  76. 消费者pool-1-thread-15 消费数据:-1617438932
  77. 消费者pool-1-thread-15 消费数据:-535396055
  78. 消费者pool-1-thread-15 消费数据:-232820990
  79. 消费者pool-1-thread-15 消费数据:1432164130
  80. 消费者pool-1-thread-15 消费数据:1057090222
  81. 消费者pool-1-thread-15 消费数据:1201395916
  82. 消费者pool-1-thread-15 消费数据:482766516
  83. 消费者pool-1-thread-15 list为空,进行wait
  84. 生产者pool-1-thread-5 退出wait
  85. 生产者pool-1-thread-5 生产数据1442969724
  86. 生产者pool-1-thread-5 生产数据1177554422
  87. 生产者pool-1-thread-5 生产数据-133137235
  88. 生产者pool-1-thread-5 生产数据324882560
  89. 生产者pool-1-thread-5 生产数据2065211573
  90. 生产者pool-1-thread-5 生产数据253569900
  91. 生产者pool-1-thread-5 生产数据571277922
  92. 生产者pool-1-thread-5 生产数据1622323863
  93. 生产者pool-1-thread-5 list以达到最大容量,进行wait
  94. 消费者pool-1-thread-10 退出wait

2. 使用Lock中Condition的await/signalAll实现生产者-消费者

参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:

针对wait方法

void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;

long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;

boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位

boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

针对notify方法

void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。

void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程

也就是说wait—->await,notify——>Signal。另外,关于lock中condition消息通知的原理解析可以看这篇文章。

如果采用lock中Conditon的消息通知原理来实现生产者-消费者问题,原理同使用wait/notifyAll一样。直接上代码:

  1. public class ProductorConsumer {
  2. private static ReentrantLock lock = new ReentrantLock();
  3. private static Condition full = lock.newCondition();
  4. private static Condition empty = lock.newCondition();
  5. public static void main(String[] args) {
  6. LinkedList linkedList = new LinkedList();
  7. ExecutorService service = Executors.newFixedThreadPool(15);
  8. for (int i = 0; i < 5; i++) {
  9. service.submit(new Productor(linkedList, 8, lock));
  10. }
  11. for (int i = 0; i < 10; i++) {
  12. service.submit(new Consumer(linkedList, lock));
  13. }
  14. }
  15. static class Productor implements Runnable {
  16. private List<Integer> list;
  17. private int maxLength;
  18. private Lock lock;
  19. public Productor(List list, int maxLength, Lock lock) {
  20. this.list = list;
  21. this.maxLength = maxLength;
  22. this.lock = lock;
  23. }
  24. @Override
  25. public void run() {
  26. while (true) {
  27. lock.lock();
  28. try {
  29. while (list.size() == maxLength) {
  30. System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
  31. full.await();
  32. System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait");
  33. }
  34. Random random = new Random();
  35. int i = random.nextInt();
  36. System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
  37. list.add(i);
  38. empty.signalAll();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. } finally {
  42. lock.unlock();
  43. }
  44. }
  45. }
  46. }
  47. static class Consumer implements Runnable {
  48. private List<Integer> list;
  49. private Lock lock;
  50. public Consumer(List list, Lock lock) {
  51. this.list = list;
  52. this.lock = lock;
  53. }
  54. @Override
  55. public void run() {
  56. while (true) {
  57. lock.lock();
  58. try {
  59. while (list.isEmpty()) {
  60. System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
  61. empty.await();
  62. System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait");
  63. }
  64. Integer element = list.remove(0);
  65. System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element);
  66. full.signalAll();
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. } finally {
  70. lock.unlock();
  71. }
  72. }
  73. }
  74. }
  75. }
  76. 输出结果:
  77. 消费者pool-1-thread-9 消费数据:1146627506
  78. 消费者pool-1-thread-9 消费数据:1508001019
  79. 消费者pool-1-thread-9 消费数据:-600080565
  80. 消费者pool-1-thread-9 消费数据:-1000305429
  81. 消费者pool-1-thread-9 消费数据:-1270658620
  82. 消费者pool-1-thread-9 消费数据:1961046169
  83. 消费者pool-1-thread-9 消费数据:-307680655
  84. 消费者pool-1-thread-9 list为空,进行wait
  85. 消费者pool-1-thread-13 退出wait
  86. 消费者pool-1-thread-13 list为空,进行wait
  87. 消费者pool-1-thread-10 退出wait
  88. 生产者pool-1-thread-5 退出wait
  89. 生产者pool-1-thread-5 生产数据-892558288
  90. 生产者pool-1-thread-5 生产数据-1917220008
  91. 生产者pool-1-thread-5 生产数据2146351766
  92. 生产者pool-1-thread-5 生产数据452445380
  93. 生产者pool-1-thread-5 生产数据1695168334
  94. 生产者pool-1-thread-5 生产数据1979746693
  95. 生产者pool-1-thread-5 生产数据-1905436249
  96. 生产者pool-1-thread-5 生产数据-101410137
  97. 生产者pool-1-thread-5 list以达到最大容量,进行wait
  98. 生产者pool-1-thread-1 退出wait
  99. 生产者pool-1-thread-1 list以达到最大容量,进行wait
  100. 生产者pool-1-thread-4 退出wait
  101. 生产者pool-1-thread-4 list以达到最大容量,进行wait
  102. 生产者pool-1-thread-2 退出wait
  103. 生产者pool-1-thread-2 list以达到最大容量,进行wait
  104. 生产者pool-1-thread-3 退出wait
  105. 生产者pool-1-thread-3 list以达到最大容量,进行wait
  106. 消费者pool-1-thread-9 退出wait
  107. 消费者pool-1-thread-9 消费数据:-892558288

3. 使用BlockingQueue实现生产者-消费者

由于BlockingQueue内部实现就附加了两个阻塞操作。即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止。关于BlockingQueue更多细节可以看这篇文章。可以利用BlockingQueue实现生产者-消费者为题,阻塞队列完全可以充当共享数据区域,就可以很好的完成生产者和消费者线程之间的协作。

  1. public class ProductorConsumer {
  2. private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  3. public static void main(String[] args) {
  4. ExecutorService service = Executors.newFixedThreadPool(15);
  5. for (int i = 0; i < 5; i++) {
  6. service.submit(new Productor(queue));
  7. }
  8. for (int i = 0; i < 10; i++) {
  9. service.submit(new Consumer(queue));
  10. }
  11. }

static class Productor implements Runnable {

  1. private BlockingQueue queue;
  2. public Productor(BlockingQueue queue) {
  3. this.queue = queue;
  4. }
  5. @Override
  6. public void run() {
  7. try {
  8. while (true) {
  9. Random random = new Random();
  10. int i = random.nextInt();
  11. System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
  12. queue.put(i);
  13. }
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }
  19. static class Consumer implements Runnable {
  20. private BlockingQueue queue;
  21. public Consumer(BlockingQueue queue) {
  22. this.queue = queue;
  23. }
  24. @Override
  25. public void run() {
  26. try {
  27. while (true) {
  28. Integer element = (Integer) queue.take();
  29. System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
  30. }
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. }
  37. 输出结果:
  38. 消费者pool-1-thread-7正在消费数据1520577501
  39. 生产者pool-1-thread-4生产数据-127809610
  40. 消费者pool-1-thread-8正在消费数据504316513
  41. 生产者pool-1-thread-2生产数据1994678907
  42. 消费者pool-1-thread-11正在消费数据1967302829
  43. 生产者pool-1-thread-1生产数据369331507
  44. 消费者pool-1-thread-9正在消费数据1994678907
  45. 生产者pool-1-thread-2生产数据-919544017
  46. 消费者pool-1-thread-12正在消费数据-127809610
  47. 生产者pool-1-thread-4生产数据1475197572
  48. 消费者pool-1-thread-14正在消费数据-893487914
  49. 生产者pool-1-thread-3生产数据906921688
  50. 消费者pool-1-thread-6正在消费数据-1292015016
  51. 生产者pool-1-thread-5生产数据-652105379
  52. 生产者pool-1-thread-5生产数据-1622505717
  53. 生产者pool-1-thread-3生产数据-1350268764
  54. 消费者pool-1-thread-7正在消费数据906921688
  55. 生产者pool-1-thread-4生产数据2091628867
  56. 消费者pool-1-thread-13正在消费数据1475197572
  57. 消费者pool-1-thread-15正在消费数据-919544017
  58. 生产者pool-1-thread-2生产数据564860122
  59. 生产者pool-1-thread-2生产数据822954707
  60. 消费者pool-1-thread-14正在消费数据564860122
  61. 消费者pool-1-thread-10正在消费数据369331507
  62. 生产者pool-1-thread-1生产数据-245820912
  63. 消费者pool-1-thread-6正在消费数据822954707
  64. 生产者pool-1-thread-2生产数据1724595968
  65. 生产者pool-1-thread-2生产数据-1151855115
  66. 消费者pool-1-thread-12正在消费数据2091628867
  67. 生产者pool-1-thread-4生产数据-1774364499
  68. 生产者pool-1-thread-4生产数据2006106757
  69. 消费者pool-1-thread-14正在消费数据-1774364499
  70. 生产者pool-1-thread-3生产数据-1070853639
  71. 消费者pool-1-thread-9正在消费数据-1350268764
  72. 消费者pool-1-thread-11正在消费数据-1622505717
  73. 生产者pool-1-thread-5生产数据355412953

可以看出,使用BlockingQueue来实现生产者-消费者很简洁,这正是利用了BlockingQueue插入和获取数据附加阻塞操作的特性。

关于生产者-消费者实现的三中方式,到这里就全部总结出来,如果觉得不错的话,请点赞,也算是给我的鼓励,在此表示感谢!