1 介绍

image.png

方法名 作用
final void wait() 表示线程一直等待,直到其它线程通知
void wait(long timeout) 线程等待指定毫秒参数的时间
final void wait(long timeout,intnanos) 线程等待指定毫秒、微妙的时间
final void notify() 唤醒一个处于等待状态的线程
final void notify() 唤醒一个处于等待状态的线程
final void notifyAll() 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行

均是java.lang.Object类的方法都只能在同步方法或者同步代码块中使用,否则会抛出异常

2 生产者/消费者

2.1 synchronize实现

  1. public class ProductConsumeBySyncTest {
  2. public static void main(String[] args) {
  3. DealData data = new DealData();
  4. new Thread(() -> {
  5. data.product();
  6. }, "线程1").start();
  7. new Thread(() -> {
  8. data.consume();
  9. }, "线程2").start();
  10. new Thread(() -> {
  11. data.product();
  12. }, "线程3").start();
  13. new Thread(() -> {
  14. data.consume();
  15. }, "线程4").start();
  16. try {
  17. Thread.sleep(10000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. data.stop();
  22. }
  23. }
  24. class DealData {
  25. private volatile boolean flag = true;
  26. public static final int MAX_COUNT = 10;
  27. public static AtomicInteger atomicInteger = new AtomicInteger();
  28. private static final List<Integer> queue = new ArrayList<>();
  29. public void product() {
  30. while (flag) {
  31. try {
  32. Thread.sleep(1000);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. synchronized (queue) {
  37. //池子满了,生产者停止生产
  38. //埋个坑,这里用的if
  39. //TODO 判断
  40. while (queue.size() == MAX_COUNT) {
  41. System.out.println("生产已达上限, 等待消费中...");
  42. try {
  43. queue.wait();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. //干活
  49. queue.add(atomicInteger.incrementAndGet());
  50. System.out.println("生产者生产了:" + atomicInteger.get() + "\t" + "生产队列剩余:" + queue.size());
  51. //通知
  52. queue.notifyAll();
  53. }
  54. }
  55. }
  56. public void consume() {
  57. while (flag) {
  58. try {
  59. Thread.sleep(1000);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. synchronized (queue) {
  64. while (queue.size() == 0) {
  65. System.out.println("没有可消费产品,等待生产中...");
  66. try {
  67. queue.wait();
  68. } catch (InterruptedException e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. int c = queue.get(0);
  73. queue.remove(0);
  74. System.out.println("消费者消费了" + c + ";消费队列队列剩余:" + queue.size());
  75. queue.notifyAll();
  76. }
  77. }
  78. }
  79. public void stop() {
  80. this.flag = false;
  81. }
  82. }

输出:

  1. 没有可消费产品,等待生产中...
  2. 生产者生产了:1 生产队列剩余:1
  3. 生产者生产了:2 生产队列剩余:2
  4. 消费者消费了1;消费队列队列剩余:1
  5. 消费者消费了2;消费队列队列剩余:0
  6. 生产者生产了:3 生产队列剩余:1
  7. 生产者生产了:4 生产队列剩余:2
  8. 消费者消费了3;消费队列队列剩余:1
  9. 消费者消费了4;消费队列队列剩余:0
  10. 生产者生产了:5 生产队列剩余:1
  11. 消费者消费了5;消费队列队列剩余:0
  12. 没有可消费产品,等待生产中...
  13. 生产者生产了:6 生产队列剩余:1
  14. 消费者消费了6;消费队列队列剩余:0
  15. 生产者生产了:7 生产队列剩余:1
  16. 消费者消费了7;消费队列队列剩余:0
  17. 没有可消费产品,等待生产中...
  18. 生产者生产了:8 生产队列剩余:1
  19. 消费者消费了8;消费队列队列剩余:0
  20. 生产者生产了:9 生产队列剩余:1
  21. 生产者生产了:10 生产队列剩余:2
  22. 消费者消费了9;消费队列队列剩余:1
  23. 消费者消费了10;消费队列队列剩余:0
  24. 没有可消费产品,等待生产中...
  25. 没有可消费产品,等待生产中...
  26. 生产者生产了:11 生产队列剩余:1
  27. 生产者生产了:12 生产队列剩余:2
  28. 消费者消费了11;消费队列队列剩余:1
  29. 消费者消费了12;消费队列队列剩余:0
  30. 没有可消费产品,等待生产中...
  31. 生产者生产了:13 生产队列剩余:1
  32. 消费者消费了13;消费队列队列剩余:0
  33. 生产者生产了:14 生产队列剩余:1
  34. 消费者消费了14;消费队列队列剩余:0
  35. 生产者生产了:15 生产队列剩余:1
  36. 消费者消费了15;消费队列队列剩余:0
  37. 没有可消费产品,等待生产中...
  38. 生产者生产了:16 生产队列剩余:1
  39. 消费者消费了16;消费队列队列剩余:0
  40. 生产者生产了:17 生产队列剩余:1
  41. 消费者消费了17;消费队列队列剩余:0
  42. 生产者生产了:18 生产队列剩余:1
  43. 消费者消费了18;消费队列队列剩余:0
  44. 没有可消费产品,等待生产中...
  45. 没有可消费产品,等待生产中...
  46. 生产者生产了:19 生产队列剩余:1
  47. 生产者生产了:20 生产队列剩余:2
  48. 消费者消费了19;消费队列队列剩余:1
  49. 消费者消费了20;消费队列队列剩余:0

2.2 通过Lock实现

  1. public class ProductConsumeByLockTest {
  2. public static void main(String[] args) {
  3. DealDataByLock data = new DealDataByLock();
  4. new Thread(() -> {
  5. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
  6. }, "线程1").start();
  7. new Thread(() -> {
  8. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
  9. }, "线程2").start();
  10. new Thread(() -> {
  11. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
  12. }, "线程3").start();
  13. new Thread(() -> {
  14. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
  15. }, "线程4").start();
  16. }
  17. }
  18. class DealDataByLock {
  19. private int number = 0;
  20. private Lock lock = new ReentrantLock();
  21. private Condition condition = lock.newCondition();
  22. public void product() {
  23. lock.lock();
  24. try {
  25. while (number != 0)
  26. condition.await();
  27. number++;
  28. System.out.println(Thread.currentThread().getName() + " 生产者生产了number=" + number);
  29. condition.signalAll();
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
  36. public void consume() {
  37. lock.lock();
  38. try {
  39. while (number == 0)
  40. condition.await();
  41. number--;
  42. System.out.println(Thread.currentThread().getName() + " 消费者消费了number=" + number);
  43. condition.signalAll();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. lock.unlock();
  48. }
  49. }
  50. }

输出:

  1. 线程3 生产者生产了number=1
  2. 线程4 消费者消费了number=0
  3. 线程1 生产者生产了number=1
  4. 线程4 消费者消费了number=0
  5. 线程1 生产者生产了number=1
  6. 线程4 消费者消费了number=0
  7. 线程1 生产者生产了number=1
  8. 线程4 消费者消费了number=0
  9. 线程1 生产者生产了number=1
  10. 线程4 消费者消费了number=0
  11. 线程1 生产者生产了number=1
  12. 线程2 消费者消费了number=0
  13. 线程3 生产者生产了number=1
  14. 线程2 消费者消费了number=0
  15. 线程3 生产者生产了number=1
  16. 线程2 消费者消费了number=0
  17. 线程3 生产者生产了number=1
  18. 线程2 消费者消费了number=0
  19. 线程3 生产者生产了number=1
  20. 线程2 消费者消费了number=0

2.3 通过阻塞队列

  1. public class ProductConsumeByBlockTest {
  2. public static void main(String[] args) {
  3. DealDataByBlock data = new DealDataByBlock();
  4. new Thread(() -> {
  5. data.product();
  6. }, "线程1").start();
  7. new Thread(() -> {
  8. data.consume();
  9. }, "线程2").start();
  10. /* new Thread(() -> {
  11. data.product();
  12. }, "线程3").start();*/
  13. new Thread(() -> {
  14. data.consume();
  15. }, "线程4").start();
  16. try {
  17. Thread.sleep(5000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. data.stop();
  22. }
  23. }
  24. class DealDataByBlock {
  25. public static final int MAX_COUNT = 10;
  26. private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(MAX_COUNT);
  27. private static boolean flag = true;
  28. private AtomicInteger atomicInteger = new AtomicInteger();
  29. public void product() {
  30. while (flag) {
  31. //offer:在该队列的尾部插入指定的元素,在队列已满的情况下等待指定的等待时间,等待空间变为可用。
  32. boolean retvalue = false;
  33. try {
  34. retvalue = queue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. if (retvalue == true) {
  39. System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "成功" + "生产队列大小= " + queue.size());
  40. } else {
  41. System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "失败" + "生产队列大小= " + queue.size());
  42. }
  43. try {
  44. TimeUnit.SECONDS.sleep(1);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. public void consume() {
  51. Integer result = null;
  52. while (flag) {
  53. try {
  54. //poll:检索并删除此队列的头,如果需要元素变为可用,则等待指定的等待时间
  55. result = queue.poll(2, TimeUnit.SECONDS);
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. if (null == result) {
  60. System.out.println("超过两秒没有取道数据,消费者即将退出");
  61. return;
  62. }
  63. System.out.println(Thread.currentThread().getName() + "\t 消费" + result + "成功" + "\t\t" + "消费队列大小= " + queue.size());
  64. try {
  65. Thread.sleep(1500);
  66. } catch (InterruptedException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. public void stop() {
  72. this.flag = false;
  73. }
  74. }

输出(一个生产者俩消费者):

  1. 线程1 插入队列1成功生产队列大小= 1
  2. 线程2 消费1成功 消费队列大小= 0
  3. 线程4 消费2成功 消费队列大小= 0
  4. 线程1 插入队列2成功生产队列大小= 0
  5. 线程1 插入队列3成功生产队列大小= 1
  6. 线程2 消费3成功 消费队列大小= 0
  7. 线程1 插入队列4成功生产队列大小= 1
  8. 线程4 消费4成功 消费队列大小= 0
  9. 线程2 消费5成功 消费队列大小= 0
  10. 线程1 插入队列5成功生产队列大小= 0
  11. 超过两秒没有取道数据,消费者即将退出