01 | Lock和Condition
并发编程中两大核心问题:
- 互斥:同一时刻只允许一个线程访问共享资源
- 同步:线程之间如何通信、协作
Java SDK并发包通过Lock和Condition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。
Lock
public interface Lock {
/**
* 获取锁
*/
void lock();
/**
* 获取锁,可响应中断
*/
void lockInterruptibly() throws InterruptedException;
/**
* 仅在调用时锁为空闲状态才获取该锁,可以响应中断
* @return true 获取成功, false 获取失败
*/
boolean tryLock();
/**
* 在给定的空闲时间内如果锁空闲并且未被中断则获取锁
* @param time 空闲时间
* @param unit 时间单位
* @return true 获取成功, false 获取失败
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
* 返回绑定到此 Lock 实例的新 Condition 实例
* @return
*/
Condition newCondition();
}
Condition
condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
public interface Condition {
/**
* 当前线程一直处于等待状态直到收到信号(signalled)或者当前线程被中断
*/
void await() throws InterruptedException;
/**
* await()基础上如超过设定的指定时间之前处于等待状态
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 当前线程一直处于等待状态直到收到信号(signalled)(中断不敏感)
*/
void awaitUninterruptibly();
/**
* await(long time, TimeUnit unit)基础上返回剩余超时时间
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁
*/
void signal();
/**
* 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
*/
void signalAll();
}
Lock和Synchronized的区别
- Lock提供了超时机制
- Lock阻塞的线程可以响应中断
- Lock支持非阻塞的获取锁
- Lock+Condition可以支持多个条件
Synchronized为非公平锁,Lock两者都可以(默认为非公平锁)
锁的最佳实践
永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
02 | Semaphore
信号量模型
信号量模型可以简单概括为:
- 一个计数器
- 一个等待队列
- 三个方法
- init():设置计数器的初始值
- down():计数器值减1;如果此时计数器的值小于0,则当前线程将被阻塞
- up():计数器值加1;如果此时计数器的值小于或等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除
Java SDK中,信号量模型有java.util.concurrent.Semaphore实现
public class Semaphore {
/**
* 等同于init(),初始化信号量大小
* @param permits
*/
public Semaphore(int permits){}
/**
* 等同于down()
*/
public void acquire();
/**
* 等同于up()
*/
public void release();
}
Semaphore能够保证这三个方法都是原子操作(通过Lock实现)
03 | ReadWriteLock
读写锁的基本原则:
- 允许多个线程同时读共享变量
- 只允许一个线程写共享变量
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量
- 读写锁不允许升级(读锁升级为写锁),但可以降级(写锁降级为读锁)
锁模式
- 读锁
- 写锁
允许多个线程同时获取读锁,但是只允许一个线程获取写锁,写锁和读锁是互斥的。
ReentrantReadWriteLock实现原理
04 | StampedLock
锁模式
- 写锁
- 悲观读锁
- 乐观读
写锁与悲观读锁语义与ReadWriteLock的写锁和读锁语义相似,不同的是StampedLock中的写锁和悲观读锁加锁成功之后都会返回一个stamp,解锁时,需要传入这个stamp。
StampedLock的性能之所以比ReadWriteLock要好,关键StampedLock支持乐观读。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读(实质为乐观读这个操作是无锁的),所有允许一个线程获取写锁,并不会阻塞所有的写操作,但因此可能会导致数据不一起的情况,StampedLock通过提供validate(stamp)可以验证读期间是否存在写操作,如存在可升级为悲观读锁保证数据一致。推荐实现如下:
class Point {
private int x, y;
final StampedLock sl =
new StampedLock();
//计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
//判断执行读操作期间,
//是否存在写操作,如果存在,
//则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
//释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}
StampedLock注意事项
- 不支持重入
- 不支持条件变量
- 不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly(),否则会导致CPU飙升
05 | CountDownLatch和CyclicBarrier
CountDownLatch
CyclicBarrier
使用场景:一组线程互相等待
06 | 并发容器
并发容器关系图
List
CopyOnWriteArrayList
CopyOnWriteArrayList内部维护了一个数组array,所有的读操作都是基于这个数组array进行的,读操作是完全无锁的,如果在遍历array的同时,同时执行了写操作,如增加元素,CopyOnWriteArrayList会将array复制一份,然后在新复制的数据上执行元素增加的操作,执行完成之后再将array指向这个新的数据。需注意,在执行写操作,复制->增加元素->array指向新的数组这个过程中数据会出现短暂的不一致情况。
Map
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentHashMap和ConcurrentSkipListMap的主要区别为前者的key是无序的而后者的key是有序的
Set
CopyOnWriteArraySet
ConcurrentSkipListSet
Queue
单端阻塞队列:
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- LinkedTransferQueue
- PriorityBlockingQueue
-
双端阻塞队列
-
单端非阻塞队列
-
双端非阻塞队列
ConcurrentLinkedDeque
07 | 无锁原子类
无锁方案原理:CAS
CAS指令包含3个参数:共享变量的内存地址A、用于比较的值B和共享变脸的新值C;并且只有当内存中的地址A处的值等于B时,才能将内存中地址A处的值更新为新值C。作为一条CPU指令,CAS指令本身能够保证原子性;
原子类概览
08 | Executor
线程池使用注意事项(by 阿里巴巴Java开发手册)
- 创建线程或线程池时指定有意义的线程名称,方便出错时回溯
- 线程资源应通过线程池提供
- 线程池不允许使用Executors去创建,使用弊端如下
- newSingleThreadExecutor和newFixedThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
- newCachedThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
09 | CompletableFuture:异步编程
Java1.8版本提供了CompletableFuture来支持异步编程,使用Completable无需再手动维护线程,也无需关注分配线程的工作,只需关注业务逻辑相关代码,语义更清晰。
创建
可通过CompletableFuture中的以下4个静态方法创建CompletableFuture对象:
- CompletableFuture
runAsync(Runnable runnable); // 使用默认线程池且异步任务无返回值 - CompletableFuture
runAsync(Runnable runnable,Executor executor); // 指定线程池且异步线程无返回值 - CompletableFuture supplyAsync(Supplier supplier); // 使用默认线程池且异步任务包含返回值
- CompletableFuture supplyAsync(Supplier supplier,Executor executor); // 指定线程池且异步线程包含返回
示例:
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println("runAsync with:" + Thread.currentThread().getName());
});
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("supplyAsync with:" + threadName);
return "result with:"+threadName;
});
System.out.println("main thread is Running");
String result = supplyAsync.get();
System.out.println(result);
// result
// main thread is Running
// runAsync with:ForkJoinPool.commonPool-worker-3
// supplyAsync with:ForkJoinPool.commonPool-worker-5
// result with:ForkJoinPool.commonPool-worker-5
CompletionStage
作用:描述任务之间的时序关系
- 串行关系
- CompletionStage thenApply(fn);
- CompletionStage thenApplyAsync(fn);
- CompletionStage thenAccept(consumer);
- CompletionStage thenAcceptAsync(consumer);
- CompletionStage thenRun(action);
- CompletionStage thenRunAsync(action);
- CompletionStage thenCompose(fn);
- CompletionStage thenComposeAsync(fn);
- AND汇聚关系
- CompletionStage thenCombine(other, fn);
- CompletionStage thenCombineAsync(other, fn);
- CompletionStage thenAcceptBoth(other, consumer);
- CompletionStage thenAcceptBothAsync(other, consumer);
- CompletionStage runAfterBoth(other, action);
- CompletionStage runAfterBothAsync(other, action);
- OR汇聚关系
- CompletionStage applyToEither(other, fn);
- CompletionStage applyToEitherAsync(other, fn);
- CompletionStage acceptEither(other, consumer);
- CompletionStage acceptEitherAsync(other, consumer);
- CompletionStage runAfterEither(other, action);
- CompletionStage runAfterEitherAsync(other, action);
- 异常处理
- CompletionStage exceptionally(fn);
- CompletionStage
whenComplete(consumer); - CompletionStage
whenCompleteAsync(consumer); - CompletionStage
handle(fn); - CompletionStage
handleAsync(fn);
注: 方法不以Async结尾,意味着任务相同的线程执行,而Async可能会使用不同的线程执行(如使用相同的线程池,也可能被同一个线程选中执行)。
10 | CompletionService:批量执行异步任务
执行流程:
CompletionService将线程池Executor和阻塞队列BlockingQueue融合一起,能够让批量异步任务的执行结果有序化,能够让批量异步任务管理更加简单
核心方法
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take()throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
并行任务如何选择:
- 简单并行任务:线程池+Future
- 任务之间包含聚合关系:CompletableFuture
- 批量的并行任务或对执行结果有有序化要求:CompletionService