本文首先介绍 AQS 的数据结构,然后展开聊聊各同步工具的两种实现:AQS实现、Redis实现。
本文不会介绍同步工具的应用,想了解应用参考 交替打印 FooBar。
下面先大概看一眼 java 并发框架的构成:
1 AQS
使用 CLH队列 + state 实现一个抽象同步器,子类实现 Sync 就能做出公平、读写、独占、条件等功能。
state 整型,在不同的 Sync 实现中有不同的语义
head 指向 CLH队头,表示占有资源的线程
tail 指向 CLH队尾,表示最新的等待线程
条件队列 AQS 可以维护多个条件队列,以实现多个 Condition
exclusiveOwnerThread 记录占用 head 的线程
1.1 CLH Node
prev
next
waitStatus
thread
nextWaiter
1.2 CAS
Unsafe#compareAndSwap 都是 native 方法。修改 state,如果 state 等于期望值,才可以更新该值。
有两种处理机制:
- 缓存锁定。在一个处理器的缓存中操作数据,并使其它处理器中的缓存失效
- 总线锁定。处理器通过总线交流,锁定总线让处理器中间对一个数据的操作串行
ABA 是指在 CAS 更新过程中,线程1 读到的值是A,此时线程2把 A 改为 B 后又改回 A,线程1 回过神来发现值依然是A。ABA 在大部分场景下都不影响并发的最终效果。
Java 提供的 AtomicStampedReference 要求传入 expectedStamp、newStamp,更新时不光检查值,还要检查当前 stamp 是否等于预期 stamp,全部相等的话才会更新。
2 ReentrantLock
ReentrantLock 使用 state 实现重入,默认非公平。
ReentrantReadWriteLock 持有1个读锁,1个写锁,默认同一个 NonfairSync 实现。
StampLock
实现雪花算法的自增序列,可以使用 ReentrantLock 保证序列自增的原子性。
2.1 AQS 实现
2.1.1 NonfairSync
实现非公平锁:
- 尝试加锁(CAS 修改 state)
- 尝试加锁,失败则把表示当前线程的 Node 入队 CLH
- 如果节点的 prev 是 head,自旋加锁,失败则修改 prev 的 waitStatus 为 SIGNAL 后阻塞
- 当 head 释放锁,唤醒下一个节点,让其执行步骤3
2.1.2 FairSync
实现公平锁,和非公平的区别:
- 步骤1 不尝试加锁,所以并发低的场景下效率不如非公平实现
- 步骤2 会先检查CLH队列,如果前面有节点就不加锁,直接入队 CLH
2.1.3 读写锁
ReentrantReadWriteLock 持有1个读锁,1个写锁,默认使用 NonfairSync。
写锁,使其它线程将无法获取互斥锁、共享锁,状态记录在 state 低16位。实现参考 #2.2。
读锁,使所有线程都无法获取互斥锁,状态记录在 state 高16位。使用 AQS 为共享节点设计的方法,比如 acquireShared 等。
其中 setHeadAndPropagate 检查 next Node,如果共享(根据 nextWaiter 区分),则唤醒它。
这个被唤醒的 Node 也会唤醒 next(如果共享),从而形成一个传播唤醒。2.1.4 Condition 实现
AQS 内部类 ConditionObject 维护条件队列,实现线程间的同步。
- 调用 await(),新增一个节点入队条件队列
- 当前线程释放全部锁,并阻塞
调用 signal(),从条件队列 head 寻找一个 Node 入队 CLH(只入队不唤醒)
2.2 分布式实现
3 CountDownLatch
闭锁,允许N个线程一直等待,直到其他线程执行的操作全部完成。
数据库脱敏,每批次查询 1000 条,提交给线程池,线程在脱敏完成后 countDown,然后继续。
主线程 await 阻塞,等待脱敏完 1000 条记录后返回,并批量更新数据库。3.1 AQS 实现
主线程构造 CountDownLatch,设置 AQS state 为 count
- 主线程调用 await(),当 state > 0 时进入 AQS 同步队列
子线程调用 countDown() 原子递减 state,当 state == 0 时唤醒所有调用await()方法阻塞的线程
3.2 Redis 实现
setCount() EXISTS key,如果不存在则 SET count
countDown() DECR key,如果返回值 <= 0,则 DEL key
await() 在 while 循环中 GET key,如果返回值 == 0,则结束循环4 CyclicBarrier
栅栏,让一组线程互相等待,直到所有线程都到达一个同步点。Cylic 表示可以循环利用。
构造器 CyclicBarrier(int parties, Runnable barrierAction) parties 预期到达栅栏的线程数;barrierAction 所有线程到达栅栏后执行的回调。4.1 AQS 实现
主线程构造 CylicBarrier,设置 AQS state 为 parties
- 子线程调用 await(),获取排斥锁,递减 state,进入同步队列,释放锁
当 state == 0 时,唤醒其中一个线程执行构造器中设置的回调,并重置 state 为 parties,循环利用
5 Semaphore
信号量,控制资源可被同时访问的线程个数。适合使用在限流场景。
构造器 Semaphore(int permits, boolean fair) permits 表示许可个数;fair 表示公平竞争,默认 false。5.1 AQS 实现
主线程构造 Semaphore,设置 AQS state 为 permits
- 主线程调用 acquire(N),用 state - N,如果小于0,则进入同步队列,大于0则通过CAS设置当前信号量为剩余值,同时返回剩余值
- 子线程调用 release(N),用 state + N,同时不停的尝试因为调用acquire()进入阻塞的线程
5.2 Redis 实现
setPermits() GET key,如果返回值 == 0,则 SET key
acquire() GET key,如果返回值 > 0,则 INCRBY permits
release() INCRBY permits6 Atomic
雪花算法自增序列,使用 AtomicLong 保证序列原子性。6.1 CAS 实现
6.2 Redis 实现
6.3 按序打印
使用2个 AtomicBoolean 实现按序打印。参考文献