1. ReentrantLock

2. CountDownLatch

  1. import java.util.concurrent.CountDownLatch;
  2. public class TestCountDownLatch {
  3. public static void main(String[] args) {
  4. CountDownLatch countDownLatch = new CountDownLatch(100);
  5. Thread[] threads = new Thread[100];
  6. for (int i = 0; i<100; i++){
  7. threads[i] = new Thread(()->{
  8. try {
  9. Thread.sleep(5000L);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. countDownLatch.countDown();
  14. });
  15. }
  16. for (Thread thread : threads) {
  17. thread.start();
  18. }
  19. new Thread(()->{
  20. try {
  21. countDownLatch.await();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("全部线程已执行完");
  26. }).start();
  27. }
  28. }

3. CyclicBarrier

  1. import java.util.concurrent.BrokenBarrierException;
  2. import java.util.concurrent.CyclicBarrier;
  3. public class TestCyclicBarrier {
  4. public static void main(String[] args) {
  5. CyclicBarrier cyclicBarrier = new CyclicBarrier(20);
  6. for (int i = 0; i<20; i++){
  7. int finalI = i;
  8. new Thread(()->{
  9. try {
  10. System.out.println("线程"+ finalI +"已开始运行");
  11. cyclicBarrier.await();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } catch (BrokenBarrierException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("线程"+ finalI +"已运行结束");
  18. }).start();
  19. }
  20. }
  21. }

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如现在需要计算10个人12个月内的工资详细,可以将线程分为10个,分别计算每个人的工资,最后,再用barrierAction将这些线程的计算结果进行整合,得出最后结果。

4. Condition

5. Phaser

https://blog.csdn.net/u010739551/article/details/51083004

  1. /**
  2. * 下面说说Phaser的高级用法,在Phaser内有2个重要状态,分别是phase和party。
  3. * phase就是阶段,初值为0,当所有的线程执行完本轮任务,同时开始下一轮任务时,
  4. * 意味着当前阶段已结束,进入到下一阶段,phase的值自动加1。party就是线程,
  5. * party=4就意味着Phaser对象当前管理着4个线程。Phaser还有一个重要的方法经常需要被重载,
  6. * 那就是boolean onAdvance(int phase, int registeredParties)方法。此方法有2个作用:
  7. * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,
  8. * 相当于CyclicBarrier的barrierAction。
  9. * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
  10. */
  11. public class MyPhaser extends Phaser {
  12. @Override
  13. protected boolean onAdvance(int phase, int registeredParties) {
  14. switch (phase){
  15. case 0:
  16. System.out.println(getRegisteredParties()+"个线程到达,开始考试!");
  17. return false;
  18. case 1:
  19. System.out.println("第1题所有线程做完!");
  20. return false;
  21. case 2:
  22. System.out.println("第2题所有线程做完!");
  23. return false;
  24. case 3:
  25. System.out.println("第3题所有线程做完!");
  26. return false;
  27. default:
  28. System.out.println("所有题做完, 考试结束!");
  29. return true;
  30. }
  31. }
  32. /**
  33. * 题目:5个学生参加考试,一共有三道题,要求所有学生到齐才能开始考试
  34. * ,全部做完第一题,才能继续做第二题,后面类似。
  35. *
  36. * Phaser有phase和party两个重要状态,
  37. * phase表示阶段,party表示每个阶段的线程个数,
  38. * 只有每个线程都执行了phaser.arriveAndAwaitAdvance();
  39. * 才会进入下一个阶段,否则阻塞等待。
  40. * 例如题目中5个学生(线程)都条用phaser.arriveAndAwaitAdvance();就进入下一题
  41. * @param args
  42. */
  43. public static void main(String[] args) {
  44. MyPhaser myPhaser = new MyPhaser();
  45. for (int i = 0; i < 5; i++ ){
  46. new Thread(()->{
  47. myPhaser.register();
  48. System.out.println(Thread.currentThread().getName()+"已到达,准备考试");
  49. myPhaser.arriveAndAwaitAdvance();
  50. System.out.println(Thread.currentThread().getName()+"做第1题完成...");
  51. myPhaser.arriveAndAwaitAdvance();
  52. System.out.println(Thread.currentThread().getName()+"做第2题完成...");
  53. myPhaser.arriveAndAwaitAdvance();
  54. System.out.println(Thread.currentThread().getName()+"做第3题完成...");
  55. myPhaser.arriveAndAwaitAdvance();
  56. },"线程"+i).start();
  57. }
  58. }
  59. }

运行结果:

线程0已到达,准备考试 线程3已到达,准备考试 线程2已到达,准备考试 线程1已到达,准备考试 线程4已到达,准备考试 5个线程到达,开始考试! 线程4做第1题完成… 线程2做第1题完成… 线程3做第1题完成… 线程0做第1题完成… 线程1做第1题完成… 第1题所有线程做完! 线程3做第2题完成… 线程0做第2题完成… 线程4做第2题完成… 线程1做第2题完成… 线程2做第2题完成… 第2题所有线程做完! 线程2做第3题完成… 线程1做第3题完成… 线程0做第3题完成… 线程3做第3题完成… 线程4做第3题完成… 第3题所有线程做完!

6. ReadWriteLock

  1. public class T10_TestReadWriteLock {
  2. //写操作的值
  3. static int i;
  4. /**
  5. * 模拟读操作
  6. * @param lock
  7. */
  8. public static void read(Lock lock) {
  9. try {
  10. lock.lock(); //先获得锁
  11. Thread.sleep(1000); //睡眠1秒钟模拟读操作的执行时间
  12. System.out.println("read over!");
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } finally {
  16. lock.unlock(); //释放锁
  17. }
  18. }
  19. /**
  20. * 模拟写操作
  21. * @param lock
  22. * @param v
  23. */
  24. public static void write(Lock lock, int v) {
  25. try {
  26. lock.lock(); //先获得锁
  27. Thread.sleep(1000); //睡眠1秒钟模拟写操作的执行时间
  28. i = v;
  29. System.out.println("write over!");
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. } finally {
  33. lock.unlock(); //释放锁
  34. }
  35. }
  36. /**
  37. * 测试ReentrantLock
  38. * @throws InterruptedException
  39. */
  40. public static void testReentrantLock() throws InterruptedException {
  41. Lock lock = new ReentrantLock();
  42. CountDownLatch countDownLatch = new CountDownLatch(20);
  43. System.out.println("测试ReentrantLock计时开始");
  44. long start = System.currentTimeMillis();
  45. //18个线程读操作
  46. for(int i=0; i<18; i++){
  47. new Thread(()->{
  48. read(lock);
  49. countDownLatch.countDown();
  50. }).start();
  51. }
  52. //2个线程写操作
  53. for(int i=0; i<2; i++){
  54. new Thread(()->{
  55. write(lock,new Random().nextInt());
  56. countDownLatch.countDown();
  57. }).start();
  58. }
  59. countDownLatch.await();
  60. long end = System.currentTimeMillis();
  61. System.out.println("测试ReentrantLock计时结束,总共消耗时间:"+(end-start)/1000+"秒\n");
  62. }
  63. /**
  64. * 测试ReadWriteLock
  65. * @throws InterruptedException
  66. */
  67. public static void testReadWriteLock() throws InterruptedException {
  68. //创建2把互相配合的读写锁
  69. ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  70. Lock readLock = readWriteLock.readLock(); //读锁
  71. Lock writeLock = readWriteLock.writeLock(); //写锁
  72. CountDownLatch countDownLatch = new CountDownLatch(20);
  73. System.out.println("测试ReadWriteLock计时开始");
  74. long start = System.currentTimeMillis();
  75. //18个线程读操作
  76. for( int i = 0 ; i < 18 ; i++ ){
  77. new Thread(()->{
  78. read(readLock);
  79. countDownLatch.countDown();
  80. }).start();
  81. }
  82. //2个线程写操作
  83. for( int i = 0 ; i < 2 ; i++){
  84. new Thread(()->{
  85. write(writeLock,new Random().nextInt());
  86. countDownLatch.countDown();
  87. }).start();
  88. }
  89. countDownLatch.await();
  90. long end = System.currentTimeMillis();
  91. System.out.println("测试ReadWriteLock计时结束,总共消耗时间:"+(end-start)/1000+"秒");
  92. }
  93. public static void main(String[] args) throws InterruptedException {
  94. //测试ReentrantLock
  95. testReentrantLock();
  96. //测试ReadWriteLock
  97. testReadWriteLock();
  98. }
  99. }

运行结果:

测试ReadWriteLock计时开始 read over! read over! read over! read over! read over! read over! read over! read over! read over! read over! read over! read over! read over! read over! read over!

read over!

read over!

read over!

write over!

write over!

测试ReadWriteLock计时结束,总共消耗时间:3秒

测试ReentrantLock计时开始

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

read over!

write over!

write over!

测试ReentrantLock计时结束,总共消耗时间:20秒

7. Semaphore

Semaphore是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数。
Semaphore的主要方法摘要:

void acquire() 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void release() 释放一个许可,将其返回给信号量。
int availablePermits() 返回此信号量中当前可用的许可数。
boolean hasQueuedThreads() 查询是否有线程正在等待获取。
  1. import java.util.concurrent.Semaphore;
  2. /**
  3. * 模拟多线程去操作资源.
  4. * 假设有10个线程需要访问同一个文件资源, 由于文件资源做大能读取的线程是3,所以同时只能有3个线程访问这个文件. 我们使用Semaphore来控制
  5. */
  6. public class T11_TestSemaphore {
  7. //3个信号灯,只允许3个线程同时执行
  8. static Semaphore semaphore = new Semaphore(3);
  9. public static void main(String[] args) {
  10. for ( int i = 1; i <= 10; i++ ){
  11. int j = i;
  12. new Thread(()->{
  13. try {
  14. System.out.println("第"+ j +"个线程来了.." );
  15. semaphore.acquire();
  16. System.out.println("第"+ j +"个线程拿到了许可证,开始访问");
  17. Thread.sleep(10000L);
  18. System.out.println("第"+ j +"个线程访问完毕");
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } finally {
  22. System.out.println("第"+ j +"个线程释放了许可证");
  23. semaphore.release();
  24. }
  25. }).start();
  26. }
  27. }
  28. }

运行结果:

第1个线程来了..

第4个线程来了..

第1个线程拿到了许可证,开始访问

第3个线程来了..

第3个线程拿到了许可证,开始访问

第2个线程来了..

第7个线程来了..

第6个线程来了..

第4个线程拿到了许可证,开始访问

第5个线程来了..

第8个线程来了..

第9个线程来了..

第10个线程来了..

第1个线程访问完毕

第1个线程释放了许可证

第3个线程访问完毕

第3个线程释放了许可证

第2个线程拿到了许可证,开始访问

第7个线程拿到了许可证,开始访问

第4个线程访问完毕

第4个线程释放了许可证

第6个线程拿到了许可证,开始访问

第2个线程访问完毕

第2个线程释放了许可证

第7个线程访问完毕

第5个线程拿到了许可证,开始访问

第7个线程释放了许可证

第8个线程拿到了许可证,开始访问

第6个线程访问完毕

第6个线程释放了许可证

第9个线程拿到了许可证,开始访问

第5个线程访问完毕

第5个线程释放了许可证

第8个线程访问完毕

第8个线程释放了许可证

第9个线程访问完毕

第9个线程释放了许可证

第10个线程拿到了许可证,开始访问

第10个线程访问完毕

第10个线程释放了许可证

8. Exchanger

  1. import java.util.concurrent.Exchanger;
  2. public class TestExchanger {
  3. static Exchanger<String> exchanger = new Exchanger<>();
  4. public static void main(String[] args) {
  5. new Thread(()->{
  6. try {
  7. String t1Str = "T1";
  8. System.out.println("t1线程的变量为 "+t1Str+", 开始交换..");
  9. t1Str = exchanger.exchange(t1Str);
  10. System.out.println("t1线程交换后的变量为"+t1Str);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }, "t1").start();
  15. new Thread(()->{
  16. try {
  17. String t2Str = "T2";
  18. System.out.println("t2线程的变量为 "+t2Str+", 开始交换..");
  19. t2Str = exchanger.exchange(t2Str);
  20. System.out.println("t2线程交换后的变量为"+t2Str);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }, "t2").start();
  25. }
  26. }

运行结果:

t1线程的变量为 T1, 开始交换..

t2线程的变量为 T2, 开始交换..

t2线程交换后的变量为T1

t1线程交换后的变量为T2

9. LockSupport

**

10. 多线程面试题

10.1 线程监测题

题目:

创建2个线程,第1个线程往集合里面装10个对象, 第二个线程检测第一个线程装满5个时打印。

wait 和 notify 配合

  1. public class WaitAndNotify {
  2. public static void main(String[] args) throws InterruptedException {
  3. //创建一个对象用来锁
  4. Object o = new Object();
  5. //元素集合
  6. ArrayList<Object> arrayList = new ArrayList<>();
  7. new Thread(()->{
  8. synchronized (o){
  9. System.out.println("线程2启动...");
  10. try {
  11. o.wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println("线程2检测到线程1收集了5个元素");
  16. o.notify();
  17. }
  18. }).start();
  19. //睡眠1秒是为了保证线程2能先启动并等待。
  20. TimeUnit.SECONDS.sleep(1);
  21. new Thread(()->{
  22. synchronized (o){
  23. System.out.println("线程1启动...");
  24. for (int i = 0; i < 10; i++) {
  25. System.out.println("线程1收集第"+i+"个对象");
  26. arrayList.add(new Object());
  27. if ( arrayList.size() == 5 ){
  28. try {
  29. o.notify();
  30. o.wait();
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. o.notify();
  36. }
  37. }
  38. }).start();
  39. }
  40. }

使用**LockSupport**

  1. public class WithLockSupport {
  2. static Thread t1 = null, t2 = null;
  3. public static void main(String[] args) throws InterruptedException {
  4. //创建一个对象用来锁
  5. Object o = new Object();
  6. //元素集合
  7. ArrayList<Object> arrayList = new ArrayList<>();
  8. t2 = new Thread(()->{
  9. System.out.println("线程2启动...");
  10. LockSupport.park();
  11. System.out.println("线程2检测到线程1收集了5个元素");
  12. LockSupport.unpark(t1);
  13. });
  14. t1 = new Thread(() -> {
  15. System.out.println("线程1启动...");
  16. for (int i = 0; i < 10; i++) {
  17. arrayList.add(new Object());
  18. System.out.println("线程1收集第"+i+"个对象");
  19. if (arrayList.size() == 5){
  20. LockSupport.unpark(t2);
  21. }
  22. }
  23. });
  24. t2.start();
  25. TimeUnit.SECONDS.sleep(1);
  26. t1.start();
  27. }
  28. }

使用CountDownLatch

  1. public class WithCountDownLatch {
  2. static Thread t1 = null, t2 = null;
  3. public static void main(String[] args) throws InterruptedException {
  4. //创建一个对象用来锁
  5. Object o = new Object();
  6. //元素集合
  7. ArrayList<Object> arrayList = new ArrayList<>();
  8. CountDownLatch t1Latch = new CountDownLatch(1);
  9. CountDownLatch t2Latch = new CountDownLatch(1);
  10. t2 = new Thread(()->{
  11. System.out.println("线程2启动...");
  12. try {
  13. t2Latch.await();
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("线程2检测到线程1收集了5个元素");
  18. t1Latch.countDown();
  19. });
  20. t1 = new Thread(() -> {
  21. System.out.println("线程1启动...");
  22. for (int i = 0; i < 10; i++) {
  23. arrayList.add(new Object());
  24. System.out.println("线程1收集第"+i+"个对象");
  25. if (arrayList.size() == 5){
  26. t2Latch.countDown();
  27. try {
  28. t1Latch.await();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. });
  35. t2.start();
  36. TimeUnit.SECONDS.sleep(1);
  37. t1.start();
  38. }
  39. }

10.2 交叉打印题

有2个线程, 线程1打印数组内的元素[1,2,3,4,5,6], 线程2打印数组内元素[‘A’,’B’,’C’,’D’,’E’,’F’], 现在要求两个数组在控制台交叉打印元素, 最终打印的结果显示为:1A2B3C4D5E6F

  1. public class TestCrossPrint {
  2. //线程1需要打印的数组
  3. public static int[] intArr = new int[]{1,2,3,4,5,6};
  4. //线程2需要打印的数组
  5. public static char[] charArr = new char[]{'A','B','C','D','E','F'};
  6. //需要一个对象来当做锁
  7. static Object o = new Object();
  8. public static void main(String[] args) throws InterruptedException {
  9. Thread t1 = null, t2 = null;
  10. t1 = new Thread(()->{
  11. synchronized (o){
  12. for (int i : intArr) {
  13. System.out.print(i);
  14. try {
  15. o.notify();
  16. o.wait();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. });
  23. t2 = new Thread(()->{
  24. synchronized (o){
  25. for (char c : charArr) {
  26. try {
  27. o.wait();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.print(c);
  32. o.notify();
  33. }
  34. }
  35. });
  36. t2.start();
  37. //保证线程2比线程1线启动之心并等待.
  38. TimeUnit.SECONDS.sleep(1);
  39. t1.start();
  40. }
  41. }

10.3 生产者消费者线程

写一个固定容量的同步容器, 有 get、set、getCount三个方法,能够支持2个生产者线程生产固定容量的元素和10个消费者线程不断地阻塞调用消费。
synchronized版:

  1. import java.util.LinkedList;
  2. import java.util.List;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. *写一个固定容量的同步容器, 有 get、set、getCount三个方法,能够支持2个生产者线程生产固定容量的元素和10个消费者线程不断地阻塞调用消费。
  6. * synchronized版
  7. */
  8. public class MyContainer<T> {
  9. //使用链表来模拟容器
  10. private final LinkedList<T> container = new LinkedList<T>();
  11. //容器没最大元素数量
  12. private static final int MAX = 10;
  13. //容器内挡墙元素数量
  14. private int count = 0;
  15. /**
  16. * 存储元素
  17. * @param t
  18. * @throws InterruptedException
  19. */
  20. public void put(T t) throws InterruptedException {
  21. synchronized (this){
  22. //为什么这里是while而不是if? 因为wait时可能会被非消费者线程线程唤醒。notifyAll?
  23. while (this.container.size() == MAX ){
  24. this.wait();
  25. }
  26. this.container.add(t); //集合内添加元素
  27. count++; //元素数量+1
  28. this.notifyAll(); //通知消费者进行消费
  29. }
  30. }
  31. /**
  32. * 从头部删除一个元素
  33. * @return
  34. * @throws InterruptedException
  35. */
  36. public T get() throws InterruptedException {
  37. synchronized (this){
  38. //为什么这里是while而不是if? 因为wait时可能会被非生产者线程线程唤醒。notifyAll?
  39. while (this.container.size() == 0){
  40. this.wait();
  41. }
  42. T t = this.container.removeFirst(); //删除头部的一个元素
  43. this.count--; //集合内元素数量-1
  44. this.notifyAll(); //通知生产者进行生产
  45. return t;
  46. }
  47. }
  48. /**
  49. * 获取元素数量。
  50. * @return
  51. */
  52. public int getCount(){
  53. return this.count;
  54. }
  55. //测试
  56. public static void main(String[] args) throws InterruptedException {
  57. MyContainer<Integer> myContainer = new MyContainer<>();
  58. //启动10个消费者进行消费
  59. for (int i = 0; i < 10; i++) {
  60. new Thread(()->{
  61. for (int j = 0; j < 5; j++) {
  62. try {
  63. System.out.println(myContainer.get());
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }).start();
  69. }
  70. //睡眠2秒钟,保证消费者线程先启动。
  71. TimeUnit.SECONDS.sleep(2);
  72. //启动2个生产者线程进行生产
  73. for (int i = 0; i < 2; i++) {
  74. new Thread(()->{
  75. for (int j = 0; j < 25; j++) {
  76. try {
  77. myContainer.put(j);
  78. } catch (InterruptedException e) {
  79. e.printStackTrace();
  80. }
  81. }
  82. }).start();
  83. }
  84. }
  85. }

Condition版:

  1. import java.util.LinkedList;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. /**
  7. *写一个固定容量的同步容器, 有 get、set、getCount三个方法,能够支持2个生产者线程生产固定容量的元素和10个消费者线程不断地阻塞调用消费。
  8. * Condition版
  9. */
  10. public class MyContainer1<T> {
  11. //使用链表来模拟容器
  12. private final LinkedList<T> container = new LinkedList<T>();
  13. //容器没最大元素数量
  14. private static final int MAX = 10;
  15. //容器内挡墙元素数量
  16. private int count = 0;
  17. //可重入锁
  18. private Lock lock = new ReentrantLock();
  19. private Condition producter = lock.newCondition(); //生产者线程队列
  20. private Condition consumer = lock.newCondition(); //消费者线程队列
  21. /**
  22. * 存储元素
  23. * @param t
  24. * @throws InterruptedException
  25. */
  26. public void put(T t) {
  27. this.lock.lock(); //锁定
  28. try {
  29. //满了就等一等,等待消费者线程消费了元素后唤醒此线程
  30. while ( this.container.size() == MAX ){
  31. this.producter.await();
  32. }
  33. this.container.add(t); //集合内添加元素
  34. this.count++; //元素数量+1
  35. this.consumer.signalAll(); //唤醒消费者线程进程消费
  36. }catch (Exception e){
  37. e.printStackTrace();
  38. }finally {
  39. this.lock.unlock();//解锁
  40. }
  41. }
  42. /**
  43. * 从头部删除一个元素
  44. * @return
  45. * @throws InterruptedException
  46. */
  47. public T get() {
  48. this.lock.lock(); //锁定
  49. T t = null;
  50. try {
  51. //没有了就等一等,等待生产者生产了之后唤醒此线程
  52. while (this.container.size() == 0){
  53. this.consumer.await();
  54. }
  55. t = this.container.removeFirst(); //删除头部的一个元素
  56. this.count--; //集合内元素数量-1
  57. this.producter.signalAll(); //唤醒生产者线程进行生产
  58. }catch (Exception e){
  59. e.printStackTrace();
  60. }finally {
  61. this.lock.unlock();//解锁
  62. return t;
  63. }
  64. }
  65. /**
  66. * 获取元素数量。
  67. * @return
  68. */
  69. public int getCount(){
  70. return this.count;
  71. }
  72. //测试
  73. public static void main(String[] args) throws InterruptedException {
  74. MyContainer1<Integer> myContainer = new MyContainer1<>();
  75. //启动10个消费者进行消费
  76. for (int i = 0; i < 10; i++) {
  77. new Thread(()->{
  78. for (int j = 0; j < 5; j++) {
  79. System.out.println(myContainer.get());
  80. }
  81. }).start();
  82. }
  83. //睡眠2秒钟,保证消费者线程先启动。
  84. TimeUnit.SECONDS.sleep(2);
  85. //启动2个生产者线程进行生产
  86. for (int i = 0; i < 2; i++) {
  87. new Thread(()->{
  88. for (int j = 0; j < 25; j++) {
  89. myContainer.put(j);
  90. }
  91. }).start();
  92. }
  93. }
  94. }

11. AQS

AQS时Java的一个抽象类,全称为(AbstractQueuedSynchronizer)抽象队列同步器,它的包路径为**java.util.concurrent.locks.AbstractQueuedSynchronizer**,是除了java自带的synchronized关键字之外的锁机制。

AQS的核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用
CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。

AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。**

用大白话来说,AQS 的 底层实现是CLH队列+volatile修饰的共享变量state,线程通过CAS去尝试改变state,成功则获取锁,失败则进入等待队列,等待被唤醒。

**注意:AQS是自旋锁:**在等待唤醒的时候,经常会使用自旋while(!cas())的方式,不停地尝试获取锁,直到被其他线程获取成功在等待唤醒的时候,经常会使用自旋while(!cas())的方式,不停地尝试获取锁,直到被其他线程获取成功

实现了AQS的锁有:自旋锁、互斥锁、读锁写锁、条件产量、信号量、栅栏都是AQS的衍生物

AQS实现的具体方式如下:
6. java多线程工具类 - 图1

如图示,AQS维护了一个volatile int state 的变量和线程等待队列,这个队列里面装的是每一个等待的线程,多线程争用资源被阻塞的时候就会进入这个队列。state就是共享资源,其访问方式有如下三种:
getState(); : 获得状态
setState(); : 设置状态
compareAndSetState(); : CAS的方式设置状态

AQS 定义了两种资源共享方式:

  1. Exclusive独占/排他,只有一个线程能执行,如ReentrantLock类
  2. Share共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

不同的自定义的同步器争用共享资源的方式也不同。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
    这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

自定义同步器在实现的时候只需要实现共享资源state的获取和释放方式即可,至于具体线程等待队列的维护,AQS已经在顶层实现好了。自定义同步器实现的时候主要实现下面几种方法:

isHeldExclusively() 该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int) 独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int) 独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int) 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int) 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

独占锁以**ReentrantLock**为例:
state初始化为0,表示未锁定状态,A线程调用lock()时,会调用tryAcquire()独占锁并将state+1.之后其他线程再想tryAcquire的时候就会失败,直到A线程unlock() s时state-1 这时 stata=0,其他线程才有机会获取该锁。A释放锁之前,自己也是可以重复获取此锁(state累加),这就是可重入的概念。
注意:获取多少次锁就要释放多少次锁,保证state是能回到零态的。只要stats回到0状态其他线程就能尝试获取锁。

共享锁以**CountDownLatch**为例:
任务分n个子线程去执行,state就初始化为n,n个线程并行执行,每个线程执行完之后countDown() 一次时state就会compareAndSetState的方式 -1。当n子线程全部执行完毕后state=0,会unpark()主调用线程,主调用线程就会从await()函数返回,继续之后的动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire -> tryReleasetryAcquireShared -> tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如**ReentrantReadWriteLock**
 在acquire()acquireShared()两种方式下,线程在等待队列中都是忽略中断的,acquireInterruptibly()/acquireSharedInterruptibly()是支持响应中断的。

AQS的简单应用**
Mutex:不可重入互斥锁,锁资源(state)只有两种状态:0:未被锁定;1:锁定。

AQS源码:
VarHandle

  1. 普通变量可以原子性
  2. 比反射快,直接操作二进制。