1.使用
public static void main(String[] args) throws InterruptedException {final Semaphore semaphore = new Semaphore(2, true);Thread tA = new Thread(() ->{try {semaphore.acquire();System.out.println("线程A获取通行证成功");TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}finally {semaphore.release();}});tA.start();//确保线程A已经执行TimeUnit.MILLISECONDS.sleep(200);Thread tB = new Thread(() ->{try {semaphore.acquire(2);System.out.println("线程B获取通行证成功");} catch (InterruptedException e) {}finally {semaphore.release(2);}});tB.start();//确保线程B已经执行TimeUnit.MILLISECONDS.sleep(200);Thread tC = new Thread(() ->{try {semaphore.acquire();System.out.println("线程C获取通行证成功");} catch (InterruptedException e) {}finally {semaphore.release();}});tC.start();}
package facetest.javase.juc;import java.util.concurrent.Semaphore;/*** @Created by 勺子* @Description pool* @Date 2022/4/6 10:42*/public class Pool {/** 可同时访问资源的最大线程数*/private static final int MAX_AVAILABLE = 100;/** 信号量 表示:可获取的对象通行证*/private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);/** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/protected Object[] items = new Object[MAX_AVAILABLE];/** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/protected boolean[] used = new boolean[MAX_AVAILABLE];/*** 获取一个空闲对象* 如果当前池中无空闲对象,则等待..直到有空闲对象为止*/public Object getItem() throws InterruptedException {available.acquire();return getNextAvailableItem();}/*** 归还对象到池中*/public void putItem(Object x) {if (markAsUnused(x))available.release();}/*** 获取池内一个空闲对象,获取成功则返回Object,失败返回Null* 成功后将对应的 used[i] = true*/private synchronized Object getNextAvailableItem() {for (int i = 0; i < MAX_AVAILABLE; ++i) {if (!used[i]) {used[i] = true;return items[i];}}return null;}/*** 归还对象到池中,归还成功返回true* 归还失败:* 1.池中不存在该对象引用,返回false* 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false*/private synchronized boolean markAsUnused(Object item) {for (int i = 0; i < MAX_AVAILABLE; ++i) {if (item == items[i]) {if (used[i]) {used[i] = false;return true;} elsereturn false;}}return false;}}
2.源码
Semaphore内部有一个抽象的静态内部类sync,跟CountDownLatch一样,sync继承了AQS,因此Semaphore也有公平的方式,默认是非公平方式
无论公平锁还是非公平锁都是将传递的通行证赋值给AQS.state值
//默认是非公平锁public Semaphore(int permits) {sync = new NonfairSync(permits);}//可以传递参数指定为公平锁public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}NonfairSync(int permits) {super(permits);}FairSync(int permits) {super(permits);}
1. acquire()
public void acquire() throws InterruptedException {//调用AQS的方法sync.acquireSharedInterruptibly(1);}
2.AQS.acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//如果线程已经被中断,直接抛出异常if (Thread.interrupted())throw new InterruptedException();//调用FairSync.tryAcquireShared方法//小于0的情况有两种://1 非占用锁的线程调用了acquire方法 阻塞队列有等待者线程//2 当前剩下的通行证数不够当前线程本次获取//什么时候大于0 ? 获取通行证成功,返回剩余的通行证数if (tryAcquireShared(arg) < 0)//条件成立:走阻塞线程逻辑doAcquireSharedInterruptibly(arg);}
3.FairSync.tryAcquireShared
尝试获取锁,获取成功返回剩余的通行证数;获取失败返回小于0的数
//尝试获取通行证,获取成功返回 >=0的值;获取失败 返回<0的值protected int tryAcquireShared(int acquires) {for (;;) {//判断当前AQS阻塞队列是否有 等待者线程,如果有直接返回 -1 ,表示当前acquire操作的线程需要进入队列等待...if (hasQueuedPredecessors())return -1;//执行到这里,哪几种情况?//1.调用acquire时,AQS阻塞队列内没有其他等待者//2.当前节点 在阻塞队列中是head.next节点//获取state,state这里表示 通行证int available = getState();//remaining 表示当前线程 获取通行证之后,semaphore还剩余数量int remaining = available - acquires;//条件一:remaining < 0成立,说明线程获取通行证失败//条件二:前置条件,remaining大于等于0,cas更新state成功,说明线程获取通行证成功,cas失败则自旋if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
4.AQS.doAcquireSharedInterruptibly
阻塞线程的逻辑
- 将线程封装为node几点入阻塞队列
自旋给自己找个好爸爸,然后阻塞,被唤醒后判断是否是head.next节点,是的话设置为头节点,逐次唤醒后继节点
//AQS的doAcquireSharedInterruptibly方法private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将调用semaphore.acquire方法的线程封装为node 添加到AQS的阻塞队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前node的前驱节点final Node p = node.predecessor();//条件成立 说明当前node节点是head.next节点,有权利获取 共享锁if (p == head) {int r = tryAcquireShared(arg);//站在semaphore角度,r大于0 说明当前node获取到锁了if (r >= 0) {//设置为头节点并唤醒后继节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true//parkAndCheckInterrupt 挂起node对应的线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {//阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑if (failed)cancelAcquire(node);}}
5.AQS.setHeadAndPropagate
//AQS的方法 设置当前node为 head节点,并向后传播(依次唤醒!)private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//将当前节点设置为head节点setHead(node);//propagate 是剩余的通行证数if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {//获取当前节点的后继节点Node s = node.next;//条件一:s == null 成立 当前node节点已经是tail节点了,条件一成立 doReleaseShared会处理这种情况//条件二:前置条件 s!=null 那么要求s节点必须是 共享模式if (s == null || s.isShared())//基本上所有情况都会执行到这里doReleaseShared();}}
6.release
每调用一次,恢复一个通行证
public void release() {//调用AQS的方法sync.releaseShared(1);}
7.AQS.releaseShared
public final boolean releaseShared(int arg) {//条件成立:表示当前线程释放资源成功,释放资源成功后,去唤醒获取资源失败的线程...if (tryReleaseShared(arg)) {//唤醒获取资源失败的线程doReleaseShared();return true;}return false;}
8.Sync.tryReleaseShared
恢复锁资源通常成功
protected final boolean tryReleaseShared(int releases) {//自旋,恢复当前通行证数for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//通常cas锁资源修改成功if (compareAndSetState(current, next))return true;}}
9.AQS.doReleaseShared
//AQS的doReleaseShared方法//有哪几种情况会调用当前方法?//1.semaphore.release释放锁资源成功 然后调用doReleaseShared方法去唤醒head.next对应的线程//2.被唤醒的线程在doAcquireSharedInterruptibly方法中调用setHeadAndPropagate方法,然后会调用doReleaseShared该方法private void doReleaseShared() {for (;;) {//获取当前AQS的head节点Node h = head;//条件一 h != null 成立 说明阻塞队列不为空//条件二 h != tail 成立说明当前阻塞队列不只有head一个节点//h ==tail 表示head和tail指向同一个node节点,什么时候会出现?//1.正常唤醒情况,唤醒最后一个node节点时候,head是等于tail的//2.第一个调用await的线程在准备addWaiter入队时给head节点擦屁股后还没有把自己放进阻塞队列时候 这时候与countdown方法发生并发了if (h != null && h != tail) {//执行到这里 说明当前head一定有后继节点int ws = h.waitStatus;if (ws == Node.SIGNAL) {//将当前node状态改为0//为什么用cas 多个线程唤醒head.next节点时候, 可能会失败//案例:t3线程在if (h == head) 返回false时 t3不会退出循环,会继续自旋 参与到唤醒下一个head.next逻辑//t3此时执行cas 成功..t4(head节点线程)在t3修改成功之前,也进入到这里代码块,t4会compareAndSetWaitStatus 修改失败,因为t3改过了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;//唤醒head的后继节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}//条件成立//1. 说明刚被unaprk唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的 设置当前后继node为head节点逻辑//2. h==null//3. h==tail head==tail指向一个node对象//条件不成立//被唤醒的节点 很积极 直接将自己设置为head节点, 此时 唤醒它的节点(前驱节点) 执行h==head不成立//此时head节点的前驱节点 不会跳出doReleaseShared方法,会继续唤醒 新head节点的后继节点..if (h == head) // loop if head changedbreak;}}


