简介

1、具体内容:

在多线程的开发过程之中最为著名的案例就是生产者与消费者操作,该操作的主要流程如下:

  1. 生产者负责信息内容的生产;
  2. 每当生产者生产完成一项完整的信息之后消费者要从这里面取走信息;
  3. 如果生产者没有生产者则消费者要等待它生产完成,如果消费者还没有对信息进行消费,则生产者应该等待消费处理完成后再继续进行生产。

2、程序的基本实现

可以将生产者与消费者定义为两个独立的线程类对象,但是对于现在生产的数据,可以使用如下的组成:

  • 数据一: title = 王建、content = 宇宙大帅哥;
  • 数据二: title = 小高、content = 猥琐第一人;

既然生产者与消费者是两个独立的线程,那么这两个独立的线程之间就需要有一个数据的保存集中点,那么可以单独定义一个 Message 类实现数据的保存。

生产者与消费者

生产者与消费者基本程序模型 - 图1

范例: 程序基本结构

  1. package com.hanliukui;
  2. class Message{
  3. private String title;
  4. private String content;
  5. public String getTitle() {
  6. return title;
  7. }
  8. public void setTitle(String title) {
  9. this.title = title;
  10. }
  11. public String getContent() {
  12. return content;
  13. }
  14. public void setContent(String content) {
  15. this.content = content;
  16. }
  17. }
  18. class Producer implements Runnable{
  19. private Message message;
  20. public Producer(Message message){
  21. this.message = message;
  22. }
  23. @Override
  24. public void run() {
  25. for (int i = 0; i < 100; i++) {
  26. try {
  27. if (i % 2==0) {
  28. message.setTitle("王建");
  29. Thread.sleep(1000);
  30. message.setContent("宇宙大帅哥");
  31. }else {
  32. message.setTitle("小高");
  33. Thread.sleep(1000);
  34. message.setContent(" 猥琐第一人");
  35. }
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
  41. }
  42. class Consumer implements Runnable{
  43. private Message message;
  44. public Consumer(Message message){
  45. this.message = message;
  46. }
  47. @Override
  48. public void run() {
  49. for (int i = 0; i < 100; i++) {
  50. try {
  51. String title = message.getTitle();
  52. Thread.sleep(1000);
  53. String content = message.getContent();
  54. System.out.println("消费:"+title+"--"+content);
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. }
  61. public class ThreadDemo01 {
  62. public static void main(String[] args) {
  63. Message message = new Message();
  64. new Thread(new Producer(message)).start();
  65. new Thread(new Consumer(message)).start();
  66. }
  67. }

image.png
通过整个代码的执行你会发现此时有两个主要问题:

  • 问题一:数据不同步了;
  • 问题二: 生产一个取走一个,但是发现有了重复生产和重复取出问题。

解决生产者 - 消费者同步问题

解决数据同步

如果要解决问题,首先解决的就是数据同步的处理问题,如果要想解决数据同步最简单的做法是使用 synchronized 关键字定义同步代码块或同步方法,于是这个时候对于同步的处理就可以直接在 Message 类中完成。

范例: 解决同步操作

  1. package com.hanliukui;
  2. class Message{
  3. private String title;
  4. private String content;
  5. public synchronized void set(String title,String content){
  6. this.title=title;
  7. this.content=content;
  8. }
  9. public synchronized String get() {
  10. return title+"--"+content;
  11. }
  12. }
  13. class Producer implements Runnable{
  14. private Message message;
  15. public Producer(Message message){
  16. this.message = message;
  17. }
  18. @Override
  19. public void run() {
  20. for (int i = 0; i < 100; i++) {
  21. try {
  22. if (i % 2==0) {
  23. message.set("王建","宇宙大帅哥");
  24. Thread.sleep(1000);
  25. }else {
  26. Thread.sleep(1000);
  27. message.set("小高","猥琐第一人");
  28. }
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. class Consumer implements Runnable{
  36. private Message message;
  37. public Consumer(Message message){
  38. this.message = message;
  39. }
  40. @Override
  41. public void run() {
  42. for (int i = 0; i < 100; i++) {
  43. try {
  44. String message = this.message.get();
  45. Thread.sleep(1000);
  46. System.out.println("消费:"+message);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }
  52. }
  53. public class ThreadDemo01 {
  54. public static void main(String[] args) {
  55. Message message = new Message();
  56. new Thread(new Producer(message)).start();
  57. new Thread(new Consumer(message)).start();
  58. }
  59. }

image.png
在进行同步处理的时候肯定需要有一个同步的处理对象,那么此时肯定要将同步操作交由 Message 类处理是最合适的。

这个时候发现数据已经可以正常的保持一致了,但是对于重复操作的问题依然存在。

等待与唤醒机制

如果说现在要想解决生产者与消费者的问题,那么最好的解决方案就是使用等待与唤醒机制,而对于等待与唤醒的操作机制,主要依靠的是 Object 类中提供的方法处理的。

利用 Object 类解决重复操作

线程等待与唤醒

如果说现在要想解决生产者与消费者的问题,那么最好的解决方案就是使用等待与. 唤醒机制,而对于等待与唤醒的操作机制,主要依靠的是 Object 类中提供的方法处理的:

  1. // 等待
  2. public final void wait() throws InterruptedException; //死等
  3. public final void wait(long timeout) throws InterruptedException; //设置等待时间
  4. public final void wait(long timeout, int nanos) throws InterruptedException; //设置等待时间
  5. public final void notify(); // 唤醒第一个等待线程
  6. public final void notifyAll(); // 唤醒全部等待线程

如果此时有若干个等待线程的话,那么 notify()表示的是唤醒第一个等待的,而其它的线程继续等待. 而 notifyAll()表示醒所有等待的线程,哪个线程的优先级高就有可能先执行。

对于当前的问题主要的解决应该通过 Message 类完成处理。

范例: 修改 Message 类:

  1. package com.hanliukui;
  2. class Message {
  3. private String title;
  4. private String content;
  5. // 用来标记状态,
  6. // true 可以消费,不能生产
  7. // false 可以生产,不能消费
  8. private boolean flag = false;
  9. public synchronized void set(String title, String content) {
  10. try {
  11. if (flag) {
  12. super.wait();
  13. } else {
  14. Thread.sleep(1000);
  15. this.title = title;
  16. this.content = content;
  17. System.out.println("生产:"+title+"--"+content);
  18. flag = true;
  19. super.notify();
  20. }
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public synchronized void get() {
  26. try {
  27. if (flag) {
  28. Thread.sleep(1000);
  29. System.out.println("消费:"+title + "--" + content);
  30. flag = false;
  31. super.notify();
  32. } else {
  33. super.wait();
  34. }
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. class Producer implements Runnable {
  41. private Message message;
  42. public Producer(Message message) {
  43. this.message = message;
  44. }
  45. @Override
  46. public void run() {
  47. for (int i = 0; i < 100; i++) {
  48. if (i % 2 == 0) {
  49. message.set("王建"+i, "宇宙大帅哥");
  50. } else {
  51. message.set("小高"+i, "猥琐第一人");
  52. }
  53. }
  54. }
  55. }
  56. class Consumer implements Runnable {
  57. private Message message;
  58. public Consumer(Message message) {
  59. this.message = message;
  60. }
  61. @Override
  62. public void run() {
  63. for (int i = 0; i < 100; i++) {
  64. this.message.get();
  65. }
  66. }
  67. }
  68. public class ThreadDemo01 {
  69. public static void main(String[] args) {
  70. Message message = new Message();
  71. new Thread(new Producer(message)).start();
  72. new Thread(new Consumer(message)).start();
  73. }
  74. }

注:这种处理形式就是在进行多线程开发过程之中最原始的处理方案,整个的等待、同步唤醒机制都有开发者自行通过原生代码实现控制。