Java

问题

有两个线程,A 线程向一个集合里面依次添加元素“abc”字符串,一共添加十次,当添加到第五次的时候,希望 B 线程能够收到 A 线程的通知,然后 B 线程执行相关的业务操作。线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。

一、使用 volatile 关键字

基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想。大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式

  1. public class TestSync {
  2. //定义共享变量来实现通信,它需要volatile修饰,否则线程不能及时感知
  3. static volatile boolean notice = false;
  4. public static void main(String[] args) {
  5. List<String> list = new ArrayList<>();
  6. //线程A
  7. Thread threadA = new Thread(() -> {
  8. for (int i = 1; i <= 10; i++) {
  9. list.add("abc");
  10. System.out.println("线程A添加元素,此时list的size为:" + list.size());
  11. try {
  12. Thread.sleep(500);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. if (list.size() == 5)
  17. notice = true;
  18. }
  19. });
  20. //线程B
  21. Thread threadB = new Thread(() -> {
  22. while (true) {
  23. if (notice) {
  24. System.out.println("线程B收到通知,开始执行自己的业务...");
  25. break;
  26. }
  27. }
  28. });
  29. //需要先启动线程B
  30. threadB.start();
  31. try {
  32. Thread.sleep(1000);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. // 再启动线程A
  37. threadA.start();
  38. }
  39. }

二、使用 Object 类的 wait()/notify()

Object 类提供了线程间通信的方法:wait()notify()notifyAll(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。
注意:wait/notify 必须配合 synchronized 使用,wait 方法释放锁,notify 方法不释放锁。wait 是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify()notify并不释放锁,只是告诉调用过wait()的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在别人手里,别人还没释放,调用 wait() 的一个或多个线程就会解除 wait 状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。

  1. public class TestSync {
  2. public static void main(String[] args) {
  3. //定义一个锁对象
  4. Object lock = new Object();
  5. List<String> list = new ArrayList<>();
  6. // 线程A
  7. Thread threadA = new Thread(() -> {
  8. synchronized (lock) {
  9. for (int i = 1; i <= 10; i++) {
  10. list.add("abc");
  11. System.out.println("线程A添加元素,此时list的size为:" + list.size());
  12. try {
  13. Thread.sleep(500);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. if (list.size() == 5)
  18. lock.notify();//唤醒B线程
  19. }
  20. }
  21. });
  22. //线程B
  23. Thread threadB = new Thread(() -> {
  24. while (true) {
  25. synchronized (lock) {
  26. if (list.size() != 5) {
  27. try {
  28. lock.wait();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. System.out.println("线程B收到通知,开始执行自己的业务...");
  34. }
  35. }
  36. });
  37. //需要先启动线程B
  38. threadB.start();
  39. try {
  40. Thread.sleep(1000);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. //再启动线程A
  45. threadA.start();
  46. }
  47. }

由输出结果,在线程 A 发出 notify() 唤醒通知之后,依然是走完了自己线程的业务之后,线程 B 才开始执行,正好说明 notify() 不释放锁,而 wait() 释放锁。

三、使用JUC工具类 CountDownLatch

jdk1.5 之后在java.util.concurrent包下提供了很多并发编程相关的工具类,简化了并发编程代码的书写,CountDownLatch 基于 AQS 框架,相当于也是维护了一个线程间共享变量 state。

  1. public class TestSync {
  2. public static void main(String[] args) {
  3. CountDownLatch countDownLatch = new CountDownLatch(1);
  4. List<String> list = new ArrayList<>();
  5. //线程A
  6. Thread threadA = new Thread(() -> {
  7. for (int i = 1; i <= 10; i++) {
  8. list.add("abc");
  9. System.out.println("线程A添加元素,此时list的size为:" + list.size());
  10. try {
  11. Thread.sleep(500);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. if (list.size() == 5)
  16. countDownLatch.countDown();
  17. }
  18. });
  19. //线程B
  20. Thread threadB = new Thread(() -> {
  21. while (true) {
  22. if (list.size() != 5) {
  23. try {
  24. countDownLatch.await();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. System.out.println("线程B收到通知,开始执行自己的业务...");
  30. break;
  31. }
  32. });
  33. //需要先启动线程B
  34. threadB.start();
  35. try {
  36. Thread.sleep(1000);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. //再启动线程A
  41. threadA.start();
  42. }
  43. }

四、使用 ReentrantLock 结合 Condition

  1. public class TestSync {
  2. public static void main(String[] args) {
  3. ReentrantLock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. List<String> list = new ArrayList<>();
  6. //线程A
  7. Thread threadA = new Thread(() -> {
  8. lock.lock();
  9. for (int i = 1; i <= 10; i++) {
  10. list.add("abc");
  11. System.out.println("线程A添加元素,此时list的size为:" + list.size());
  12. try {
  13. Thread.sleep(500);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. if (list.size() == 5)
  18. condition.signal();
  19. }
  20. lock.unlock();
  21. });
  22. //线程B
  23. Thread threadB = new Thread(() -> {
  24. lock.lock();
  25. if (list.size() != 5) {
  26. try {
  27. condition.await();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. System.out.println("线程B收到通知,开始执行自己的业务...");
  33. lock.unlock();
  34. });
  35. threadB.start();
  36. try {
  37. Thread.sleep(1000);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. threadA.start();
  42. }
  43. }

这种方式使用起来并不是很好,代码编写复杂,而且线程 B 在被 A 唤醒之后由于没有获取锁还是不能立即执行,也就是说,A 在唤醒操作之后,并不释放锁。这种方法跟 Object 的 wait()/notify() 一样。

五、基本 LockSupport 实现线程间的阻塞和唤醒

LockSupport 是一种非常灵活的实现线程间阻塞和唤醒的工具,使用它不用关注是等待线程先进行还是唤醒线程先运行,但是得知道线程的名字。

  1. public class TestSync {
  2. public static void main(String[] args) {
  3. List<String> list = new ArrayList<>();
  4. //线程B
  5. final Thread threadB = new Thread(() -> {
  6. if (list.size() != 5) {
  7. LockSupport.park();
  8. }
  9. System.out.println("线程B收到通知,开始执行自己的业务...");
  10. });
  11. //线程A
  12. Thread threadA = new Thread(() -> {
  13. for (int i = 1; i <= 10; i++) {
  14. list.add("abc");
  15. System.out.println("线程A添加元素,此时list的size为:" + list.size());
  16. try {
  17. Thread.sleep(500);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. if (list.size() == 5)
  22. LockSupport.unpark(threadB);
  23. }
  24. });
  25. threadA.start();
  26. threadB.start();
  27. }
  28. }