一,使用

  1. @Test
  2. public void test3()throws InterruptedException{
  3. final Semaphore semaphore = new Semaphore(2,true);
  4. new Thread(()->{
  5. try {
  6. semaphore.acquire();
  7. System.out.println("线程A获取通行证成功!");
  8. TimeUnit.SECONDS.sleep(10);
  9. }catch (Exception e){
  10. }finally {
  11. semaphore.release();
  12. }
  13. }).start();
  14. TimeUnit.MICROSECONDS.sleep(200);
  15. new Thread(()->{
  16. try {
  17. semaphore.acquire();
  18. System.out.println("线程B获取通行证成功!");
  19. TimeUnit.SECONDS.sleep(10);
  20. }catch (Exception e){
  21. }finally {
  22. semaphore.release();
  23. }
  24. }).start();
  25. TimeUnit.MILLISECONDS.sleep(200);
  26. new Thread(() ->{
  27. try {
  28. semaphore.acquire();
  29. System.out.println("线程C获取通行证成功!");
  30. } catch (InterruptedException e) {
  31. }finally {
  32. semaphore.release();
  33. }
  34. }).start();
  35. while (true){}
  36. }
  1. public class Pool {
  2. /** 可同时访问资源的最大线程数*/
  3. private static final int MAX_AVAILABLE = 100;
  4. /** 信号量 表示:可获取的对象通行证*/
  5. private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
  6. /** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/
  7. protected Object[] items = new Object[MAX_AVAILABLE];
  8. /** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/
  9. protected boolean[] used = new boolean[MAX_AVAILABLE];
  10. /**
  11. * 获取一个空闲对象
  12. * 如果当前池中无空闲对象,则等待..直到有空闲对象为止
  13. */
  14. public Object getItem() throws InterruptedException {
  15. available.acquire();
  16. return getNextAvailableItem();
  17. }
  18. /**
  19. * 归还对象到池中
  20. */
  21. public void putItem(Object x) {
  22. if (markAsUnused(x))
  23. available.release();
  24. }
  25. /**
  26. * 获取池内一个空闲对象,获取成功则返回Object,失败返回Null
  27. * 成功后将对应的 used[i] = true
  28. */
  29. private synchronized Object getNextAvailableItem() {
  30. for (int i = 0; i < MAX_AVAILABLE; ++i) {
  31. if (!used[i]) {
  32. used[i] = true;
  33. return items[i];
  34. }
  35. }
  36. return null;
  37. }
  38. /**
  39. * 归还对象到池中,归还成功返回true
  40. * 归还失败:
  41. * 1.池中不存在该对象引用,返回false
  42. * 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false
  43. */
  44. private synchronized boolean markAsUnused(Object item) {
  45. for (int i = 0; i < MAX_AVAILABLE; ++i) {
  46. if (item == items[i]) {
  47. if (used[i]) {
  48. used[i] = false;
  49. return true;
  50. } else
  51. return false;
  52. }
  53. }
  54. return false;
  55. }
  56. }

二,源码

semaphore.jpg

1.构造器

  1. //默认是非公平锁
  2. public Semaphore(int permits) {
  3. sync = new NonfairSync(permits);
  4. }
  5. //可以指定为公平锁的构造器
  6. public Semaphore(int permits, boolean fair) {
  7. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  8. }
  9. //=======================//
  10. Sync(int permits) {
  11. setState(permits);
  12. }

2.acquire

  1. public void acquire() throws InterruptedException {
  2. //去抢占锁,默认传1,如果想要传其他参数,可以手动指定
  3. sync.acquireSharedInterruptibly(1);
  4. }

3.AQS.acquireSharedInterruptibly

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //如果当前线程已经被中断 抛出中断异常
  4. if (Thread.interrupted())
  5. throw new InterruptedException();
  6. //小于0 的情况有两种:
  7. //锁没有被持有 或者 持有锁的线程不是当前线程
  8. //当前剩下的证书不足以支持当前线程本次获取
  9. if (tryAcquireShared(arg) < 0)
  10. doAcquireSharedInterruptibly(arg);
  11. }

4.Semaphore.FairSync.tryAcquireShared

  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. // 当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
  4. if (hasQueuedPredecessors())
  5. //返回-1
  6. return -1;
  7. //获取当前最新的state值
  8. int available = getState();
  9. //用state-传入的值
  10. int remaining = available - acquires;
  11. //条件1成立 :当前剩下的证书不足以支持当前线程获取
  12. //条件2成立:前置条件:当前剩下的证书足够支持当前线程持有。
  13. //如果cas去设置新的state成功,返回
  14. if (remaining < 0 ||
  15. compareAndSetState(available, remaining))
  16. return remaining;
  17. }
  18. }

5.AQS.hasQueuedPredecessors

判断当前AQS阻塞队列里面是否有等待的线程

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail; // Read fields in reverse initialization order
  3. Node h = head;
  4. Node s;
  5. /*
  6. 1.h!=t:头节点不等于尾结点,说明当前队列还有元素
  7. 2.头节点的下一个节点是空 || 头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
  8. 总结一下:如果返回true:
  9. 当前队列还有元素,头节点的下一个节点不是空节点&&当前线程的节点不是头节点的下一个节点
  10. */
  11. return h != t &&
  12. ((s = h.next) == null || s.thread != Thread.currentThread());
  13. }

6.AQS.doAcquireSharedInterruptibly

  1. private void doAcquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. //将当前线程构建成一个共享节点入队
  4. final Node node = addWaiter(Node.SHARED);
  5. boolean failed = true;
  6. try {
  7. for (;;) {
  8. //获取当前节点的前置节点
  9. final Node p = node.predecessor();
  10. //如果前置节点是头节点
  11. if (p == head) {
  12. //尝试去获取锁
  13. int r = tryAcquireShared(arg);
  14. //大于0说明成功拿到了锁
  15. if (r >= 0) {
  16. //设置头节点并向后传播唤醒
  17. setHeadAndPropagate(node, r);
  18. //原头节点出队
  19. p.next = null; // help GC
  20. failed = false;
  21. return;
  22. }
  23. }
  24. //如果当前节点的前驱节点不是头节点,给当前线程找一个好爸爸
  25. if (shouldParkAfterFailedAcquire(p, node) &&
  26. parkAndCheckInterrupt())
  27. throw new InterruptedException();
  28. }
  29. } finally {
  30. if (failed)
  31. cancelAcquire(node);
  32. }
  33. }

7.AQS.setHeadAndPropagate

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. //设置node为头节点
  4. setHead(node);
  5. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  6. (h = head) == null || h.waitStatus < 0) {
  7. Node s = node.next;
  8. //如果当前节点的下一个节点为空 || 当前节点的下一个节点不为空且当前节点的下一个节点是共享节点
  9. if (s == null || s.isShared())
  10. //执行向后唤醒逻辑
  11. doReleaseShared();
  12. }
  13. }

8.release

  1. public void release() {
  2. sync.releaseShared(1);
  3. }

9.releaseShared

  1. public final boolean releaseShared(int arg) {
  2. //如果尝试释放锁成功
  3. if (tryReleaseShared(arg)) {
  4. //执行向后唤醒逻辑
  5. doReleaseShared();
  6. return true;
  7. }
  8. return false;
  9. }

10.tryReleaseShared

  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. //获取当前最新的state
  4. int current = getState();
  5. //将当前的state 和释放的许可证个数相加
  6. int next = current + releases;
  7. if (next < current) // overflow
  8. throw new Error("Maximum permit count exceeded");
  9. //cas成功 返回true
  10. if (compareAndSetState(current, next))
  11. return true;
  12. }
  13. }

11.doReleaseShared

  1. private void doReleaseShared() {
  2. for (;;) {
  3. //获取头节点
  4. Node h = head;
  5. //头节点不为空 && 头节点不是尾结点
  6. if (h != null && h != tail) {
  7. int ws = h.waitStatus;
  8. //如果头结点的等待状态 是 -1
  9. if (ws == Node.SIGNAL) {
  10. //如果cas 设置头节点 -1 , 0 失败
  11. //为啥会失败?其他线程获取到锁以后执行向后唤醒逻辑了
  12. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  13. continue;
  14. // 唤醒节点的线程
  15. unparkSuccessor(h);
  16. }
  17. //如果等待状态 ==0 且cas 设置 头节点的等待状态为向后传播失败
  18. else if (ws == 0 &&
  19. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  20. continue; // loop on failed CAS
  21. }
  22. //如果头节点没变 也就是说 ,唤醒的后面的节点 还没来得及将自己设置为头节点 ,跳出循环 。
  23. //为什么可以直接跳出?不怕向后唤醒中断么? 不怕 ,首先 ,极端情况已经都判断完了
  24. if (h == head) // loop if head changed
  25. break;
  26. }
  27. }

semaphore流程.jpg