1. 在上一篇文章中我们手写了AQS和利用AQS实现了ReentrantLockReentrantReadWriteLock的,今天我们再来用自己的AQSjdkAQS来继续实现它的衍生物:计数器CountDownLatch、信号量Semaphone、回旋栅栏CyclicBarrier(不是AQS实现)。

计数器CountDownLatch

  利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他任务执行完毕之后才能执行,或者说当其他任务执行到一定数量后才执行,此时就可以利用CountDownLatch来实现这种功能了。<br />      计数器实际上就是一把共享锁,每个线程(读线程)执行完后将锁的readCount-1。直到减少到我们认为合理的值(一般是0),而这个减少的方法就是我们countDown()的释放锁的方法,每次释放锁都会-1,但此时并不会真正的释放整个读锁,释放的只是单个读线程的锁。所以此时await()方法实际上就是另外一个写线程去抢锁,只有等到整个读线程释放锁,另一个写线程(我们测试的是main线程)才能拿到锁,才会打断阻塞。<br />

基于JDK的AQS实现

/**
 * @author heian
 * @create 2020-03-07-9:08 下午
 * @description 计数器就是一把共享锁,只有当线程执行之后-1,直至0
 */
public class MyCountDownLatch {

    private Sync sync;

    public MyCountDownLatch(int count){
        if (count<0){
            throw new IllegalArgumentException("参数传入有误");
        }
        this.sync= new Sync(count);
    }

    //阻塞 只有当readCount == 0,释放共享锁,类比ReentrantLock的lock拿到锁
    public void await() {
        sync.acquireShared(1);
    }
    //countDown 就是释放锁,并把state-1
    public void countDown() {
        sync.releaseShared(1);
    }

    class Sync extends AbstractQueuedSynchronizer{
        //开始存值
        public Sync(int count){
            setState(count);
        }
        //await 就是抢占共享锁,当state变为0的时候就表示锁释放,就会去唤醒main或者抢锁线程抢锁
        @Override
        protected int tryAcquireShared(int arg) {
            return getState()<=0 ? 1:-1;
        }

        //countDown 释放锁就是-1,直至减少到0,才会释放共享锁
        @Override
        protected boolean tryReleaseShared(int arg) {
            //System.out.println("打印队列长度" + getQueueLength());
            for (;;){
                int newNum = getState() - arg;
                if (compareAndSetState(getState(),newNum)){
                    return newNum <= 0;
                }
            }
        }
    }

}

基于我们的AQS实现

/**
 * @author heian
 * @create 2020-03-07-10:53 下午
 * @description
 */
public class MyCountDownLatch2 {

    private Sync sync;

    public MyCountDownLatch2(int count){
        if (count<0){
            throw new IllegalArgumentException("参数传入有误");
        }
        this.sync = new Sync(count);
    }


    class Sync extends MyAQS{
        public Sync(int count){
            readCount.compareAndSet(readCount.get(),count);
        }

        // await readCount<=0 >0 表示别的线程抢锁成功  <0 失败
        @Override
        public int tryLockShared(int acquires) {
            return readCount.get() <=0 ? 1:-1;
        }
        //countDown别的线程抢锁,只有当readCount ==0 才会抢成功
        @Override
        public boolean tryUnlockShared(int releases) {
            for (;;){
               int newNum = readCount.get() - releases;
               if (readCount.compareAndSet(readCount.get(),newNum)){
                   return readCount.get()<=0;
               }
            }
        }

    }

    public void await()  {
        //我的AQS这里arg 默认是1  就没传参输
        this.sync.lockShared();
    }

    public void countDown() {
        //我的AQS这里arg 默认是1  就没传参输
        this.sync.unlockShared();
    }

}

测试代码

   public static void main(String[] args) {
        MyCountDownLatch countDownLatch = new MyCountDownLatch(5);
        IntStream.range(0,5).forEach(value -> {
            new Thread(() -> {
                LockSupport.parkNanos(1000000000*1L);
                System.out.println(Thread.currentThread().getName() + "结束");
                countDownLatch.countDown();
            }).start();
        });
        countDownLatch.await();
        System.out.println("main开始");
    }
Thread-2结束
Thread-3结束
Thread-1结束
Thread-4结束
Thread-0结束
main开始
 ![屏幕快照 2020-03-08 下午12.55.07.png](https://cdn.nlark.com/yuque/0/2020/png/771792/1583643327855-5e3087be-dd3a-4969-bc71-fd10466311a3.png#align=left&display=inline&height=327&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-03-08%20%E4%B8%8B%E5%8D%8812.55.07.png&originHeight=654&originWidth=1190&size=99376&status=done&style=none&width=595)<br />      需要说明的就是我们这里实现AQS的两个tryLockShared/tryUnlockShared 方法是<=0。它的语义就是说:因为我们这里仅仅开辟了5个读线程去调用了释放锁的方法,而释放锁的方法会调用tryUnlockShared方法,而我们设置好的readCount数量是5,只有当它一直减少至0或者<0(读线程数量大于5)的时候才会唤醒在队列中的写线程,我们这里是main线程,才会打断await的阻塞,唤醒它重新调用tryLockShared方法,而此时由于readCount已经<=0了,这时候会抢锁成功,于是乎便打断阻塞,便可以执行main线程了。(读线程不会放入队列)

信号量Semaphone

  Semaphore字面意思为信号量,Semaphore可以控同时访问的线程个数,以此来达到限流的作用。<br />      其主要有两个常用的方法 acquire() 和release(),前者的语义是获得一个许可,其实说白了就是去抢锁,抢到了则readCount+1,并从队列中移除,没抢则该线程挂起,只有当别的读线程执行完调用了release()方法,被挂起的线程才有机会去抢锁。<br />![屏幕快照 2020-03-08 下午2.27.30.png](https://cdn.nlark.com/yuque/0/2020/png/771792/1583648877304-7a3254e1-4a7c-4c2d-8b64-96b0f3cb9ea0.png#align=left&display=inline&height=300&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-03-08%20%E4%B8%8B%E5%8D%882.27.30.png&originHeight=600&originWidth=1046&size=79213&status=done&style=none&width=523)

基于JDK的AQS实现

/**
 * @author heian
 * @create 2020-03-07-9:08 下午
 * @description 信号量 进来就拿锁,拿到了就+1,释放了就-1,共享锁的线程只允许最大连接permitCount个线程
 */
public class MySemaphone {

    private Sync sync;
    private int permitCount;
    public MySemaphone(int count){
        if (count<0){
            throw new IllegalArgumentException("参数传入有误");
        }
        this.sync= new Sync();
        this.permitCount = count;
    }

    public void acquire() {
        sync.acquireShared(1);
    }
    public void release() {
        sync.releaseShared(1);
    }

    class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;){
                int currentReadCount = getState() + arg;
                if (currentReadCount<=permitCount){
                    if (compareAndSetState(getState(),currentReadCount)){
                        return 1;
                    }
                }else {
                    return -1;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            //System.out.println("打印队列长度" + getQueueLength());
            for (;;){
                int remainNum = getState() - arg;
                return compareAndSetState(getState(),remainNum);
            }
        }
    }
}

测试代码

 public static void main(String[] args)  {
        MySemaphone semaphore = new MySemaphone(3);
        IntStream.range(0,5).forEach(value -> {
            new Thread(() -> {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "    获得许可");
                LockSupport.parkNanos(1000000000*2L);
                System.out.println(Thread.currentThread().getName() + "释放许可");
                semaphore.release();
            }).start();
        });
    }
Thread-0    获得许可
Thread-2    获得许可
Thread-1    获得许可
Thread-0释放许可
Thread-1释放许可
Thread-2释放许可
Thread-4    获得许可
Thread-3    获得许可
Thread-3释放许可
Thread-4释放许可

回旋栅栏CyclicBarrier

  回环栅栏:让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。叫做栅栏,描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行,我们可以把这个状态就叫做barrier。<br />      CyclicBarrier并不是ASQ实现的,因为AQS内部只有一个队列,而这个队列可以用来装所有进来的线程,但是我们还需要一个队列用来装正在运行的线程。而我们每次调用await方法都会去做一个自增操作,当自增操作count=许可数目,我们便启动这个栅栏,然后重置count,在来装下一轮。<br />     可以用我们生活的例子举例,比如我们排队等公交,一辆公车只允许装在3个人,而此时 你有7个人,则会一开始装走3个下一辆在装走3个,但是最后只剩下一个因为不满足3个人一车的条件只能被阻塞在外面不允许运行该线程。<br />      实现回旋栅栏的方式一般有两种,一个是基于ReentranLock,另外也可以基于syncronized。这里我已syncronized+notifyAll写出实现代码。这里代码演示就是每过一秒增加一个线程运行,只有当线程数达到3方可运行。

基于syncronized+notifyAll实现

/**
 * @author heian
 * @create 2020-03-08-2:34 下午
 * @description 循环栅栏 实现原理
 */
public class MyCyclicBarrier {

    private int initCount = 0;
    private int permitCount;
    private Object generation = new Object();

    public MyCyclicBarrier(int permitCount){
        this.permitCount = permitCount;
    }

    public void await() throws InterruptedException {
        synchronized (this){
            initCount++;
            Object currentG = generation;
            if (initCount == permitCount){
                //进入下一次计数
                nextGeneration();
            }else {
                //防止伪唤醒
                for (;;){
                    this.wait();//释放锁
                    //被更改了,则直接跳出阻塞
                    if (currentG != generation){
                        break;
                    }
                }
            }
        }
    }

    public void nextGeneration(){
        synchronized (this){
            initCount =0;
            generation = new Object();
            notifyAll();
        }
    }

}

测试代码

  public static void main(String[] args) {
        MyCyclicBarrier cyclicBarrier = new MyCyclicBarrier(3);
        IntStream.range(0,7).forEach(value -> {
            new Thread(() -> {
                try {
                    cyclicBarrier.await ();
                    System.out.println (Thread.currentThread ().getName () + "一起跳出栅栏");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
            LockSupport.parkNanos(1000000000*1L);
        });
    }
//少一个线程未跳出
Thread-1一起跳出栅栏
Thread-0一起跳出栅栏
Thread-2一起跳出栅栏
Thread-5一起跳出栅栏
Thread-3一起跳出栅栏
Thread-4一起跳出栅栏