一,使用
@Test
public void test3()throws InterruptedException{
final Semaphore semaphore = new Semaphore(2,true);
new Thread(()->{
try {
semaphore.acquire();
System.out.println("线程A获取通行证成功!");
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
}finally {
semaphore.release();
}
}).start();
TimeUnit.MICROSECONDS.sleep(200);
new Thread(()->{
try {
semaphore.acquire();
System.out.println("线程B获取通行证成功!");
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
}finally {
semaphore.release();
}
}).start();
TimeUnit.MILLISECONDS.sleep(200);
new Thread(() ->{
try {
semaphore.acquire();
System.out.println("线程C获取通行证成功!");
} catch (InterruptedException e) {
}finally {
semaphore.release();
}
}).start();
while (true){}
}
public class Pool {
/** 可同时访问资源的最大线程数*/
private static final int MAX_AVAILABLE = 100;
/** 信号量 表示:可获取的对象通行证*/
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/
protected Object[] items = new Object[MAX_AVAILABLE];
/** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/
protected boolean[] used = new boolean[MAX_AVAILABLE];
/**
* 获取一个空闲对象
* 如果当前池中无空闲对象,则等待..直到有空闲对象为止
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* 归还对象到池中
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
/**
* 获取池内一个空闲对象,获取成功则返回Object,失败返回Null
* 成功后将对应的 used[i] = true
*/
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
/**
* 归还对象到池中,归还成功返回true
* 归还失败:
* 1.池中不存在该对象引用,返回false
* 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false
*/
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
二,源码
1.构造器
//默认是非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//可以指定为公平锁的构造器
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//=======================//
Sync(int permits) {
setState(permits);
}
2.acquire
public void acquire() throws InterruptedException {
//去抢占锁,默认传1,如果想要传其他参数,可以手动指定
sync.acquireSharedInterruptibly(1);
}
3.AQS.acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果当前线程已经被中断 抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//小于0 的情况有两种:
//锁没有被持有 或者 持有锁的线程不是当前线程
//当前剩下的证书不足以支持当前线程本次获取
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
4.Semaphore.FairSync.tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
// 当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
if (hasQueuedPredecessors())
//返回-1
return -1;
//获取当前最新的state值
int available = getState();
//用state-传入的值
int remaining = available - acquires;
//条件1成立 :当前剩下的证书不足以支持当前线程获取
//条件2成立:前置条件:当前剩下的证书足够支持当前线程持有。
//如果cas去设置新的state成功,返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
5.AQS.hasQueuedPredecessors
判断当前AQS阻塞队列里面是否有等待的线程
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
/*
1.h!=t:头节点不等于尾结点,说明当前队列还有元素
2.头节点的下一个节点是空 || 头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
总结一下:如果返回true:
当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
*/
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
6.AQS.doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程构建成一个共享节点入队
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是头节点
if (p == head) {
//尝试去获取锁
int r = tryAcquireShared(arg);
//大于0说明成功拿到了锁
if (r >= 0) {
//设置头节点并向后传播唤醒
setHeadAndPropagate(node, r);
//原头节点出队
p.next = null; // help GC
failed = false;
return;
}
}
//如果当前节点的前驱节点不是头节点,给当前线程找一个好爸爸
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
7.AQS.setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//设置node为头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的下一个节点为空 || 当前节点的下一个节点不为空且当前节点的下一个节点是共享节点
if (s == null || s.isShared())
//执行向后唤醒逻辑
doReleaseShared();
}
}
8.release
public void release() {
sync.releaseShared(1);
}
9.releaseShared
public final boolean releaseShared(int arg) {
//如果尝试释放锁成功
if (tryReleaseShared(arg)) {
//执行向后唤醒逻辑
doReleaseShared();
return true;
}
return false;
}
10.tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前最新的state
int current = getState();
//将当前的state 和释放的许可证个数相加
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//cas成功 返回true
if (compareAndSetState(current, next))
return true;
}
}
11.doReleaseShared
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
//头节点不为空 && 头节点不是尾结点
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果头结点的等待状态 是 -1
if (ws == Node.SIGNAL) {
//如果cas 设置头节点 -1 , 0 失败
//为啥会失败?其他线程获取到锁以后执行向后唤醒逻辑了
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒节点的线程
unparkSuccessor(h);
}
//如果等待状态 ==0 且cas 设置 头节点的等待状态为向后传播失败
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头节点没变 也就是说 ,唤醒的后面的节点 还没来得及将自己设置为头节点 ,跳出循环 。
//为什么可以直接跳出?不怕向后唤醒中断么? 不怕 ,首先 ,极端情况已经都判断完了
if (h == head) // loop if head changed
break;
}
}