作者

J.U.C 的作者是 Doug Lea

纲要

AQS与J.U.C - 图1

AQS

全称 AbstractQueueSynchronizer ,是阻塞式锁和相关的同步器工具的框架;
Lock 的底层就是 AQS
特点:

  1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState 获取 state 状态
    • setState 设置 state 状态
    • compareAndSetState 通过 CAS 机制设置 state 状态
  2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似 Monitor 的 WaitSet

子类主要实现的方法

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

    获取锁的姿势

    1. if (!tryAcquire(arg)){
    2. // 入队,可以选择阻塞当前线程 park unpark
    3. }

    释放锁的姿势

    1. if (tryRelease){
    2. // 让阻塞线程恢复运行
    3. }

    读写锁

    ReentrantReadWriteLock

    当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。
    类似于数据库中 select … from … lock in share mode
    提供了一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法; ```java package top.simba1949;

import java.util.concurrent.locks.*;

/**

  • @author Anthony
  • @date 2020/11/1 9:00 */ public class App1 { public static void main(String[] args) throws InterruptedException {

    1. DataContainer dataContainer = new DataContainer();
    2. new Thread(() -> dataContainer.read(), "t1").start();
    3. Thread.sleep(10);
    4. new Thread(() -> dataContainer.read(), "t2").start();
    5. new Thread(() -> dataContainer.write(), "t1").start();
    6. Thread.sleep(10);
    7. new Thread(() -> dataContainer.write(), "t2").start();

    } }

class DataContainer{ private Object data;

  1. private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
  2. // 读锁
  3. private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
  4. // 写锁
  5. private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
  6. public Object read(){
  7. // 加读锁
  8. readLock.lock();
  9. try {
  10. Thread.sleep(5000);
  11. System.out.println(" read:" + Thread.currentThread().getName() + ">" + System.currentTimeMillis());
  12. return data;
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. return null;
  16. } finally {
  17. // 释放读锁
  18. readLock.unlock();
  19. }
  20. }
  21. public void write(){
  22. // 加写锁
  23. writeLock.lock();
  24. try {
  25. Thread.sleep(5000);
  26. System.out.println("write:" + Thread.currentThread().getName() + ">" + System.currentTimeMillis());
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } finally {
  30. // 释放写锁
  31. writeLock.unlock();
  32. }
  33. }

}

  1. <a name="n55lY"></a>
  2. ## StampedLock
  3. 该类自 JDK8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用<br />缺点:
  4. 1. StampedLock 不支持条件变量
  5. 1. StampedLock 不支持可重入
  6. 加解读锁
  7. ```java
  8. StampedLock stampedLock = new StampedLock();
  9. stampedLock.unlockRead(stamp);

加解写锁

  1. StampedLock stampedLock = new StampedLock();
  2. stampedLock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要再做一次戳校验,如果校验成功,表示这期间确实没有写操作,数据可以安全使用,如果校验没有通过,需要重新获取读锁,保证数据安全

  1. StampedLock stampedLock = new StampedLock();
  2. long stamp = stampedLock.tryOptimisticRead();
  3. if (!stampedLock.validate(stamp)){
  4. // 锁升级
  5. }

demo

提供一个数据容器内部分别使用读锁保护数据的read方法,写锁保护数据write方法

  1. public class DataContainerStamped {
  2. private int data;
  3. private final StampedLock stampedLock = new StampedLock();
  4. public DataContainerStamped(int data) {
  5. this.data = data;
  6. }
  7. public int read(int readTime){
  8. long stamp = stampedLock.tryOptimisticRead();
  9. // 如果校验通过直接返回数据
  10. if (stampedLock.validate(stamp)){
  11. return data;
  12. }
  13. // 校验不通过,说明数据已经被更新,乐观读锁升级为读锁
  14. try {
  15. stamp = stampedLock.readLock();
  16. return data;
  17. } finally {
  18. stampedLock.unlockRead(stamp);
  19. }
  20. }
  21. public void write(int newData){
  22. long stamp = stampedLock.writeLock();
  23. try {
  24. this.data = newData;
  25. } finally {
  26. stampedLock.unlockWrite(stamp);
  27. }
  28. }
  29. }

Semaphore

信号量,用来限制能同时访问共享资源的线程上限,其本质上是一个“共享锁”;
实现原理:内部维护了一个共享锁,Semaphore 初始化的时候相当于初始化共享锁的个数,当 acquire 的时候共享锁的个数减一,当为零的时候不能获取到共享锁。当 release 的时候共享锁的个数增加一。同时也支持公平锁,在构造函数初始化的时候可以指定。

  1. package top.simba1949;
  2. import java.util.concurrent.Semaphore;
  3. /**
  4. * @author Anthony
  5. * @date 2020/11/1 9:00
  6. */
  7. public class App1 {
  8. public static void main(String[] args) {
  9. Semaphore semaphore = new Semaphore(3);
  10. for (int i = 0; i < 10; i++) {
  11. new Thread(() -> {
  12. try {
  13. // 获取次信号量
  14. semaphore.acquire();
  15. System.out.println(Thread.currentThread().getName());
  16. Thread.sleep(2000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } finally {
  20. // 释放信号量
  21. semaphore.release();
  22. }
  23. }, "t" + i).start();
  24. }
  25. }
  26. }

CountDownLatch

用于进行线程同步写作,等待所有线程完成倒计时。
其中构造方法用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一;
实现原理:内部通过共享锁实现,CountDownLatch 在创建的时候需要指定初始化参数,该参数就是共享锁的总次数。当某个线程调用 awit 的时候,会判断是否还持有共享锁,一直会等到共享锁的状态为0为止;当其他线程调用 countDown() 方法时,则执行释放共享锁状态,共享锁的次数减1;注意CountDownLatch不能回滚重置;

  1. package top.simba1949;
  2. import java.util.concurrent.CountDownLatch;
  3. /**
  4. * @author Anthony
  5. * @date 2020/11/1 9:00
  6. */
  7. public class App1 {
  8. public static void main(String[] args) throws InterruptedException {
  9. CountDownLatch countDownLatch = new CountDownLatch(10);
  10. for (int i = 0; i < 10; i++) {
  11. new Thread(() -> {
  12. System.out.println(System.currentTimeMillis());
  13. countDownLatch.countDown();
  14. }, "t" + i).start();
  15. }
  16. countDownLatch.await();
  17. System.out.println("ending");
  18. }
  19. }

CyclicBarrier

CyclicBarrier 的作用就是会让所有线程都等待完成后才会继续下一步行动。
可以用于多线程计算数据,最后合并计算结果的场景。
实现原理:内部是使用可重入锁 ReentrantLock 和 Condition,ReentrantLock 控制每个线程的锁的状态,Condition 对线程进行等待、唤醒机制,当线程发生中断户这话完成等待时,对线程进行唤醒。

  1. // parties 是参与线程的个数
  2. // 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
  3. public CyclicBarrier(int parties);
  4. public CyclicBarrier(int parties, Runnable barrierAction);
  5. // 等待所有的线程都到达指定的临界点
  6. cyclicBarrier.await();
  7. // 获取当前有多少个线程阻塞等待在临界点上
  8. cyclicBarrier.getNumberWaiting();
  9. // 用于查询阻塞等待的线程是否被中断
  10. cyclicBarrier.isBroken();
  11. // 将屏障重置为初始状态,如果当有线程正在临界点等待的话,将抛出异常
  12. cyclicBarrier.reset();

CyclicBarrierc 和 CountDownLatch区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

    线程安全的集合类

    线程安全的集合类可以分为三类

  • 遗留的线程安全集合,如:Hashtable,Vector

  • 使用Collections装饰的线程安全集合,使用装饰模式,本质上还是使用 synchronized 修饰,例如:
    • Collections.synchronizedCollection()
    • Collections.synchronizedList()
    • Collections.synchronizedMap()
    • Collections.synchronizedSet()
    • Collections.synchronizedNavigableMap()
    • Collections.synchronizedNavigableSet()
    • Collections.synchronizedSortedMap()
    • Collections.synchronizedSortedSet()
  • j.u.c 包下的集合类
    • Blocking 大部分实现基于锁,并提供用来阻塞的方法;
    • CopyOnWrite 之类的容器修改开销相对较重,适用于读多写少的操作;
    • Concurrent 类型的容器;
      • 内部很多操作使用 cas 优化,一般可以提供较高的吞吐量
      • 弱一致性
        • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生了修改,迭代器仍可以继续遍历,但是遍历的值为旧值;
        • 求大小弱一致性,size操作未必是100%准确;
        • 读取弱一致性;

          Exchanger

          允许在并发任务之间交换数据。具体来说,Exchanger 类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二线程的数据结构进入第一个线程中。