在上一篇文章中我们手写了AQS和利用AQS实现了ReentrantLock和ReentrantReadWriteLock的,今天我们再来用自己的AQS和jdk的AQS来继续实现它的衍生物:计数器CountDownLatch、信号量Semaphone、回旋栅栏CyclicBarrier(不是AQS实现)。
计数器CountDownLatch
利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他任务执行完毕之后才能执行,或者说当其他任务执行到一定数量后才执行,此时就可以利用CountDownLatch来实现这种功能了。<br /> 计数器实际上就是一把共享锁,每个线程(读线程)执行完后将锁的readCount-1。直到减少到我们认为合理的值(一般是0),而这个减少的方法就是我们countDown()的释放锁的方法,每次释放锁都会-1,但此时并不会真正的释放整个读锁,释放的只是单个读线程的锁。所以此时await()方法实际上就是另外一个写线程去抢锁,只有等到整个读线程释放锁,另一个写线程(我们测试的是main线程)才能拿到锁,才会打断阻塞。<br />
基于JDK的AQS实现
/**
* @author heian
* @create 2020-03-07-9:08 下午
* @description 计数器就是一把共享锁,只有当线程执行之后-1,直至0
*/
public class MyCountDownLatch {
private Sync sync;
public MyCountDownLatch(int count){
if (count<0){
throw new IllegalArgumentException("参数传入有误");
}
this.sync= new Sync(count);
}
//阻塞 只有当readCount == 0,释放共享锁,类比ReentrantLock的lock拿到锁
public void await() {
sync.acquireShared(1);
}
//countDown 就是释放锁,并把state-1
public void countDown() {
sync.releaseShared(1);
}
class Sync extends AbstractQueuedSynchronizer{
//开始存值
public Sync(int count){
setState(count);
}
//await 就是抢占共享锁,当state变为0的时候就表示锁释放,就会去唤醒main或者抢锁线程抢锁
@Override
protected int tryAcquireShared(int arg) {
return getState()<=0 ? 1:-1;
}
//countDown 释放锁就是-1,直至减少到0,才会释放共享锁
@Override
protected boolean tryReleaseShared(int arg) {
//System.out.println("打印队列长度" + getQueueLength());
for (;;){
int newNum = getState() - arg;
if (compareAndSetState(getState(),newNum)){
return newNum <= 0;
}
}
}
}
}
基于我们的AQS实现
/**
* @author heian
* @create 2020-03-07-10:53 下午
* @description
*/
public class MyCountDownLatch2 {
private Sync sync;
public MyCountDownLatch2(int count){
if (count<0){
throw new IllegalArgumentException("参数传入有误");
}
this.sync = new Sync(count);
}
class Sync extends MyAQS{
public Sync(int count){
readCount.compareAndSet(readCount.get(),count);
}
// await readCount<=0 >0 表示别的线程抢锁成功 <0 失败
@Override
public int tryLockShared(int acquires) {
return readCount.get() <=0 ? 1:-1;
}
//countDown别的线程抢锁,只有当readCount ==0 才会抢成功
@Override
public boolean tryUnlockShared(int releases) {
for (;;){
int newNum = readCount.get() - releases;
if (readCount.compareAndSet(readCount.get(),newNum)){
return readCount.get()<=0;
}
}
}
}
public void await() {
//我的AQS这里arg 默认是1 就没传参输
this.sync.lockShared();
}
public void countDown() {
//我的AQS这里arg 默认是1 就没传参输
this.sync.unlockShared();
}
}
测试代码
public static void main(String[] args) {
MyCountDownLatch countDownLatch = new MyCountDownLatch(5);
IntStream.range(0,5).forEach(value -> {
new Thread(() -> {
LockSupport.parkNanos(1000000000*1L);
System.out.println(Thread.currentThread().getName() + "结束");
countDownLatch.countDown();
}).start();
});
countDownLatch.await();
System.out.println("main开始");
}
Thread-2结束
Thread-3结束
Thread-1结束
Thread-4结束
Thread-0结束
main开始
<br /> 需要说明的就是我们这里实现AQS的两个tryLockShared/tryUnlockShared 方法是<=0。它的语义就是说:因为我们这里仅仅开辟了5个读线程去调用了释放锁的方法,而释放锁的方法会调用tryUnlockShared方法,而我们设置好的readCount数量是5,只有当它一直减少至0或者<0(读线程数量大于5)的时候才会唤醒在队列中的写线程,我们这里是main线程,才会打断await的阻塞,唤醒它重新调用tryLockShared方法,而此时由于readCount已经<=0了,这时候会抢锁成功,于是乎便打断阻塞,便可以执行main线程了。(读线程不会放入队列)
信号量Semaphone
Semaphore字面意思为信号量,Semaphore可以控同时访问的线程个数,以此来达到限流的作用。<br /> 其主要有两个常用的方法 acquire() 和release(),前者的语义是获得一个许可,其实说白了就是去抢锁,抢到了则readCount+1,并从队列中移除,没抢则该线程挂起,只有当别的读线程执行完调用了release()方法,被挂起的线程才有机会去抢锁。<br />
基于JDK的AQS实现
/**
* @author heian
* @create 2020-03-07-9:08 下午
* @description 信号量 进来就拿锁,拿到了就+1,释放了就-1,共享锁的线程只允许最大连接permitCount个线程
*/
public class MySemaphone {
private Sync sync;
private int permitCount;
public MySemaphone(int count){
if (count<0){
throw new IllegalArgumentException("参数传入有误");
}
this.sync= new Sync();
this.permitCount = count;
}
public void acquire() {
sync.acquireShared(1);
}
public void release() {
sync.releaseShared(1);
}
class Sync extends AbstractQueuedSynchronizer{
@Override
protected int tryAcquireShared(int arg) {
for (;;){
int currentReadCount = getState() + arg;
if (currentReadCount<=permitCount){
if (compareAndSetState(getState(),currentReadCount)){
return 1;
}
}else {
return -1;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
//System.out.println("打印队列长度" + getQueueLength());
for (;;){
int remainNum = getState() - arg;
return compareAndSetState(getState(),remainNum);
}
}
}
}
测试代码
public static void main(String[] args) {
MySemaphone semaphore = new MySemaphone(3);
IntStream.range(0,5).forEach(value -> {
new Thread(() -> {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获得许可");
LockSupport.parkNanos(1000000000*2L);
System.out.println(Thread.currentThread().getName() + "释放许可");
semaphore.release();
}).start();
});
}
Thread-0 获得许可
Thread-2 获得许可
Thread-1 获得许可
Thread-0释放许可
Thread-1释放许可
Thread-2释放许可
Thread-4 获得许可
Thread-3 获得许可
Thread-3释放许可
Thread-4释放许可
回旋栅栏CyclicBarrier
回环栅栏:让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。叫做栅栏,描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行,我们可以把这个状态就叫做barrier。<br /> CyclicBarrier并不是ASQ实现的,因为AQS内部只有一个队列,而这个队列可以用来装所有进来的线程,但是我们还需要一个队列用来装正在运行的线程。而我们每次调用await方法都会去做一个自增操作,当自增操作count=许可数目,我们便启动这个栅栏,然后重置count,在来装下一轮。<br /> 可以用我们生活的例子举例,比如我们排队等公交,一辆公车只允许装在3个人,而此时 你有7个人,则会一开始装走3个下一辆在装走3个,但是最后只剩下一个因为不满足3个人一车的条件只能被阻塞在外面不允许运行该线程。<br /> 实现回旋栅栏的方式一般有两种,一个是基于ReentranLock,另外也可以基于syncronized。这里我已syncronized+notifyAll写出实现代码。这里代码演示就是每过一秒增加一个线程运行,只有当线程数达到3方可运行。
基于syncronized+notifyAll实现
/**
* @author heian
* @create 2020-03-08-2:34 下午
* @description 循环栅栏 实现原理
*/
public class MyCyclicBarrier {
private int initCount = 0;
private int permitCount;
private Object generation = new Object();
public MyCyclicBarrier(int permitCount){
this.permitCount = permitCount;
}
public void await() throws InterruptedException {
synchronized (this){
initCount++;
Object currentG = generation;
if (initCount == permitCount){
//进入下一次计数
nextGeneration();
}else {
//防止伪唤醒
for (;;){
this.wait();//释放锁
//被更改了,则直接跳出阻塞
if (currentG != generation){
break;
}
}
}
}
}
public void nextGeneration(){
synchronized (this){
initCount =0;
generation = new Object();
notifyAll();
}
}
}
测试代码
public static void main(String[] args) {
MyCyclicBarrier cyclicBarrier = new MyCyclicBarrier(3);
IntStream.range(0,7).forEach(value -> {
new Thread(() -> {
try {
cyclicBarrier.await ();
System.out.println (Thread.currentThread ().getName () + "一起跳出栅栏");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
LockSupport.parkNanos(1000000000*1L);
});
}
//少一个线程未跳出
Thread-1一起跳出栅栏
Thread-0一起跳出栅栏
Thread-2一起跳出栅栏
Thread-5一起跳出栅栏
Thread-3一起跳出栅栏
Thread-4一起跳出栅栏