StampedLock:有没有比读写锁更快的锁?

StampedLock性能比读写锁更好, 支持三种模式

  • 写锁
  • 悲观锁
  • 乐观读

写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp

  1. final StampedLock sl =
  2. new StampedLock();
  3. // 获取/释放悲观读锁示意代码
  4. long stamp = sl.readLock();
  5. try {
  6. //省略业务相关代码
  7. } finally {
  8. sl.unlockRead(stamp);
  9. }
  10. // 获取/释放写锁示意代码
  11. long stamp = sl.writeLock();
  12. try {
  13. //省略业务相关代码
  14. } finally {
  15. sl.unlockWrite(stamp);
  16. }

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
注意这里,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

Java SDK中的乐观读代码示例:

  1. class Point {
  2. private int x, y;
  3. final StampedLock sl =
  4. new StampedLock();
  5. //计算到原点的距离
  6. int distanceFromOrigin() {
  7. // 乐观读
  8. long stamp =
  9. sl.tryOptimisticRead();
  10. // 读入局部变量,
  11. // 读的过程数据可能被修改
  12. int curX = x, curY = y;
  13. //判断执行读操作期间,
  14. //是否存在写操作,如果存在,
  15. //则sl.validate返回false
  16. if (!sl.validate(stamp)){
  17. // 升级为悲观读锁
  18. stamp = sl.readLock();
  19. try {
  20. curX = x;
  21. curY = y;
  22. } finally {
  23. //释放悲观读锁
  24. sl.unlockRead(stamp);
  25. }
  26. }
  27. return Math.sqrt(
  28. curX * curX + curY * curY);
  29. }
  30. }

进一步理解乐观读

数据库里的乐观锁,查询的时候需要把 version 字段查出来,更新的时候要利用 version 字段做验证。这个 version 字段就类似于 StampedLock 里面的 stamp。

数据库的乐观锁示例:

  1. // 如果SQL执行成功并且返回的条数为1, 说明没有其他线程修改过这条数据
  2. update product_doc
  3. set version=version+1,...
  4. where id=777 and version=9

StampedLock 使用注意事项

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量
  • 如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升; 所以, 使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly() ```java

final StampedLock lock = new StampedLock(); Thread T1 = new Thread(()->{ // 获取写锁 lock.writeLock(); // 永远阻塞在此处,不释放写锁 LockSupport.park(); }); T1.start(); // 保证T1获取写锁 Thread.sleep(100); Thread T2 = new Thread(()-> //阻塞在悲观读锁 lock.readLock() ); T2.start(); // 保证T2阻塞在读锁 Thread.sleep(100); //中断线程T2 //会导致线程T2所在CPU飙升 T2.interrupt(); T2.join();

  1. <a name="yUgL0"></a>
  2. ### StampedLock使用最佳模板
  3. StampedLock 读模板:
  4. ```java
  5. final StampedLock sl =
  6. new StampedLock();
  7. // 乐观读
  8. long stamp =
  9. sl.tryOptimisticRead();
  10. // 读入方法局部变量
  11. ......
  12. // 校验stamp
  13. if (!sl.validate(stamp)){
  14. // 升级为悲观读锁
  15. stamp = sl.readLock();
  16. try {
  17. // 读入方法局部变量
  18. .....
  19. } finally {
  20. //释放悲观读锁
  21. sl.unlockRead(stamp);
  22. }
  23. }
  24. //使用方法局部变量执行业务操作
  25. ......

StampedLock 写模板:

  1. long stamp = sl.writeLock();
  2. try {
  3. // 写共享变量
  4. ......
  5. } finally {
  6. sl.unlockWrite(stamp);
  7. }

CountDownLatch和CyclicBarrier:如何让多线程步调一致?

线程的join()方法:

  • 在A线程中调用了B线程的join()方法时,表示只有当B线程执行完毕时,A线程才能继续执行。
  • 如果A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行。需要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()。
  • join方法必须在线程start方法调用之后调用才有意义. ```java

while(存在未对账订单){ // 查询未对账订单 Thread T1 = new Thread(()->{ pos = getPOrders(); }); T1.start(); // 查询派送单 Thread T2 = new Thread(()->{ dos = getDOrders(); }); T2.start(); // 等待T1、T2结束 T1.join(); T2.join(); // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); }

  1. <a name="yeavH"></a>
  2. ###
  3. <a name="5coAW"></a>
  4. ### CountDownLatch用法
  5. ```java
  6. // 创建2个线程的线程池
  7. Executor executor =
  8. Executors.newFixedThreadPool(2);
  9. while(存在未对账订单){
  10. // 计数器初始化为2
  11. CountDownLatch latch =
  12. new CountDownLatch(2);
  13. // 查询未对账订单
  14. executor.execute(()-> {
  15. pos = getPOrders();
  16. latch.countDown();
  17. });
  18. // 查询派送单
  19. executor.execute(()-> {
  20. dos = getDOrders();
  21. latch.countDown();
  22. });
  23. // 等待两个查询操作结束
  24. latch.await();
  25. // 执行对账操作
  26. diff = check(pos, dos);
  27. // 差异写入差异库
  28. save(diff);
  29. }

用 CyclicBarrier 实现线程同步

image.png
示例代码
补充说明:
1.为啥要用线程池,而不是在回调函数中直接调用?
使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
2.线程池为啥使用单线程的?
线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。

  1. // 订单队列
  2. Vector<P> pos;
  3. // 派送单队列
  4. Vector<D> dos;
  5. // 执行回调的线程池
  6. Executor executor =
  7. Executors.newFixedThreadPool(1);
  8. // 创建计数器初始值为 2 的 CyclicBarrier, 当计数器减到 0 的时候,会调用回调函数, 且自动重置到初始值2
  9. final CyclicBarrier barrier =
  10. new CyclicBarrier(2, ()->{
  11. executor.execute(()->check());
  12. });
  13. void check(){
  14. P p = pos.remove(0);
  15. D d = dos.remove(0);
  16. // 执行对账操作
  17. diff = check(p, d);
  18. // 差异写入差异库
  19. save(diff);
  20. }
  21. void checkAll(){
  22. // 循环查询订单库
  23. Thread T1 = new Thread(()->{
  24. while(存在未对账订单){
  25. // 查询订单库
  26. pos.add(getPOrders());
  27. // 等待, 计数器-1
  28. barrier.await();
  29. }
  30. });
  31. T1.start();
  32. // 循环查询运单库
  33. Thread T2 = new Thread(()->{
  34. while(存在未对账订单){
  35. // 查询运单库
  36. dos.add(getDOrders());
  37. // 等待,计数器-1
  38. barrier.await();
  39. }
  40. });
  41. T2.start();
  42. }

总结:
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。
除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

并发容器:都有哪些“坑”需要我们填?

如何将非线程安全的容器变成线程安全的容器?

只要把非线程安全的容器封装在对象内部,然后控制好访问路径就可以了。

代码示例

  1. SafeArrayList<T>{
  2. //封装ArrayList
  3. List<T> c = new ArrayList<>();
  4. //控制访问路径
  5. synchronized
  6. T get(int idx){
  7. return c.get(idx);
  8. }
  9. synchronized
  10. void add(int idx, T t) {
  11. c.add(idx, t);
  12. }
  13. synchronized
  14. boolean addIfNotExist(T t){
  15. if(!c.contains(t)) {
  16. c.add(t);
  17. return true;
  18. }
  19. return false;
  20. }
  21. }

Java SDK在Collections 这个类中还提供了一套完备的包装类

  1. List list = Collections.
  2. synchronizedList(new ArrayList());
  3. Set set = Collections.
  4. synchronizedSet(new HashSet());
  5. Map map = Collections.
  6. synchronizedMap(new HashMap());

组合操作需要注意竞态条件问题

例如上面提到的 addIfNotExist() 方法就包含组合操作。组合操作往往隐藏着竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性

用迭代器遍历容器

示例代码(存在并发问题,这些组合的操作不具备原子性。)

  1. List list = Collections.
  2. synchronizedList(new ArrayList());
  3. Iterator i = list.iterator();
  4. while (i.hasNext())
  5. foo(i.next());

修正:

  1. List list = Collections.
  2. synchronizedList(new ArrayList());
  3. synchronized (list) {
  4. Iterator i = list.iterator();
  5. while (i.hasNext())
  6. foo(i.next());
  7. }

并发容器及其注意事项

并发工具类(二) - 图2

List

List 里面只有一个实现类就是 CopyOnWriteArrayList。CopyOnWrite,顾名思义就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。
实现原理:
CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。如果在遍历 array 的同时,还有一个写操作,CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组
并发工具类(二) - 图3
注意:

  • CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。例如上面的例子中,写入的新元素并不能立刻被遍历到。
  • CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的

Map

ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于 ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。
需要注意的是key和value的非空情况。
并发工具类(二) - 图4

Set

CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的

Queue

Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。

这两个维度组合后,可以将 Queue 细分为四大类,分别是:
1.单端阻塞队列:其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和 DelayQueue。内部一般会持有一个队列,这个队列可以是数组(其实现是 ArrayBlockingQueue)也可以是链表(其实现是 LinkedBlockingQueue);甚至还可以不持有队列(其实现是 SynchronousQueue),此时生产者线程的入队操作必须等待消费者线程的出队操作。而 LinkedTransferQueue 融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。单端阻塞队列示意图
并发工具类(二) - 图5
2.双端阻塞队列:其实现是 LinkedBlockingDeque。双端阻塞队列示意图
并发工具类(二) - 图6
3.单端非阻塞队列:其实现是 ConcurrentLinkedQueue。

4.双端非阻塞队列:其实现是 ConcurrentLinkedDeque。

另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。