Semaphore

Semaphore,俗称信号量,它的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。
image.png

常用方法

image.png
permits:表示许可证的数量(资源数)
fair:表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

  1. //常用方法:
  2. public void acquire() throws InterruptedException //表示阻塞并获取许可
  3. public boolean tryAcquire() //方法在没有许可的情况下会立即返回false,要获取许可的线程不会阻塞
  4. public void release() //表示释放许可
  5. public int availablePermits() //返回此信号量中当前可用的许可证数
  6. public final int getQueueLength() //返回正在等待获取许可证的线程数
  7. public final boolean hasQueuedThreads() //是否有线程正在等待获取许可证
  8. protected void reducePermits(int reduction) //减少 reduction 个许可证
  9. protected Collection<Thread> getQueuedThreads() //返回所有等待获取许可证的线程集合

应用场景

可以用于做流量控制,特别是公用资源有限的应用场景

  1. public class SemaphoreTest {
  2. public static void main(String[] args) {
  3. // 声明3个窗口 state: 资源数
  4. Semaphore windows = new Semaphore(3);
  5. for (int i = 0; i < 5; i++) {
  6. new Thread(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. // 占用窗口 加锁
  11. windows.acquire();
  12. System.out.println(Thread.currentThread().getName() + ": 开始买票");
  13. //模拟买票流程
  14. Thread.sleep(5000);
  15. System.out.println(Thread.currentThread().getName() + ": 购票成功");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } finally {
  19. // 释放窗口
  20. windows.release();
  21. }
  22. }
  23. }).start();
  24. }
  25. }
  26. }

源码分析

Semaphore(信号量)是基于AQS实现的,也是共享锁,大概流程是默认必须传入permits参数(表示许可证的数量),大小为n(n>0)的信号量可以实现限流的功能。当多个线程调用.acquire()方法时,就会n-1,为0时,则入队阻塞(入队逻辑与Reentrantlock一致);当其他某一线程处理完业务,调用.release()方法时,就会n+1,然后唤醒被阻塞的线程,这里需要注意的是,共享锁唤醒节点中的线程时是可以唤醒多个,这点和互斥锁不一样。
Semaphore的特性:
1.支持公平/非公平
2.共享锁
3.释放锁是可释放多个节点的线程(互斥锁只能释放一个)
未命名文件.jpg

CountDownLatch

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成 操作集。 CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值 (count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象——count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

常用方法

image.png

//常用方法
//调⽤ await() ⽅法的线程会被挂起,它会等待直到 count 值为 0 才继续执⾏
public void await() throws InterruptedException { };
//和await()类似,若等待 timeout 时⻓后,count 值还是没有变为 0,不再等待,继续执⾏
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//会将 count 减 1,直⾄为 0
public void countDown() { };

注:countDown⽅法不⽌可以每个线程使⽤⼀次,也可以每个线程使⽤多次。

应用场景

CountDownLatch⼀般⽤作多线程倒计时计数器,强制它们等待其他⼀组(CountDownLatch的初始化决定)任务执⾏完成。
CountDownLatch的两种使⽤场景:
场景1:让单个线程等待
场景2:让多个线程等待

场景1:让单个线程等待

很多时候,我们的并发任务,存在前后依赖关系,⽐如数据详情⻚需要同时调⽤多个接⼝获取数据,并发请求获取到数据后、需要进⾏结果合并;
或者多个数据操作完成后,需要数据check;
这其实都是:在多个线程(任务)完成后,进⾏汇总合并的场景。

/**
* 让单个线程等待:多个线程(任务)完成后,进⾏汇总合并
*/
@Slf4j
public class CountDownLatchTest2 {
    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 +ThreadLocalRandom.current().nextInt(1000));
                    System.out.println(Thread.currentThread().getName() +" finish task" + index);
                    countDownLatch.countDown(); //线程执行完就调用
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    // 主线程在阻塞,当计数器==0,就唤醒主线程往下执⾏。
    countDownLatch.await();
    System.out.println("主线程:在所有任务运⾏完成后,进⾏结果汇总");
    }
}

场景2:让多个线程等待

模拟并发,让并发线程一起执行。

/**
* 让多个线程等待:模拟并发,让并发线程⼀起执⾏
*/
@Slf4j
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    //准备完毕……运动员都阻塞在这,等待号令
                    countDownLatch.await();
                    String parter = "【" + Thread.currentThread().getName()+ "】";
                    log.debug(parter + "开始执⾏……");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    Thread.sleep(2000);// 裁判准备发令
    countDownLatch.countDown();// 发令枪:执⾏发令
    }
}

源码分析

底层基于AbstractQueuedSynchronizer实现,CountDownLatch构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这⼀步是由最后⼀个执⾏countdown⽅法的线程执⾏的。
⽽调⽤await()⽅法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执⾏,如果不为0,则使当前线程进⼊等待状态,直到某个线程将state属性置为0,其就会唤醒在await()⽅法中等待的线程。
未命名文件 (1).jpg
CountDownLatch与Thread.join区别
CountDownLatch和join⽅法都可以⽤于多线程同步场景,但它们使⽤起来有不同:⼀句话概括,即 CountDownLatch⽐join更灵活强⼤,但相对⽽⾔使⽤复杂。
1.CountDownLatch的作⽤就是允许⼀个或多个线程等待其他线程完成操作,看起来有点类似join()⽅法,但其提供了⽐ join() 更加灵活的API。
2.CountDownLatch可以⼿动控制在n个线程⾥调⽤n次countDown()⽅法使计数器进⾏减⼀操作,也可以在⼀个线程⾥调⽤n次执⾏减⼀操作。
3.⽽join()的实现原理是不停检查join线程是否存活,如果 join线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使⽤起来较为灵活。

CyclicBarrier

字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
image.png

常用方法

构造方法
1.参数parties表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
深入理解AQS-Semaphore&CountDownLatch&CyclicBarrie - 图7
2.用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
image.png
image.png
重要方法

//初始化一个cyclicBarrier计数器为3
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

//屏障  指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程await()时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

//循环 通过reset()方法可以进行重置
public void reset()

应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景
image.png

/**
 * 栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。
 */
public class CyclicBarrierTest {  
    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
            System.out.println("t1 t2 end");
        });
        ExecutorService service = Executors.newFixedThreadPool(2);
        for (int j = 0; j < 2; j++) {
            service.submit(() -> {
                System.out.println("start");
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("working");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            service.submit(() -> {
                System.out.println("start");
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("working");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
}

image.png

源码分析

关注点:
1.一组线程在触发屏障之前互相等待,最后一个线程到达屏障后唤醒逻辑是如何实现的?
2.栅栏循环使用是如何实现的?
3.条件队列到同步队列的转换实现逻辑。
未命名文件 (3).jpg
CyclicBarrier与CountDownLatch的区别
1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方 法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重 置计数器,并让线程们重新执行一次 2. CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、 isBroken(用来知道阻塞的线程是否被中断)等方法。
3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
4. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
5. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
6. CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现。

问题一:什么是同步和互斥?

并发编程技术就两个重要思想:同步和互斥
同步:对于一个任务,同步指的是进程或线程间协调工作,保证程序正确执行(有序);
互斥:在某一时刻,只允许一个线程执行,其他的线程需等待这一个线程执行完之后,方可执行(只能一个访问)。

问题二:什么是管程?

管程是由共享变量/资源以及一组共享变量/资源操作的过程组成

问题三:管程是怎么解决同步和互斥问题的?

管程的互斥是由编译器保证的,通过设置条件变量以及等待唤醒操作解决同步问题

问题四:管程运行原理

第一个线程/进程去访问管程,管程里面有一个条件变量,线程/进程进入管程之后发现条件不满足,则进入条件队列;另一个进程/线程进来,因为管程是同步+互斥的,会发生什么呢?
里面涉及的问题有:
1.谁唤醒谁?
2.被唤醒的进程/线程去执行,那执行唤醒操作的进程/线程去了哪里?
后面线程唤醒前面线程,前面线程执行,后面线程等待,同时进入紧急等待队列,前面线程执行完后发送signal,唤醒后面线程——Hoare模型
前面线程继续等待,后面线程执行,同时发送notify给前面线程——MESA模型

问题五:CountDownLatch的应用场景

1.让多个线程等待:模拟并发,让并发线程一起执行
2.让单个线程等待:多个线程(任务)完成后,进行汇总合并

问题六:如何自定义类加载器

自定义类加载器只需要继承 java.lang.ClassLoader 类,该类有两个核心方法,一个是loadClass(String, boolean),实现了双亲委派机制,还有一个方法是findClass,默认实现是空方法,所以我们自定义类加载器主要是重写findClass方法。

问题七:条件队列到同步队列的转换实现逻辑

调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁,结合CyclicBarrier源码理解。