实现生产者和消费者

1. wait() / notifyAll()

  /**
   * Producer/consumer demo built on intrinsic locks: both workers synchronize on the
   * shared list and coordinate through {@code wait()} / {@code notifyAll()}.
   */
  public class ProducerConsumer {

      /** Appends random integers to the shared list, waiting while it is at capacity. */
      private static class Producer implements Runnable {
          private final List<Integer> list;
          private final int capacity;
          // One generator per producer instead of a new Random on every iteration.
          private final Random random = new Random();

          /**
           * @param list     shared buffer; also used as the monitor for wait/notifyAll
           * @param capacity maximum number of elements allowed in {@code list}
           */
          public Producer(List<Integer> list, int capacity) {
              this.list = list;
              this.capacity = capacity;
          }

          /** Produces until the running thread is interrupted. */
          @Override
          public void run() {
              String producer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  synchronized (list) {
                      try {
                          // Re-check the condition in a loop: notifyAll wakes every waiter,
                          // and wait() may also return spuriously.
                          while (list.size() >= capacity) {
                              System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait");
                              list.wait();
                              System.out.println("生产者 " + producer + ":退出 wait");
                          }
                          int i = random.nextInt();
                          System.out.println("生产者 " + producer + ":生产数据" + i);
                          list.add(i);
                          list.notifyAll();
                      } catch (InterruptedException e) {
                          // wait() cleared the interrupt flag; restore it and stop the
                          // worker instead of looping forever.
                          Thread.currentThread().interrupt();
                          return;
                      }
                  }
              }
          }
      }

      /** Removes elements from the head of the shared list, waiting while it is empty. */
      private static class Consumer implements Runnable {
          private final List<Integer> list;

          /**
           * @param list shared buffer; also used as the monitor for wait/notifyAll
           */
          public Consumer(List<Integer> list) {
              this.list = list;
          }

          /** Consumes until the running thread is interrupted. */
          @Override
          public void run() {
              String consumer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  synchronized (list) {
                      try {
                          while (list.isEmpty()) {
                              System.out.println("消费者 " + consumer + ":list 为空,进行 wait");
                              list.wait();
                              System.out.println("消费者 " + consumer + ":退出wait");
                          }
                          Integer element = list.remove(0);
                          System.out.println("消费者 " + consumer + ":消费数据:" + element);
                          list.notifyAll();
                      } catch (InterruptedException e) {
                          // Restore the interrupt flag and stop the worker.
                          Thread.currentThread().interrupt();
                          return;
                      }
                  }
              }
          }
      }

      /**
       * Starts one producer and one consumer sharing a list capped at 5 elements.
       * The workers run until their threads are interrupted; {@code shutdown()} only
       * stops the pool from accepting further tasks.
       */
      public static void main(String[] args) {
          final List<Integer> sharedList = new LinkedList<>();
          final int capacity = 5;
          ThreadPoolExecutor executor = new ThreadPoolExecutor(
                  5, 10, 1, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
          executor.execute(new Producer(sharedList, capacity));
          executor.execute(new Consumer(sharedList));
          executor.shutdown();
      }
  }

2. await() / signalAll()

  /**
   * Producer/consumer demo built on an explicit {@link ReentrantLock} with two
   * {@link Condition}s, so producers and consumers wait on separate queues.
   */
  public class ProducerConsumer {
      private static final ReentrantLock lock = new ReentrantLock();
      // Producers wait on notFull while the list is at capacity; consumers signal it.
      private static final Condition notFull = lock.newCondition();
      // Consumers wait on notEmpty while the list is empty; producers signal it.
      private static final Condition notEmpty = lock.newCondition();

      /** Appends random integers to the shared list, waiting while it is at capacity. */
      private static class Producer implements Runnable {
          private final List<Integer> list;
          private final int capacity;
          // One generator per producer instead of a new Random on every iteration.
          private final Random random = new Random();

          /**
           * @param list     shared buffer, guarded by the class-level lock
           * @param capacity maximum number of elements allowed in {@code list}
           */
          public Producer(List<Integer> list, int capacity) {
              this.list = list;
              this.capacity = capacity;
          }

          /** Produces until the running thread is interrupted. */
          @Override
          public void run() {
              String producer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  lock.lock();
                  try {
                      // Re-check in a loop: await() may return spuriously.
                      while (list.size() >= capacity) {
                          System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait");
                          notFull.await();
                          System.out.println("生产者 " + producer + ":退出 wait");
                      }
                      int i = random.nextInt();
                      System.out.println("生产者 " + producer + ":生产数据" + i);
                      list.add(i);
                      notEmpty.signalAll();
                  } catch (InterruptedException e) {
                      // await() cleared the interrupt flag; restore it and stop the
                      // worker. The finally block still releases the lock.
                      Thread.currentThread().interrupt();
                      return;
                  } finally {
                      lock.unlock();
                  }
              }
          }
      }

      /** Removes elements from the head of the shared list, waiting while it is empty. */
      private static class Consumer implements Runnable {
          private final List<Integer> list;

          /**
           * @param list shared buffer, guarded by the class-level lock
           */
          public Consumer(List<Integer> list) {
              this.list = list;
          }

          /** Consumes until the running thread is interrupted. */
          @Override
          public void run() {
              String consumer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  lock.lock();
                  try {
                      while (list.isEmpty()) {
                          System.out.println("消费者 " + consumer + ":list 为空,进行 wait");
                          notEmpty.await();
                          System.out.println("消费者 " + consumer + ":退出wait");
                      }
                      Integer element = list.remove(0);
                      System.out.println("消费者 " + consumer + ":消费数据:" + element);
                      notFull.signalAll();
                  } catch (InterruptedException e) {
                      // Restore the interrupt flag and stop the worker.
                      Thread.currentThread().interrupt();
                      return;
                  } finally {
                      lock.unlock();
                  }
              }
          }
      }

      /**
       * Starts one producer and one consumer sharing a list capped at 5 elements.
       * The workers run until their threads are interrupted; {@code shutdown()} only
       * stops the pool from accepting further tasks.
       */
      public static void main(String[] args) {
          final List<Integer> sharedList = new LinkedList<>();
          final int capacity = 5;
          ThreadPoolExecutor executor = new ThreadPoolExecutor(
                  5, 10, 1, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
          executor.execute(new Producer(sharedList, capacity));
          executor.execute(new Consumer(sharedList));
          executor.shutdown();
      }
  }

3. 阻塞队列

  /**
   * Producer/consumer demo built on a {@link BlockingQueue}: {@code put} blocks while
   * the queue is full and {@code take} blocks while it is empty, so no explicit
   * locking is needed in the workers.
   */
  public class ProducerConsumer {

      /** Puts random integers onto the queue. */
      private static class Producer implements Runnable {
          private final BlockingQueue<Integer> queue;
          // One generator per producer instead of a new Random on every iteration.
          private final Random random = new Random();

          /**
           * @param queue shared queue the produced values are put onto
           */
          public Producer(BlockingQueue<Integer> queue) {
              this.queue = queue;
          }

          /** Produces until the running thread is interrupted. */
          @Override
          public void run() {
              String producer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  try {
                      int i = random.nextInt();
                      // The message is printed before put(), so it can appear while
                      // the producer is still blocked waiting for free space.
                      System.out.println("生产者 " + producer + ":生产数据" + i);
                      queue.put(i);
                  } catch (InterruptedException e) {
                      // put() cleared the interrupt flag; restore it and stop the
                      // worker instead of looping forever.
                      Thread.currentThread().interrupt();
                      return;
                  }
              }
          }
      }

      /** Takes integers from the queue and prints them. */
      private static class Consumer implements Runnable {
          private final BlockingQueue<Integer> queue;

          /**
           * @param queue shared queue the values are taken from
           */
          public Consumer(BlockingQueue<Integer> queue) {
              this.queue = queue;
          }

          /** Consumes until the running thread is interrupted. */
          @Override
          public void run() {
              String consumer = Thread.currentThread().getName();
              while (!Thread.currentThread().isInterrupted()) {
                  try {
                      Integer element = queue.take();
                      System.out.println("消费者 " + consumer + ":消费数据:" + element);
                  } catch (InterruptedException e) {
                      // Restore the interrupt flag and stop the worker.
                      Thread.currentThread().interrupt();
                      return;
                  }
              }
          }
      }

      /**
       * Starts one producer and one consumer sharing a queue bounded at 5 elements.
       * The workers run until their threads are interrupted; {@code shutdown()} only
       * stops the pool from accepting further tasks.
       */
      public static void main(String[] args) {
          // Bounded on purpose: with the no-arg constructor the queue is effectively
          // unbounded, put() never blocks, and a fast producer can exhaust memory.
          final int capacity = 5;
          final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
          ThreadPoolExecutor executor = new ThreadPoolExecutor(
                  5, 10, 1, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
          executor.execute(new Producer(queue));
          executor.execute(new Consumer(queue));
          executor.shutdown();
      }
  }