一、介绍
1.1 简介
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
1.2 Semaphore API
//返回此信号量中当前可用的许可证数。
public int availablePermits();
//返回正在等待获取许可证的线程数。
public final int getQueueLength();
//是否有线程正在等待获取许可证。
public final boolean hasQueuedThreads();
//获取并返回所有立即可用的许可证。
public int drainPermits();
//减少reduction个许可证,是个protected方法。
protected void reducePermits(int reduction)
//返回所有等待获取许可证的线程集合
protected Collection<Thread> getQueuedThreads() ;
//是否公平锁
public boolean isFair();
二、示例
模拟30个线程读取数据保存到数据库中,而数据库的连接数只有10个,这是需要限制获取数据库连接数最多只能10;不然获取不到数据库连接异常。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
int a=i;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data" + a);
TimeUnit.SECONDS.sleep(1);
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
三、源码分析
3.1 UML 图示
从UML 图示可以看到,Semaphore 内部采用AQS 实现了公平锁和非公平锁
3.2 Semaphore的构造方法
//默认初始化非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3.3 acquire 方法
从Semaphore中获取”许可证“,阻塞直到有一个”许可证“可用 或者 线程被打断。
获取许可证,如果有许可证可用, 线程会立即返回执行,并减少许可证的数量;
如果没有许可证可用,那么会阻塞线程的执行,直到:
- 有其他线程释放有许可证, 当前线程是下一个分配许可证
- 有其他线程打断当前线程
实际上是调用acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断线程是否被打断
if (Thread.interrupted())
throw new InterruptedException();
//获取共享锁
if (tryAcquireShared(arg) < 0)
//失败后,进入等待队列
doAcquireSharedInterruptibly(arg);
}
非公平锁 获取共享锁实现nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
//自旋
for (;;) {
//获取”许可证“数量
int available = getState();
//剩余”许可证“数量
int remaining = available - acquires;
//如果”许可证“数量少于0, 则阻塞线程
if (remaining < 0 ||
//否则,cas 更新可用的”许可证“数量
compareAndSetState(available, remaining))
return remaining;
}
}
公平锁 获取共享锁实现
protected int tryAcquireShared(int acquires) {
//自旋
for (;;) {
//当前是否有线程在等待队列中,等待获取许可证
if (hasQueuedPredecessors())
return -1;
//获取”许可证“数量
int available = getState();
//剩余”许可证“数量
int remaining = available - acquires;
//如果”许可证“数量少于0, 则阻塞线程
if (remaining < 0 ||
//否则,cas 更新可用的许可证数量
compareAndSetState(available, remaining))
return remaining;
}
}
3.4 release 方法
实际上是调用releaseShared 方法
public final boolean releaseShared(int arg) {
//尝试释放共享锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared 方法逻辑
protected final boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
//获取”许可证“数量
int current = getState();
//回收后的数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//cas 更新”许可证“数量
if (compareAndSetState(current, next))
return true;
}
}
参考
- 《Java并发编程的艺术》