Stampedlock

在之前的读写锁中读-读并发时底层还是每次都用cas去修改state中属于读锁的高16位的状态,它的性能还是比不上不加锁时的性能,为了进一步优化读锁,将读锁的性能优化到极致,JDK8加入Stampedlock类。
Stampedlock类它的特点是在使用读锁、写锁时都必须配合【戳】使用加解读锁。
这个戳是什么呢?举个例子,Stampedlock类加解锁时涉及的语句如下

  1. //加解读锁
  2. long stamp = lock. readLock();
  3. lock.unlockRead(stamp) ;
  4. //加解写锁
  5. long stamp = lock.writeLock();
  6. lock. unlockwrite( stamp ) ;

由上面的语句可以知道,调用Stampedlock对象锁readLock方法去获取锁的时候,会返回一个Long型的数据,这个数据就是【戳】; 这样解锁的时候就必须拿着这个【戳】才有可能解锁成功。
其实这个戳并不是提高性能的关键,但它却是性能提升的保证。
真正能提高性能的是乐观读操作,StampedLock支持tryoptimisticRead()方法(尝试一次乐观读),读取完毕后需要做一次戳校验;如果校验通过,表示这期间确实没有写操作,数据可以安全使用;如果校验没通过,需要重新获取读锁,保证数据安全。
image.png
值得注意的是tryoptimisticRead方法内部并没有加任何的锁,它仅仅是返回一个【戳】而已。但是在真正进行读取操作之前需要做一次检验【戳】的操作(在获取乐观读方法的戳到真正读取之间,可能会有其他的写线程对数据做出变化)。
检验【戳】的方法是validate,若果在检验期间没有其他线程的干扰,则检验【戳】无异常,返回一个true,该if条件就不会被执行,反之若其他线程将【戳】进行更新,检验【戳】就会失败,进入if内部进行锁升级,从乐观读锁升级到读锁。

Stampedlock缺点

1、不支持条件变量
2、不支持可重入

Semaphore

  1. https://blog.csdn.net/javazejian/article/details/76167357?

概述

semaphore是信号量:用来限制能同时访问共享资源的线程上限。它跟之前学习的ReentrantLock等独占锁是由区别的,独占锁同一时刻只允许有一个线程来访问。而semaphore的使用场景是共享资源有多个,而且允许有多个线程同时来访问,但是对同时访问的线程数量进行限制;
有些文章会将其跟红绿灯进行相比喻,我们这里将其比喻为停车场,在停车场中有多个停车位(多个共享资源),汽车就相当于需要停车位的线程,但是车位是有限的,不能让无限的车辆进入停车场,所以在汽车进入停车场之前有一个告示牌告知剩余车位还有多少。每进入一辆汽车告示牌上的数字就减一,当其变为0的时候就不允许汽车进入了。

演示

  1. public class SemaphoreTest {
  2. public static void main(String[] args) {
  3. //1、创建对象,这个参数表示同时允许的线程数量
  4. Semaphore semaphore = new Semaphore(3);
  5. //2、开启10个线程,每个线程执行3秒
  6. for (int i = 0; i < 10; i++) {
  7. new Thread(()->{
  8. try {
  9. //获取许可,开车到停车位
  10. semaphore.acquire();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. try {
  15. System.out.println("running....");
  16. Thread.sleep(3000);
  17. System.out.println("end ......");
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }finally {
  21. //释放许可,将车开走
  22. semaphore.release();
  23. }
  24. }).start();
  25. }
  26. }
  27. }

从运行的效果来看,每次同时执行的线程数量是3个,而若果不是使用semaphore信号量的方式去开启线程的话,那么这十个线程肯定是同时开始同时结束(几乎)。

应用

1、使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量(没有考虑分布式),并且仅是限制线程的数量,而不是限制资源数(例如Tomcat中限制连接数量,请对比Tomcat LimitLatch的实现,跟Semaphore不一样) 不要将线程数量跟资源数据混淆!!

2、当我们的资源数跟线程数量一样的时候用Semaphore去限流就比较好一点。比如说数据库连接池,一个线程对应一个连接,用Semaphore实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的

原理

1、加锁解锁流程

Semaphore有点像一个停车场,permits 就好像停车位数量,当线程获得了permits 就像是获得了停车位,然后停车场显示空余车位减一
刚开始,我们假设permits (类似于state)的值为3,这时5个线程来获取资源(5辆车来争抢停车位)。
因为Semaphore最终继承了AQS,当使用Semaphore中一个参数的构造器时,其内部会将permits 传递给父类的构造参数,也就是最终这个permits 的值是赋值给了AQS中的state状态的。
image.png
假设其中 Thread-1 ,I Thread-2,Thread-4 调用acquire方法竞争,该方法内部用到了CAS操作更新state;
竞争成功了,则获得停车许可。

三个线程都获取成功之后state的值变为0,而Thread-0和Thread-3则竞争失败,进入AQS队列 park阻塞。
也就是说当Thread-0和Thread-3尝试获取锁的时候,由于将state预减之后state的值小于0,此时根本不会使用CAS操作更新state的值;即tryAcquireShared方法直接返回-1表示失败了。然后执行doAcquireSharedInterruptibly方法。
该方法首先创建节点并插入等待队列,然后查看该节点的前驱节点是否,也就是查看它是不是老二节点,是老二节点才有资格去尝试获取锁许可。这个时候还不能获取成功的话就会将其阻塞。
image.png

相关源码
  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. // 该方法如果返回负数表示获取锁失败,大于等于0表示获取锁成功,且返回值还表示剩余资源数
  9. if (tryAcquireShared(arg) < 0)
  10. doAcquireSharedInterruptibly(arg);
  11. }
  12. //该方法的流程大概跟分析ReentrantLock、读写锁时差不多。不再赘述。
  13. private void doAcquireSharedInterruptibly(int arg)
  14. throws InterruptedException {
  15. //创建节点
  16. final Node node = addWaiter(Node.SHARED);
  17. boolean failed = true;
  18. try {
  19. for (;;) {
  20. final Node p = node.predecessor();
  21. if (p == head) {
  22. int r = tryAcquireShared(arg);
  23. if (r >= 0) {
  24. setHeadAndPropagate(node, r);
  25. p.next = null; // help GC
  26. failed = false;
  27. return;
  28. }
  29. }
  30. if (shouldParkAfterFailedAcquire(p, node) &&
  31. parkAndCheckInterrupt())
  32. throw new InterruptedException();
  33. }
  34. } finally {
  35. if (failed)
  36. cancelAcquire(node);
  37. }
  38. }

2、解锁流程

假如这时Thread-4执行完代码释放了一个permits,状态如下
image.png
接下来Thread-0竞争成功,permits再次设置为0,设置自己为head节点,断开原来的head节点,unpark唤醒接下来的Thread-3节点,但由于permits是0,因此Thread-3在尝试不成功后再次进入park 状态。

相关源码
  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected final boolean tryReleaseShared(int releases) {
  12. for (;;) {
  13. int current = getState();
  14. int next = current + releases;
  15. if (next < current) // overflow
  16. throw new Error("Maximum permit count exceeded");
  17. //试图用CAS更新state
  18. if (compareAndSetState(current, next))
  19. return true;
  20. }
  21. }
  22. private void doReleaseShared() {
  23. for (;;) {
  24. Node h = head;
  25. if (h != null && h != tail) {
  26. int ws = h.waitStatus;
  27. if (ws == Node.SIGNAL) {
  28. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  29. continue; // loop to recheck cases
  30. unparkSuccessor(h);
  31. }
  32. else if (ws == 0 &&
  33. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  34. continue; // loop on failed CAS
  35. }
  36. if (h == head) // loop if head changed
  37. break;
  38. }
  39. }

Countdownlatch(倒计时锁)

用来进行线程同步协作,让某个线程等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,某个线程调用await()用来等待计数归零,当其他线程执行完任务之后就会调用countDown()方法用来让计数减一。
这个Countdownlatch的实现方式很简单,其内部同样是维护了一个继承自AQS的同步器,跟其他同步器不一样的地方在于,该类的tryAcquireShared方法 规定线程是否获取锁的条件是state是否是0,state大于0时就会被阻塞住。

用法演示

  1. public class CountDownLatchDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch countDownLatch= new CountDownLatch(3);
  4. new Thread(()->{
  5. System.out.println("开始...");
  6. mySleep(1);
  7. countDownLatch.countDown();
  8. System.out.println("结束...");
  9. }).start();
  10. new Thread(()->{
  11. System.out.println("开始...");
  12. mySleep(3);
  13. countDownLatch.countDown();
  14. System.out.println("结束...");
  15. }).start();
  16. new Thread(()->{
  17. System.out.println("开始...");
  18. mySleep(2);
  19. countDownLatch.countDown();
  20. System.out.println("结束...");
  21. }).start();
  22. System.out.println("正在等待......");
  23. //一般都是主线程等待其他线程执行完返回结果,再恢复运行
  24. countDownLatch.await();
  25. System.out.println("主线程结束....");
  26. }
  27. public static void mySleep(int time){
  28. try {
  29. Thread.sleep(1000*time);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }

这里就可以看到主线程需要等待三个线程将计数减为0时才能继续往下执行。

用法改进

我们之前肯定学过join方法,这个方法也可以用来等待其他线程;也就是说在上面的案例中我们同样可以在主线程中调用那三个线程的join方法等待它们运行结束即可,效果跟案例是一样的。
但是join方法是属于比较底层的API,用起来相对比较繁琐;而countDownLatch是属于高级的API,比join更适用与各种场景。虽然在案例中我们是自己创建了三个线程,但是在实际开发中我们肯定是使用线程池的方式获取线程来达到线程的重用的;这个时候你就会发现,有时候线程池中的线程是一直在运行的,不会轻易结束线程。这个时候你再调用join方法的话,就麻烦了。
而我们使用countDownLatch的话就不必需要等待线程结束,直接使其计数减1即可。

  1. public static void main(String[] args) {
  2. CountDownLatch countDownLatch= new CountDownLatch(3);
  3. //创建线程池
  4. ExecutorService service = Executors.newFixedThreadPool(4);
  5. service.submit(()->{
  6. System.out.println("begin...");
  7. //就是简化了代码,本质还是让线程睡眠
  8. mySleep(1);
  9. countDownLatch.countDown();
  10. System.out.println("end...");
  11. });
  12. service.submit(()->{
  13. System.out.println("begin...");
  14. mySleep(2);
  15. countDownLatch.countDown();
  16. System.out.println("end...");
  17. });
  18. service.submit(()->{
  19. System.out.println("begin...");
  20. mySleep(3);
  21. countDownLatch.countDown();
  22. System.out.println("end...");
  23. });
  24. service.submit(()->{
  25. try {
  26. System.out.println("等待线程正在等待....");
  27. countDownLatch.await();
  28. System.out.println("等待线程结束运行....");
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. });
  33. }

相关源码

  1. public class CountDownLatch {
  2. private static final class Sync extends AbstractQueuedSynchronizer {
  3. Sync(int count) {
  4. setState(count);
  5. }
  6. int getCount() {
  7. return getState();
  8. }
  9. protected int tryAcquireShared(int acquires) {
  10. return (getState() == 0) ? 1 : -1;
  11. }
  12. //其他线程调用release方法的时候就会调用该方法
  13. protected boolean tryReleaseShared(int releases) {
  14. // Decrement count; signal when transition to zero
  15. for (;;) {
  16. int c = getState();
  17. if (c == 0)
  18. return false;
  19. //每次释放将其减1
  20. int nextc = c-1;
  21. //用CAS操作更新其状态
  22. if (compareAndSetState(c, nextc))
  23. //倘若减到0了,就会去尝试唤醒阻塞线程;没有减到0就返回false就不会唤醒阻塞线程
  24. return nextc == 0;
  25. }
  26. }
  27. }
  28. private final Sync sync;
  29. public CountDownLatch(int count) {
  30. if (count < 0) throw new IllegalArgumentException("count < 0");
  31. this.sync = new Sync(count);
  32. }
  33. public void await() throws InterruptedException {
  34. sync.acquireSharedInterruptibly(1);
  35. }
  36. public boolean await(long timeout, TimeUnit unit)
  37. throws InterruptedException {
  38. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  39. }
  40. public void countDown() {
  41. sync.releaseShared(1);
  42. }
  43. public long getCount() {
  44. return sync.getCount();
  45. }
  46. }

应用之等待远程调用结束

CyclicBarrier问题

下面这个小例子大家都能看懂,就是在2个线程执行完之后等待线程才能继续往下执行

  1. public static void main(String[] args) {
  2. CountDownLatch countDownLatch= new CountDownLatch(2);
  3. //创建线程池
  4. ExecutorService service = Executors.newFixedThreadPool(5);
  5. service.submit(()->{
  6. System.out.println("task1 begin...");
  7. mySleep(1);
  8. countDownLatch.countDown();
  9. System.out.println("task1 end...");
  10. });
  11. service.submit(()->{
  12. System.out.println("task2 begin...");
  13. mySleep(2);
  14. countDownLatch.countDown();
  15. System.out.println("task2 end...");
  16. });
  17. service.submit(()->{
  18. try {
  19. System.out.println("等待线程正在等待....");
  20. countDownLatch.await();
  21. System.out.println("等待线程结束运行....");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. });
  26. }

但是我们现在的需求是希望task1、task2反复被运行3遍;同样等待线程这边也要做三遍结果的汇总;
这个时候如果使用一个for循环完成的话,虽然也能实现功能以及效果;但是这个countDownLatch对象也被创建了三次,这里的countDownLatch对象能被重用吗?答案是不行,这个countDownLatch只能在构造方法时给它一个倒计时计数,这个计数在构造完成后就不能修改了。

要满足这种场景就需要使用到CyclicBarrier循环栅栏工具类,它跟countDownLatch非常类似,也是用来进行线程协作的,也需要等待线程满足某个计数,当计数减到0就放行。
但是它跟countDownLatch最大的区别就是它的计数是可以重置的,也就是可以修改的,这就意味着它是能够被重用的。
使用的时候构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用await()方法进行等待,当等待的线程数满足『计数个数』时,继续执行。

使用演示

用法跟countDownLatch非常相似,也可以达到重用的效果

  1. public class CyclicBarrierTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newFixedThreadPool(2);
  4. CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
  5. //第二个参数开启的线程是用来汇总其他线程的结果的,也就是等其他线程都执行完成才会执行该线程
  6. System.out.println("等待线程执行....");
  7. });
  8. for (int i = 0; i < 3; i++) {
  9. service.submit(()->{
  10. System.out.println("task1 ...");
  11. mySleep(1);
  12. try {
  13. //等待
  14. cyclicBarrier.await();
  15. System.out.println("task1 ..end.");
  16. } catch (InterruptedException |BrokenBarrierException e) {
  17. e.printStackTrace();
  18. }
  19. });
  20. service.submit(()->{
  21. System.out.println("task2 ...");
  22. mySleep(2);
  23. try {
  24. //等待
  25. cyclicBarrier.await();
  26. System.out.println("task2 ..end.");
  27. } catch (InterruptedException |BrokenBarrierException e) {
  28. e.printStackTrace();
  29. }
  30. });
  31. }
  32. service.shutdown();
  33. }
  34. public static void mySleep(int time) {
  35. try {
  36. Thread.sleep(1000 * time);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

注意,线程池的数量跟栅栏中线程数一致。