信号量的作用在于限制应用中某个部分的并发量,控制流量。
Semaphore的构造方法里会传入一个int类型的参数

  1. /**
  2. * Creates a {@code Semaphore} with the given number of
  3. * permits and nonfair fairness setting.
  4. *
  5. * @param permits the initial number of permits available.
  6. * This value may be negative, in which case releases
  7. * must occur before any acquires will be granted.
  8. */
  9. public Semaphore(int permits) {
  10. sync = new NonfairSync(permits);
  11. }

从注释可以看出,permits参数可以为负数,这种情况下释放资源(releases)必须在申请资源(acquires)之前调用。

  1. /**
  2. * 从信号量中获得一个许可,线程阻塞直到获取到许可,或者线程被关闭
  3. * 如果有一个许可是可用的,就获取到该许可,并使可用许可数目减少1
  4. */
  5. public void acquire() throws InterruptedException {
  6. sync.acquireSharedInterruptibly(1);
  7. }
  8. /**
  9. * 释放一个许可,可用许可数量增1
  10. */
  11. public void release() {
  12. sync.releaseShared(1);
  13. }

releases()方法和acquires()方法分别执行释放许可和申请许可的操作

使用

example 1:

  1. public static void main(String[] args) throws InterruptedException {
  2. Semaphore semaphore = new Semaphore(3);
  3. ExecutorService executorService=Executors.newCachedThreadPool();
  4. for (int i=0;i<20;i++){
  5. executorService.execute(()->{
  6. try {
  7. semaphore.acquire();
  8. System.out.println(Thread.currentThread().getName()+" 进入!");
  9. Thread.sleep(2000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }finally {
  13. semaphore.release();
  14. System.out.println(Thread.currentThread().getName()+" 释放!");
  15. }
  16. });
  17. }
  18. executorService.shutdown();
  19. }

20个线程都在请求信号量,但信号量只允许有3个线程进入,所以只有获得信号量的线程结束释放信号量后才能有新的线程进入。如果我们不释放信号呢,就只能打印出3句log了。

  1. pool-1-thread-1 进入!
  2. pool-1-thread-5 进入!
  3. pool-1-thread-2 进入!

example 2:

  1. ExecutorService executor = Executors.newFixedThreadPool(10);
  2. Semaphore semaphore = new Semaphore(5);
  3. Runnable longRunningTask = () -> {
  4. boolean permit = false;
  5. try {
  6. permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
  7. if (permit) {
  8. System.out.println("Semaphore acquired");
  9. sleep(5);
  10. } else {
  11. System.out.println("Could not acquire semaphore");
  12. }
  13. } catch (InterruptedException e) {
  14. throw new IllegalStateException(e);
  15. } finally {
  16. if (permit) {
  17. semaphore.release();
  18. }
  19. }
  20. }
  21. IntStream.range(0, 10)
  22. .forEach(i -> executor.submit(longRunningTask));
  23. stop(executor);

执行器可能并发执行10个任务,但是我们设置信号量的大小为5,因此并发数被限制为5。切记要使用try/finally 代码块以释放信号量。
执行上述代码获得以下输出:

  1. Semaphore acquired
  2. Semaphore acquired
  3. Semaphore acquired
  4. Semaphore acquired
  5. Semaphore acquired
  6. Could not acquire semaphore
  7. Could not acquire semaphore
  8. Could not acquire semaphore
  9. Could not acquire semaphore
  10. Could not acquire semaphore

example 3:

  1. Semaphore semaphore = new Semaphore(1);
  2. semaphore.acquire();
  3. try {
  4. ......
  5. }
  6. finally {
  7. semaphore.release();
  8. }

如果将信号量的初始大小设置为1,那么它就是一个不可重入的互斥锁。