概述

生产者消费者模式是一个多线程协作的模式,在这个模式中,一部分线程负责生产数据、一部分线程负责消费数据,通过缓冲区完成生产者与消费者的解耦,缓冲区相当于数据仓库。
image.png

实现方式

wait/notify

  1. //数据仓库,缓冲区
  2. public class DataBuffer {
  3. List<String> dataBuffer = new LinkedList<>();
  4. public void add(String data){
  5. dataBuffer.add(data);
  6. }
  7. public String get(){
  8. String data = null;
  9. if(dataBuffer.size() > 0){
  10. data = dataBuffer.get(0);
  11. if(data != null){
  12. dataBuffer.remove(data);
  13. }
  14. }
  15. return data;
  16. }
  17. }
  18. //生产者
  19. public class Producter implements Runnable {
  20. private DataBuffer dataBuffer;
  21. AtomicInteger atomicInteger = new AtomicInteger(0);
  22. public Producter(DataBuffer dataBuffer){
  23. this.dataBuffer = dataBuffer;
  24. }
  25. @Override
  26. public void run() {
  27. while (true){
  28. synchronized (dataBuffer){
  29. atomicInteger.incrementAndGet();
  30. dataBuffer.add("a" + atomicInteger.intValue());
  31. System.out.println("生产数据,并唤醒等待线程");
  32. dataBuffer.notifyAll();
  33. try {
  34. //Thread.sleep(100L);
  35. }catch (Exception e) {
  36. }
  37. }
  38. }
  39. }
  40. }
  41. //消费者
  42. public class Consumer implements Runnable {
  43. private DataBuffer dataBuffer;
  44. public Consumer(DataBuffer dataBuffer){
  45. this.dataBuffer = dataBuffer;
  46. }
  47. @Override
  48. public void run() {
  49. while (true){
  50. synchronized (dataBuffer){
  51. String a = dataBuffer.get();
  52. if(a == null){
  53. try {
  54. dataBuffer.wait();
  55. }catch (Exception e){
  56. e.printStackTrace();
  57. }
  58. }else {
  59. System.out.println("消费的数据为:" + a);
  60. }
  61. }
  62. }
  63. }
  64. }
  65. //测试类
  66. public class TestOne {
  67. public static void main(String[] args){
  68. DataBuffer dataBuffer = new DataBuffer();
  69. new Thread(new Producter(dataBuffer)).start();
  70. new Thread(new Consumer(dataBuffer)).start();
  71. new Thread(new Producter(dataBuffer)).start();
  72. new Thread(new Consumer(dataBuffer)).start();
  73. new Thread(new Producter(dataBuffer)).start();
  74. new Thread(new Consumer(dataBuffer)).start();
  75. }
  76. }

BlockingQueue

  1. //数据缓冲区
  2. public class DataBuffer {
  3. BlockingQueue<String> dataBuffer = new LinkedBlockingQueue<>(20);
  4. public void add(String data){
  5. try {
  6. dataBuffer.put(data);
  7. }catch (Exception e){
  8. }
  9. }
  10. public String get(){
  11. String data = null;
  12. try {
  13. data = dataBuffer.take();
  14. }catch (Exception e){
  15. }
  16. return data;
  17. }
  18. }
  19. //生产者
  20. public class Producter implements Runnable {
  21. private DataBuffer dataBuffer;
  22. AtomicInteger atomicInteger = new AtomicInteger(0);
  23. public Producter(DataBuffer dataBuffer){
  24. this.dataBuffer = dataBuffer;
  25. }
  26. @Override
  27. public void run() {
  28. int b = atomicInteger.incrementAndGet();
  29. dataBuffer.add("a" + b);
  30. System.out.println("生产数据 - a" + b);
  31. }
  32. }
  33. //消费者
  34. public class Consumer implements Runnable {
  35. private DataBuffer dataBuffer;
  36. public Consumer(DataBuffer dataBuffer){
  37. this.dataBuffer = dataBuffer;
  38. }
  39. @Override
  40. public void run() {
  41. while (true){
  42. String a = dataBuffer.get();
  43. System.out.println("消费的数据为:" + a);
  44. }
  45. }
  46. }
  47. //测试类
  48. public class TestTwo {
  49. public static void main(String[] args){
  50. DataBuffer dataBuffer = new DataBuffer();
  51. new Thread(new Producter(dataBuffer)).start();
  52. new Thread(new Consumer(dataBuffer)).start();
  53. new Thread(new Producter(dataBuffer)).start();
  54. new Thread(new Consumer(dataBuffer)).start();
  55. new Thread(new Producter(dataBuffer)).start();
  56. new Thread(new Consumer(dataBuffer)).start();
  57. }
  58. }

ReentrantLock&Condition

  1. public class DataBuffer {
  2. List<String> dataBuffer = new LinkedList<>();
  3. Lock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. public Lock getLock(){
  6. return lock;
  7. }
  8. public void add(String data){
  9. dataBuffer.add(data);
  10. }
  11. public String get(){
  12. String data = null;
  13. if(dataBuffer.size() > 0){
  14. data = dataBuffer.get(0);
  15. if(data != null){
  16. dataBuffer.remove(data);
  17. }
  18. }
  19. return data;
  20. }
  21. }
  22. public class Producter implements Runnable {
  23. private DataBuffer dataBuffer;
  24. AtomicInteger atomicInteger = new AtomicInteger(0);
  25. public Producter(DataBuffer dataBuffer){
  26. this.dataBuffer = dataBuffer;
  27. }
  28. @Override
  29. public void run() {
  30. while (true){
  31. Lock lock = dataBuffer.getLock();
  32. try {
  33. lock.lock();
  34. atomicInteger.incrementAndGet();
  35. dataBuffer.add("a" + atomicInteger.intValue());
  36. System.out.println("生产数据,并唤醒等待线程");
  37. dataBuffer.condition.signalAll();
  38. //Thread.sleep(100L);
  39. }catch (Exception e) {
  40. }finally {
  41. lock.unlock();
  42. }
  43. }
  44. }
  45. }
  46. public class Consumer implements Runnable {
  47. private DataBuffer dataBuffer;
  48. public Consumer(DataBuffer dataBuffer){
  49. this.dataBuffer = dataBuffer;
  50. }
  51. @Override
  52. public void run() {
  53. while (true){
  54. Lock lock = dataBuffer.getLock();
  55. try {
  56. lock.lock();
  57. String a = dataBuffer.get();
  58. if(a == null){
  59. try {
  60. dataBuffer.condition.await();
  61. }catch (Exception e){
  62. e.printStackTrace();
  63. }
  64. }else {
  65. System.out.println("消费的数据为:" + a);
  66. }
  67. }catch (Exception e){
  68. }finally {
  69. lock.unlock();
  70. }
  71. }
  72. }
  73. }
  74. public class TestOne {
  75. public static void main(String[] args){
  76. DataBuffer dataBuffer = new DataBuffer();
  77. new Thread(new Producter(dataBuffer)).start();
  78. new Thread(new Consumer(dataBuffer)).start();
  79. new Thread(new Producter(dataBuffer)).start();
  80. new Thread(new Consumer(dataBuffer)).start();
  81. new Thread(new Producter(dataBuffer)).start();
  82. new Thread(new Consumer(dataBuffer)).start();
  83. }
  84. }