并发编程的核心问题:
- 互斥: 同一时刻只允许一个线程访问共享资源==>Lock
- 同步: 线程之间的通信以及协作==>Condition
隐藏在并发包中的管程
再造管程的理由
为啥有了synchronized还要lock
针对死锁问题, 有破坏”不可抢占条件”的方案, 即当线程申请不到资源的时候, 可以释放已占有的资源
但synchronized做不到, 如果申请不到, 线程会直接进入阻塞状态, 如果重新设计一把锁解决此问题, 有三种方案:
1.能够响应中断. 当给阻塞的线程发送中断的信号时, 能够唤醒他, 那他就有机会释放占用的资源;
2.支持超时. 若线程一段时间内没获取到锁, 不是进入阻塞状态, 而是返回一个错误, 那这个线程也有机会释放曾经持有的锁;
3.非阻塞的获取锁. 如果尝试获取锁失败, 并不进入阻塞状态.
以上对应lock接口的三个方法:
// 支持中断的API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
如何保证可见性
synchronized: synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。
Lock: 利用了 volatile 相关的 Happens-Before 规则
Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}
如下面示例代码:
1.顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
2.volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
3.传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
因此, 线程 T1 对 value 进行了 +=1 操作,后续的线程 T2 能够看到 value 的正确结果
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
什么是可重入锁
ReentrantLock, 可重入锁, 线程可以重复获取同一把锁
示例代码: 在1处加锁, 在2处再次加锁
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
可重入函数: 多个线程可以同时调用该函数, 每个线程都能得到正确结果; 同时在一个线程内支持线程切换, 无论被切换多少次, 结果都是正确的. 即可重入函数是线程安全的
公平锁与非公平锁
ReentrantLock的两个构造函数, true表示构造一个公平锁
//无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}
管程中谈到入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
用锁的最佳实践
出自Doug Lea《Java 并发编程:设计原则与模式》一书:
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
示例代码: 存在活锁, A,B两账户相互转账,各自持有自己lock的锁,都一直在尝试获取对方的锁,形成了活锁。这个例子可以稍微改下,成功转账后应该跳出循环。加个随机重试时间避免活锁
class Account {
private int balance;
private final Lock lock
= new ReentrantLock();
// 转账
void transfer(Account tar, int amt){
while (true) {
if(this.lock.tryLock()) {
try {
if (tar.lock.tryLock()) {
try {
this.balance -= amt;
tar.balance += amt;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
}//while
}//transfer
}
Dubbo如何用管程实现异步转同步
Condition实现了管程模型里的条件变量
Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的
同步与异步
调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
同步是Java代码默认的处理方式, 让程序支持异步:
1.调用方创建一个子线程, 在子线程中执行方法调用, 这种调用称为异步调用;
2.方法实现的时候, 创建一个新的线程执行主要逻辑, 主线程直接return, 这种方法我们一般称为异步方法.
利用两个条件变量快速实现阻塞队列:**
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。
但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。
Dubbo的异步转同步:
1.通过对调用时等待结果的堆进行快照分析, 与DefaultFuture.get()有关
2.分析get()方法
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signalAll();
}
} finally {
lock.unlock();
}
}
本质: 当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。
**
Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。
Semaphore: 如何快速实现一个限流器
Semaphore, 信号量
描述代码化:
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}
补充:
信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。
如何使用信号量
static int count;
//初始化信号量
static final Semaphore s
= new Semaphore(1);
//用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
理解:
当T1和T2两个线程访问addOne()方法时,
1.由于acquire()是一个原子性操作, 假设T1线程将计数器-1, 计数器变为0; T2线程再-1变成-1, T2线程将阻塞, 进入等待队列;
2.T1执行完后release(), 将计数器加1变为0, 即将T2从等待队列中移除, T2将被唤醒接着执行.
快速实现一个限流器
Semaphore 可以允许多个线程访问一个临界区。
需求: 池化资源, 例如线程池, 连接池等, 在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的.
实现对象池, 一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。限流,指的是不允许多于 N 个线程同时进入临界区.
信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
ReadWriteLock:如何快速实现一个完备的缓存?
什么是读写锁?
读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:
- 允许多个线程同时读共享变量;
- 只允许一个线程写共享变量;
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
快速实现一个缓存
用 ReadWriteLock 快速实现一个通用的缓存工具类。
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(K key, V value) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}
实现缓存的按需加载
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
//读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
//缓存中存在,返回
if(v != null) { ④
return v;
}
//缓存中不存在,查询数据库
w.lock(); ⑤
try {
//再次验证
//其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
//查询数据库
v=省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
读写锁的升级与降级
如下面代码, ①处获取读锁,在②处如果缓存不存在则升级为写锁, 更新缓存后再释放写锁, 最后在③处释放读锁
//读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
//再次验证并更新缓存
//省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}
这称为锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个一定要注意。
但是允许锁的降级, 以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
//写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}
补充: 写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。
如何解决缓存数据与源头数据的同步问题?
- 超时机制, 为缓存中的数据设置生存时间
- 源头数据发生变化时反馈给缓存
- 操作数据时, 同时写数据库和缓存