集合不安全解决办法

  1. package com.lyj.demo;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.boot.test.context.SpringBootTest;
  4. import java.util.List;
  5. import java.util.UUID;
  6. import java.util.concurrent.CopyOnWriteArrayList;
  7. /**
  8. * @program: java-test-demo
  9. * @Date: 2021/8/11 7:34
  10. * @Author: 凌兮
  11. * @Description:
  12. */
  13. @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
  14. public class JUCTest {
  15. /**
  16. * List非线程安全的,底层add方法没有synchronized
  17. * JUC:就是java.util.concurrent包
  18. */
  19. public static void main(String [] args) {
  20. // 方式1:list arrayList是非线程安全集合
  21. // 报错:Exception in thread "1" Exception in thread "2"
  22. // Exception in thread "0" java.util.ConcurrentModificationException
  23. // List<String> list = new ArrayList<>();
  24. //方式2:使用vector线程安全的集合,底层add方法有synchronized,该方式比较古老,jdk1.0时就有了。
  25. // List<String> list = new Vector<>();
  26. // 方式3:使用Collections.synchornizedList方式创建,入参传入一个集合,
  27. // 线程安全的,底层也是用的synchronized,该方式比较古老,jdk1.2就有了
  28. // List<String> list = Collections.synchronizedList(new ArrayList<>());
  29. // 方式4:使用JUC 里的CopyOnWriteArrayList,采用的是写时复制技术,底层采用lock方式
  30. List<String> list = new CopyOnWriteArrayList<>();
  31. for (int i = 0; i < 30; i++) {
  32. new Thread(() -> {
  33. list.add(UUID.randomUUID().toString().substring(0, 8));
  34. System.out.println(list);
  35. }, String.valueOf(i)).start();
  36. }
  37. /** ============hashSet线程不安全测试======== */
  38. // hashset非线程安全,报错:Exception in thread "15" Exception
  39. // in thread "19" java.util.ConcurrentModificationException
  40. // Set<String> set = new HashSet<>();
  41. // 解决方式1:CopyOnWriteArraySet,线程安全的,底层用的是lock
  42. Set<String> set = new CopyOnWriteArraySet<>();
  43. // for (int i = 0; i < 30; i++) {
  44. // new Thread(() -> {
  45. // set.add(UUID.randomUUID().toString().substring(0, 8));
  46. // System.out.println(set);
  47. // }, String.valueOf(i)).start();
  48. // }
  49. /** ============hashMap线程不安全测试======== */
  50. // hashmap线程不安全的,报错:Exception in thread "3" java.util.ConcurrentModificationException
  51. // Map<String, String> map = new HashMap<>();
  52. // 解决方式1:ConcurrentHashMap 线程安全的
  53. Map<String, String> map = new ConcurrentHashMap<>();
  54. for (int i = 0; i < 30; i++) {
  55. String key = String.valueOf(i);
  56. new Thread(() -> {
  57. map.put(key, UUID.randomUUID().toString().substring(0, 8));
  58. System.out.println(map);
  59. }, String.valueOf(i)).start();
  60. }
  61. }
  62. }

juc的三个辅助工具类

countDown测试:

需求:5个学生都必须先离开后,才能锁门。相当于多个线程都执行完后

  1. /**
  2. * countDown测试 :
  3. * 5个学生都必须先离开后,才能锁门。相当于多个线程都执行完后,
  4. * countdown值减为0才执行await之后的代码。
  5. * @throws InterruptedException
  6. */
  7. @Test
  8. public void countDownLatchTest() throws InterruptedException {
  9. CountDownLatch countDownLatch = new CountDownLatch(5);
  10. for (int i = 1; i <= 5; i++) {
  11. new Thread(() -> {
  12. System.out.println(Thread.currentThread().getName() + "我离开教室了");
  13. countDownLatch.countDown();
  14. }, String.valueOf(i)).start();
  15. }
  16. countDownLatch.await();
  17. System.out.println(Thread.currentThread().getName() + "班长锁门了,所有人都离开教室了");
  18. }

这个是只有当countdown值减为0才执行await之后的代码
一个线程A可使用CountDownLatch.await()方法阻塞等待,其他线程可调用CountDownLatch.countDown()方法使CountDownLatch数值减1,当CountDownLatch数值为0的时候,线程A可以继续往下执行

循环栅栏cyclicBarrier测试

需求:集齐7个龙珠召唤神龙

  1. /**
  2. * 循环栅栏cyclicBarrier测试
  3. * 案例:集齐7个龙珠召唤神龙
  4. */
  5. @Test
  6. public void cyclicBarrierTest() {
  7. // 创建循环栅栏
  8. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
  9. // 只有parties达到7,才会执行该线程,召唤神龙
  10. System.out.println("集齐了7个龙珠召唤神龙");
  11. });
  12. // 集齐龙珠过程,如果i的最大值改为6,则cyclicBarrier会一直等待,不会召唤神龙
  13. for (int i = 1; i <= 7; i++) {
  14. new Thread(() -> {
  15. System.out.println(Thread.currentThread().getName() + "龙珠");
  16. try {
  17. // 等待,parties加1,未达到7个线程时,cyclicBarrier一直会等待。
  18. cyclicBarrier.await();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. }, String.valueOf(i)).start();
  25. }
  26. }

Semaphore信号量测试

需求:6辆汽车,3个停车位

  1. /**
  2. * Semaphore信号量测试:
  3. * 6辆汽车,3个停车位
  4. */
  5. @Test
  6. public void semaphoreTest() {
  7. // 创建信号量,具有3个许可证
  8. Semaphore semaphore = new Semaphore(3);
  9. for (int i = 1; i <= 6; i++) {
  10. new Thread(() -> {
  11. try {
  12. // 抢占许可证
  13. semaphore.acquire();
  14. System.out.println(Thread.currentThread().getName() + "获得了许可证并停车了");
  15. // 设置随机停车时间
  16. TimeUnit.SECONDS.sleep(new Random().nextInt(5));
  17. System.out.println(Thread.currentThread().getName() + "归还了许可证并离开车库");
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. } finally {
  21. // 释放许可证,并离开
  22. semaphore.release();
  23. }
  24. }, String.valueOf(i)).start();
  25. }
  26. }

没有许可证无法执行

读写锁:

  1. static class MapCache {
  2. // 创建缓存器 由于是经常进行读写操作,所以设置为volatile
  3. public static volatile Map<String, Object> cache = new HashMap<>();
  4. // 创建读写锁
  5. public static ReadWriteLock lock = new ReentrantReadWriteLock();
  6. // 读锁
  7. public static Lock readLock = lock.readLock();
  8. // 写锁
  9. public static Lock writeLock = lock.writeLock();
  10. /**
  11. * 写操作
  12. * @param key
  13. * @param value
  14. */
  15. public static void put(String key, Object value) {
  16. writeLock.lock();
  17. // 暂停一会
  18. try {
  19. System.out.println(Thread.currentThread().getName() + "正在进行写操作" + key);
  20. TimeUnit.MILLISECONDS.sleep(300);
  21. cache.put(key, value);
  22. System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. } finally {
  26. writeLock.unlock();
  27. }
  28. }
  29. /**
  30. * 读操作
  31. * @param key
  32. * @return
  33. * @throws InterruptedException
  34. */
  35. public static Object get(String key) {
  36. readLock.lock();
  37. Object result = null;
  38. // 暂停一会
  39. try {
  40. System.out.println(Thread.currentThread().getName() + "正在进行读操作" + key);
  41. TimeUnit.MILLISECONDS.sleep(300);
  42. result = cache.get(key);
  43. System.out.println(Thread.currentThread().getName() + "读取完毕" + key);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. } finally {
  47. readLock.unlock();
  48. }
  49. return result;
  50. }
  51. }
  52. // 测试
  53. public static void main(String[] args) {
  54. // 多线程写
  55. for (int i = 1; i <= 5; i++) {
  56. final int key = i;
  57. new Thread(() -> {
  58. JUCTest.MapCache.put(key + "", key + "");
  59. }, String.valueOf(i)).start();
  60. }
  61. // 多线程读
  62. for (int i = 1; i <= 5; i++) {
  63. final int key = i;
  64. new Thread(() -> {
  65. JUCTest.MapCache.get(key + "");
  66. }, String.valueOf(i)).start();
  67. }
  68. }

ReadWriteLock提供了一个读写锁(也称为共享锁和排他锁)的机制,多个线程可以同时获取到读锁(共享锁),而同一时刻只能有一个线程获取到写锁(排他锁)。

阻塞队列

  1. /**
  2. * 阻塞队列测试1--抛出异常
  3. */
  4. @Test
  5. public void blockingQueueTest() {
  6. // 创建数组类型的阻塞队列
  7. BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
  8. System.out.println(queue.add("a"));
  9. System.out.println(queue.add("b"));
  10. System.out.println(queue.add("c"));
  11. // 数组类型的阻塞队列,检查元素时,默认是第一个元素
  12. System.out.println(queue.element());
  13. // 再次添加会报错,queue full
  14. // System.out.println(queue.add("d"));
  15. System.out.println(queue.remove());
  16. System.out.println(queue.remove());
  17. System.out.println(queue.remove());
  18. // 再次移除会报错,Nosuchemenet
  19. // System.out.println(queue.remove());
  20. }
  21. /**
  22. * 阻塞队列测试2--特殊值
  23. */
  24. @Test
  25. public void blockingQueueTest2() {
  26. // 创建数组类型的阻塞队列
  27. BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
  28. System.out.println(queue.offer("a"));
  29. System.out.println(queue.offer("b"));
  30. System.out.println(queue.offer("c"));
  31. // 数组类型的阻塞队列,检查元素时,默认是第一个元素
  32. System.out.println(queue.peek());
  33. // 再次添加返回false, 添加成功返回true
  34. System.out.println(queue.offer("d"));
  35. System.out.println(queue.poll());
  36. System.out.println(queue.poll());
  37. System.out.println(queue.poll());
  38. // 再次移除会返回null
  39. System.out.println(queue.poll());
  40. }
  41. /**
  42. * 阻塞队列测试3--阻塞
  43. */
  44. @Test
  45. public void blockingQueueTest3() throws InterruptedException {
  46. // 创建数组类型的阻塞队列
  47. BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
  48. queue.put("a");
  49. queue.put("b");
  50. queue.put("c");
  51. // 再放程序会阻塞,直到队列空出一个位置。
  52. // queue.put("d");
  53. System.out.println("检测程序是否执行完");
  54. System.out.println(queue.take());
  55. System.out.println(queue.take());
  56. System.out.println(queue.take());
  57. // 再取程序会阻塞,直到队列里有值,获得值
  58. System.out.println(queue.take());
  59. System.out.println("检测程序是否执行完");
  60. }
  61. /**
  62. * 阻塞队列测试4--超时
  63. */
  64. @Test
  65. public void blockingQueueTest4() throws InterruptedException {
  66. // 创建数组类型的阻塞队列
  67. BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
  68. System.out.println(queue.offer("a", 10, TimeUnit.MILLISECONDS));
  69. System.out.println(queue.offer("b", 10, TimeUnit.MILLISECONDS));
  70. System.out.println(queue.offer("c", 10, TimeUnit.MILLISECONDS));
  71. // 再次添加,如果添加不进去,并且超时时间到了,会放弃添加
  72. System.out.println(queue.offer("d", 10, TimeUnit.MILLISECONDS));
  73. System.out.println("检测程序是否执行完");
  74. System.out.println(queue.poll(10, TimeUnit.MILLISECONDS));
  75. System.out.println(queue.poll(10, TimeUnit.MILLISECONDS));
  76. System.out.println(queue.poll(10, TimeUnit.MILLISECONDS));
  77. // 再次移除,如果没有移除的值,并且超时时间到了,会放弃移除,并返回null
  78. System.out.println(queue.poll(10, TimeUnit.MILLISECONDS));
  79. System.out.println("检测程序是否执行完");
  80. }

image.png

Phaser

  1. public class TestPhaser {
  2. static Random r = new Random();
  3. static MarriagePhaser phaser = new MarriagePhaser();
  4. public static void main(String[] args) throws InterruptedException {
  5. phaser.bulkRegister(7);
  6. for (int i = 0; i < 5; i++) {
  7. new Person("person" + i).start();
  8. }
  9. new Person("新郎").start();
  10. new Person("新娘").start();
  11. }
  12. static class Person extends Thread {
  13. String name;
  14. public Person(String name) {
  15. this.name = name;
  16. }
  17. public void arrive() throws InterruptedException {
  18. Thread.sleep(r.nextInt(1000));
  19. System.out.printf("%s 到达现场!\n", name);
  20. phaser.arriveAndAwaitAdvance();
  21. }
  22. public void eat() throws InterruptedException {
  23. Thread.sleep(r.nextInt(1000));
  24. System.out.printf("%s 吃完!\n", name);
  25. phaser.arriveAndAwaitAdvance();
  26. }
  27. public void leave() throws InterruptedException {
  28. if (name.equals("新郎") || name.equals("新娘")) {
  29. phaser.arriveAndAwaitAdvance();
  30. } else {
  31. Thread.sleep(r.nextInt(1000));
  32. System.out.printf("%s 离开!\n", name);
  33. phaser.arriveAndAwaitAdvance();
  34. }
  35. }
  36. private void hug() throws InterruptedException {
  37. if (name.equals("新郎") || name.equals("新娘")) {
  38. Thread.sleep(r.nextInt(1000));
  39. System.out.printf("%s 抱抱!\n", name);
  40. phaser.arriveAndDeregister();
  41. } else {
  42. phaser.arriveAndDeregister();
  43. }
  44. }
  45. @Override
  46. public void run() {
  47. try {
  48. arrive();
  49. eat();
  50. leave();
  51. hug();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. }
  58. class MarriagePhaser extends Phaser {
  59. @Override
  60. protected boolean onAdvance(int phase, int registeredParties) {
  61. switch (phase) {
  62. case 0:
  63. System.out.println("所有人到齐");
  64. System.out.println("----------");
  65. return false;
  66. case 1:
  67. System.out.println("所有人吃完");
  68. System.out.println("----------");
  69. return false;
  70. case 2:
  71. System.out.println("所有人离开");
  72. System.out.println("----------");
  73. return false;
  74. case 3:
  75. System.out.println("婚礼结束");
  76. System.out.println("----------");
  77. return true;
  78. default:
  79. return true;
  80. }
  81. }
  82. }

phaser.getPhase() 初始值为0,如果全部线程到达集合点这个Phase+1,如果phaser.getPhase()达到Integer的最大值,这重新清空为0,在这里表示第几次集合了
phaser.arriveAndDeregister(); 表示这个线程到达集合点,就离开这个团体
phaser.arriveAndAwaitAdvance(); 表示这个线程在到某个达集合点,在等待其他线程
phaser.bulkRegister(friendNum); 表示这个线程在某个集合点遇到了friendNum个线程,他们要加入这个团体。
上述代码的意思是一共有7个人,5个person加上新郎新娘一共7个。当phaser.getPhase()为0时代码往下执行,每执行一个线程则加1,加到7后重新置为0,就可以继续向下执行了

Exchanger

  1. public static void testExchanger() {
  2. Exchanger<String> exchanger = new Exchanger<>();
  3. new Thread(() -> {
  4. String name = "Tom";
  5. try {
  6. name = exchanger.exchange(name);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println(Thread.currentThread().getName() + " " + name);
  11. }, "t1").start();
  12. new Thread(() -> {
  13. String name = "Jack";
  14. try {
  15. name = exchanger.exchange(name);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. System.out.println(Thread.currentThread().getName() + " " + name);
  20. }, "t2").start();
  21. }

输出结果:
t1 Jack
t2 Tom
从上面的程序可以看出,线程t1和t2内部的变量name发生了交换。Exchanger可用于两个线程之间交换数据,也是线程通信的一种方式。

LockSupport

  1. public static void testLockSupport() throws InterruptedException {
  2. Thread t1 = new Thread(() -> {
  3. System.out.println("t1 start and park...");
  4. LockSupport.park();
  5. System.out.println("t1 continue and end...");
  6. }, "t1");
  7. t1.start();
  8. Thread.sleep(3000);
  9. System.out.println("3 second later, unpark t1");
  10. LockSupport.unpark(t1);
  11. }

LockSupport相比于wait()和notify()更具有灵活性,notify()是随机唤醒一个正在等待的线程,而LockSupport可以唤醒一个特定的线程。

  • 其实park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可。
    如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

线程的run()方法是由java虚拟机直接调用的,如果我们没有启动线程(没有调用线程的start()方法)而是在应用代码中直接调用run()方法,那么这个线程的run()方法其实运行在当前线程(即run()方法的调用方所在的线程)之中,而不是运行在其自身的线程中,从而违背了创建线程的初衷;
1.直接调用 run 是在主线程中执行了 run,没有启动新的线程(t1)
2.使用 start 是启动新的线程(t1),通过新的线程(t1)间接执行 run 中的代码

线程中的join()方法

演示示例:

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. Counter counter = new Counter();
  4. Thread tA = new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. counter.printA();
  8. }
  9. });
  10. Thread tB = new Thread(new Runnable() {
  11. @Override
  12. public void run() {
  13. counter.printB();
  14. }
  15. });
  16. Thread tC = new Thread(new Runnable() {
  17. @Override
  18. public void run() {
  19. counter.printC();
  20. }
  21. });
  22. tA.start();
  23. tB.start();
  24. tC.start();
  25. }
  26. static class Counter {
  27. public void printA() {
  28. try {
  29. Thread.currentThread().sleep(500);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. for (int i = 0; i < 5; i++) {
  34. System.out.println("A");
  35. }
  36. }
  37. public void printB() {
  38. try {
  39. Thread.currentThread().sleep(500);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. for (int i = 0; i < 5; i++) {
  44. System.out.println("B");
  45. }
  46. }
  47. public void printC() {
  48. try {
  49. Thread.currentThread().sleep(500);
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. for (int i = 0; i < 5; i++) {
  54. System.out.println("C");
  55. }
  56. }
  57. }
  58. }

join()方法的源码

  1. public final synchronized void join(long millis)
  2. throws InterruptedException {
  3. long base = System.currentTimeMillis();
  4. long now = 0;
  5. if (millis < 0) {
  6. throw new IllegalArgumentException("timeout value is negative");
  7. }
  8. if (millis == 0) {
  9. while (isAlive()) {
  10. wait(0);
  11. }
  12. } else {
  13. while (isAlive()) {
  14. long delay = millis - now;
  15. if (delay <= 0) {
  16. break;
  17. }
  18. wait(delay);
  19. now = System.currentTimeMillis() - base;
  20. }
  21. }
  22. }

因此在tA.join()当中的wait(0)方法是让main线程陷入了无尽的等待中。正是因为如此,在tA.join()之前的代码都会正常从上往下执行,而在tA.join()之后的代码都随着main线程陷入等待而无法继续执行。这样便达到了网上说的 “t.join()方法会使所有线程都暂停并等待t的执行完毕后再执行”。