一、Semaphore

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,在操作系统中已经存在,但是JAVA中并没有,所以出现了Semaphore工具来实现,它也是基于AQS(AbstractQueuedSynchronizer)实现的。
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。
信号量可以理解为资源数量,每获取一个资源,那么资源数量减一,当所有资源被获取完,其他没有获取到资源的线程则需要排队等待,默认每个线程获取一个资源。

1、PV

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
1)S减1。
2)若S减1后仍大于或等于0,则进程继续执行。
3)若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
1)S加1。
2)若相加后结果大于0,则进程继续执行。
3)若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

2、Semaphore的用法

代码中Semaphore对象允许同时三个线程获取锁,在有线程释放锁之后,才会有一个等待线程获取锁。主要用于流控,实现了AQS中的tryAcquireShared抽象方法。

  1. // 声明3个窗口 state: 资源数
  2. Semaphore windows = new Semaphore(3);
  3. for (int i = 0; i < 5; i++) {
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. // 占用窗口 加锁
  9. windows.acquire();
  10. System.out.println(Thread.currentThread().getName() + ": 开始买票");
  11. //模拟买票流程
  12. Thread.sleep(5000);
  13. System.out.println(Thread.currentThread().getName() + ": 购票成功");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } finally {
  17. // 释放窗口
  18. windows.release();
  19. }
  20. }
  21. }).start();
  22. }

Semaphore实现了公平锁和非公平锁,是一种共享锁,可以传入参数代表资源数量,默认是非公平锁。

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) {
  5. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  6. }

在构造方法中,实例化了公平锁或者非公平锁的实现,并传入了资源数作为参数。
以非公平锁为例,在创建非公平锁时,调用了其父类Sync的构造方法,用来设置资源数(state)。

  1. NonfairSync(int permits) {
  2. super(permits);
  3. }
  4. // Sync的构造方法,把资源数赋值给state
  5. Sync(int permits) {
  6. setState(permits);
  7. }

3、Semaphore的源码实现

1)acquire方法
在Semaphore中,如果资源数量没有了,那么线程将会入队,这里和独占锁不同的是,在创建Node节点时,节点的nextWaiter属性是一个空的node对象常量。
当一个线程获取到锁时,也会再唤醒下一个节点的线程,这里会提前唤醒下一个线程。
在共享锁中,state记录的是一个资源数,每当有线程获取资源时,这个资源数会进行减操作,所以不存在重入的概念。
Semaphore-acquire.png
2)release方法
Semaphore-release.png

二、CountDownLatch

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值 (count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并 且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。
CountDownLatch是基于共享锁来实现的,根据构造方法,在创建对象的时候会传入一个int类型的参数,作为资源数,把这个资源数赋值给state属性。

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }
  5. // Sync的构造方法
  6. Sync(int count) {
  7. setState(count);
  8. }

1、CountDownLatch的用法

代码中,创建了一个CountDownLatch的对象,给了一个资源,创建五个线程,每个线程在执行过程中都会阻塞在await()方法,等主线程执行到countDown()方法时,资源会变为0,此时5个线程会被唤醒,继续执行,这种用法一般在测试并发的场景下使用。

  1. CountDownLatch countDownLatch = new CountDownLatch(1);
  2. for (int i = 0; i < 5; i++) {
  3. new Thread(() -> {
  4. try {
  5. //准备完毕……运动员都阻塞在这,等待号令
  6. countDownLatch.await();
  7. String parter = "【" + Thread.currentThread().getName() + "】";
  8. System.out.println(parter + "开始执行……");
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }).start();
  13. }
  14. Thread.sleep(2000);// 裁判准备发令
  15. countDownLatch.countDown();// 发令枪:执行发令

比较常用的用法,创建CountDownLatch对象时指定了资源数,多个线程在执行任务,每执行完一个任务就调用countDown()方法,直到最后一个线程执行完,资源数归0,主线程在await()方法处阻塞,当资源数归0时,主线程会被唤醒,继续执行,效果就是主线程会等待所有线程执行完毕之后再继续执行,类似于join()方法。调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行,和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行, countDown() 会将 count 减 1,直至为 0。

  1. CountDownLatch countDownLatch = new CountDownLatch(5);
  2. for (int i = 0; i < 5; i++) {
  3. final int index = i;
  4. new Thread(() -> {
  5. try {
  6. Thread.sleep(1000 +
  7. ThreadLocalRandom.current().nextInt(1000));
  8. System.out.println(Thread.currentThread().getName()
  9. + " finish task" + index);
  10. countDownLatch.countDown();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }).start();
  15. }
  16. // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
  17. countDownLatch.await();
  18. System.out.println("主线程:在所有任务运行完成后,进行结果汇总");

2、CountDownLatch实现原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的 count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。 而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在 await()方法中等待的线程。
1)countDown()方法执行流程
CountDownLatch-countDown.png
2)await()方法执行流程
CountDownLatch-await.png

3、CountDownLatch与Thread.join的区别

CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。而join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线 程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。
join方法必须获得线程的对象,由线程的对象调用join方法,所以如果是多个线程执行,那么每个线程都需要调用一次join方法。

三、死锁的解决方案

死锁即两个线程互相持有对方线程需要的锁资源,一直在等待对方释放,如果想要打破这种情况,就需要其中一个线程暂时让出锁资源,让其他的线程获取锁先执行。

  1. private static String a = "a";
  2. private static String b = "b";
  3. public static void main(String[] args) {
  4. Thread threadA = new Thread(()->{
  5. synchronized (a) {
  6. log.debug("threadA进入a同步块,执行中...");
  7. try {
  8. // 条件队列作用: 打破死锁的循环
  9. a.wait(5000);
  10. synchronized (b) {
  11. log.debug("threadA进入b同步块,执行中...");
  12. }
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. },"threadA");
  18. Thread threadB = new Thread(()->{
  19. synchronized (b) {
  20. log.debug("threadB进入b同步块,执行中...");
  21. try {
  22. Thread.sleep(2000);
  23. synchronized (a) {
  24. log.debug("threadB进入a同步块,执行中...");
  25. }
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. },"threadB");
  31. threadA.start();
  32. threadB.start();
  33. }

代码中,线程A在获取锁对象b之前,先调用wait方法,释放了锁资源,进入了条件队列,此时线程B可以获取锁对象继续执行。
一般需要打破死锁,都需要在代码的设计上做修改,避免锁的争抢,可以让一些线程暂时不去争抢锁,等其他线程执行完毕后,再去获取锁资源。

  1. @Data
  2. @AllArgsConstructor
  3. public class Chopstick {
  4. int number;
  5. @Override
  6. public String toString() {
  7. return "筷子{" + number + '}';
  8. }
  9. }
  1. @Slf4j
  2. public class Philosopher extends Thread {
  3. private Chopstick left;
  4. private Chopstick right;
  5. public Philosopher(String name, Chopstick left, Chopstick right) {
  6. super(name);
  7. this.left = left;
  8. this.right = right;
  9. }
  10. public void eat() {
  11. log.debug("eating...");
  12. try {
  13. Thread.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. public void think() {
  19. log.debug("thinking...");
  20. try {
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. @Override
  27. public void run() {
  28. while (true) {
  29. // 获得左手筷子
  30. synchronized (left) {
  31. log.debug("获得左手筷子" + left.getNumber());
  32. // 获得右手筷子
  33. synchronized (right) {
  34. log.debug("获得右手筷子" + right.getNumber());
  35. // 吃饭
  36. eat();
  37. }
  38. // 放下右手筷子
  39. }
  40. // 放下左手筷子
  41. log.debug("吃完了,把筷子放回了原处,开始thinking");
  42. think();
  43. }
  44. }
  45. }
  1. public static void main(String[] args) {
  2. //初始化五根筷子
  3. Chopstick c1 = new Chopstick(1);
  4. Chopstick c2 = new Chopstick(2);
  5. Chopstick c3 = new Chopstick(3);
  6. Chopstick c4 = new Chopstick(4);
  7. Chopstick c5 = new Chopstick(5);
  8. // 思考: 如何打破循环
  9. new Philosopher("苏格拉底", c1, c2).start();
  10. new Philosopher("柏拉图", c2, c3).start();
  11. new Philosopher("亚里士多德", c3, c4).start();
  12. new Philosopher("赫拉克利特", c4, c5).start();
  13. new Philosopher("阿基米德", c5,c1).start();
  14. // 暂时放弃优先争抢C5锁对象,让给其他线程,打破死锁
  15. // new Philosopher("阿基米德", c1,c5).start();
  16. }

代码中,5个线程都需要获取到两个锁对象才可以执行逻辑,但是依次获取会出现死锁,即需要的第二个锁对象已经被其他线程获取,这样会导致一直再等待,此时需要某个线程放弃优先争抢别的线程需要的锁对象。
“苏格拉底”在获取C1锁对象后,需要获取C2,但是此时C2已经被“柏拉图”获取,同理,“柏拉图”需要再获取到C3,而C3已经被“亚里士多德”获取,此时就造成了死锁。
打破死锁,例如让“阿基米德”优先获取C1锁对象,这样“赫拉克利特”在获取C5时不会出现被其他线程再用的情况,从而可以在执行完逻辑后,顺利释放C4,让“亚里士多德”获取到C4,从而打破了死锁的情况。