一,使用
@Testpublic void test3()throws InterruptedException{final Semaphore semaphore = new Semaphore(2,true);new Thread(()->{try {semaphore.acquire();System.out.println("线程A获取通行证成功!");TimeUnit.SECONDS.sleep(10);}catch (Exception e){}finally {semaphore.release();}}).start();TimeUnit.MICROSECONDS.sleep(200);new Thread(()->{try {semaphore.acquire();System.out.println("线程B获取通行证成功!");TimeUnit.SECONDS.sleep(10);}catch (Exception e){}finally {semaphore.release();}}).start();TimeUnit.MILLISECONDS.sleep(200);new Thread(() ->{try {semaphore.acquire();System.out.println("线程C获取通行证成功!");} catch (InterruptedException e) {}finally {semaphore.release();}}).start();while (true){}}
public class Pool {/** 可同时访问资源的最大线程数*/private static final int MAX_AVAILABLE = 100;/** 信号量 表示:可获取的对象通行证*/private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);/** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/protected Object[] items = new Object[MAX_AVAILABLE];/** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/protected boolean[] used = new boolean[MAX_AVAILABLE];/*** 获取一个空闲对象* 如果当前池中无空闲对象,则等待..直到有空闲对象为止*/public Object getItem() throws InterruptedException {available.acquire();return getNextAvailableItem();}/*** 归还对象到池中*/public void putItem(Object x) {if (markAsUnused(x))available.release();}/*** 获取池内一个空闲对象,获取成功则返回Object,失败返回Null* 成功后将对应的 used[i] = true*/private synchronized Object getNextAvailableItem() {for (int i = 0; i < MAX_AVAILABLE; ++i) {if (!used[i]) {used[i] = true;return items[i];}}return null;}/*** 归还对象到池中,归还成功返回true* 归还失败:* 1.池中不存在该对象引用,返回false* 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false*/private synchronized boolean markAsUnused(Object item) {for (int i = 0; i < MAX_AVAILABLE; ++i) {if (item == items[i]) {if (used[i]) {used[i] = false;return true;} elsereturn false;}}return false;}}
二,源码
1.构造器
//默认是非公平锁public Semaphore(int permits) {sync = new NonfairSync(permits);}//可以指定为公平锁的构造器public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}//=======================//Sync(int permits) {setState(permits);}
2.acquire
public void acquire() throws InterruptedException {//去抢占锁,默认传1,如果想要传其他参数,可以手动指定sync.acquireSharedInterruptibly(1);}
3.AQS.acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//如果当前线程已经被中断 抛出中断异常if (Thread.interrupted())throw new InterruptedException();//小于0 的情况有两种://锁没有被持有 或者 持有锁的线程不是当前线程//当前剩下的证书不足以支持当前线程本次获取if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
4.Semaphore.FairSync.tryAcquireShared
protected int tryAcquireShared(int acquires) {for (;;) {// 当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点if (hasQueuedPredecessors())//返回-1return -1;//获取当前最新的state值int available = getState();//用state-传入的值int remaining = available - acquires;//条件1成立 :当前剩下的证书不足以支持当前线程获取//条件2成立:前置条件:当前剩下的证书足够支持当前线程持有。//如果cas去设置新的state成功,返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
5.AQS.hasQueuedPredecessors
判断当前AQS阻塞队列里面是否有等待的线程
public final boolean hasQueuedPredecessors() {Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;/*1.h!=t:头节点不等于尾结点,说明当前队列还有元素2.头节点的下一个节点是空 || 头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点总结一下:如果返回true:当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点*/return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
6.AQS.doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将当前线程构建成一个共享节点入队final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前节点的前置节点final Node p = node.predecessor();//如果前置节点是头节点if (p == head) {//尝试去获取锁int r = tryAcquireShared(arg);//大于0说明成功拿到了锁if (r >= 0) {//设置头节点并向后传播唤醒setHeadAndPropagate(node, r);//原头节点出队p.next = null; // help GCfailed = false;return;}}//如果当前节点的前驱节点不是头节点,给当前线程找一个好爸爸if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
7.AQS.setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below//设置node为头节点setHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果当前节点的下一个节点为空 || 当前节点的下一个节点不为空且当前节点的下一个节点是共享节点if (s == null || s.isShared())//执行向后唤醒逻辑doReleaseShared();}}
8.release
public void release() {sync.releaseShared(1);}
9.releaseShared
public final boolean releaseShared(int arg) {//如果尝试释放锁成功if (tryReleaseShared(arg)) {//执行向后唤醒逻辑doReleaseShared();return true;}return false;}
10.tryReleaseShared
protected final boolean tryReleaseShared(int releases) {for (;;) {//获取当前最新的stateint current = getState();//将当前的state 和释放的许可证个数相加int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//cas成功 返回trueif (compareAndSetState(current, next))return true;}}
11.doReleaseShared
private void doReleaseShared() {for (;;) {//获取头节点Node h = head;//头节点不为空 && 头节点不是尾结点if (h != null && h != tail) {int ws = h.waitStatus;//如果头结点的等待状态 是 -1if (ws == Node.SIGNAL) {//如果cas 设置头节点 -1 , 0 失败//为啥会失败?其他线程获取到锁以后执行向后唤醒逻辑了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 唤醒节点的线程unparkSuccessor(h);}//如果等待状态 ==0 且cas 设置 头节点的等待状态为向后传播失败else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}//如果头节点没变 也就是说 ,唤醒的后面的节点 还没来得及将自己设置为头节点 ,跳出循环 。//为什么可以直接跳出?不怕向后唤醒中断么? 不怕 ,首先 ,极端情况已经都判断完了if (h == head) // loop if head changedbreak;}}

