并发编程的核心问题:

  • 互斥: 同一时刻只允许一个线程访问共享资源==>Lock
  • 同步: 线程之间的通信以及协作==>Condition

隐藏在并发包中的管程

再造管程的理由

为啥有了synchronized还要lock
针对死锁问题, 有破坏”不可抢占条件”的方案, 即当线程申请不到资源的时候, 可以释放已占有的资源
但synchronized做不到, 如果申请不到, 线程会直接进入阻塞状态, 如果重新设计一把锁解决此问题, 有三种方案:
1.能够响应中断. 当给阻塞的线程发送中断的信号时, 能够唤醒他, 那他就有机会释放占用的资源;
2.支持超时. 若线程一段时间内没获取到锁, 不是进入阻塞状态, 而是返回一个错误, 那这个线程也有机会释放曾经持有的锁;
3.非阻塞的获取锁. 如果尝试获取锁失败, 并不进入阻塞状态.
以上对应lock接口的三个方法:

  1. // 支持中断的API
  2. void lockInterruptibly()
  3. throws InterruptedException;
  4. // 支持超时的API
  5. boolean tryLock(long time, TimeUnit unit)
  6. throws InterruptedException;
  7. // 支持非阻塞获取锁的API
  8. boolean tryLock();

如何保证可见性

synchronized: synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。
Lock: 利用了 volatile 相关的 Happens-Before 规则
Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)

  1. class SampleLock {
  2. volatile int state;
  3. // 加锁
  4. lock() {
  5. // 省略代码无数
  6. state = 1;
  7. }
  8. // 解锁
  9. unlock() {
  10. // 省略代码无数
  11. state = 0;
  12. }
  13. }

如下面示例代码:
1.顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
2.volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
3.传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
因此, 线程 T1 对 value 进行了 +=1 操作,后续的线程 T2 能够看到 value 的正确结果

  1. class X {
  2. private final Lock rtl =
  3. new ReentrantLock();
  4. int value;
  5. public void addOne() {
  6. // 获取锁
  7. rtl.lock();
  8. try {
  9. value+=1;
  10. } finally {
  11. // 保证锁能释放
  12. rtl.unlock();
  13. }
  14. }
  15. }

什么是可重入锁

ReentrantLock, 可重入锁, 线程可以重复获取同一把锁
示例代码: 在1处加锁, 在2处再次加锁

  1. class X {
  2. private final Lock rtl =
  3. new ReentrantLock();
  4. int value;
  5. public int get() {
  6. // 获取锁
  7. rtl.lock();
  8. try {
  9. return value;
  10. } finally {
  11. // 保证锁能释放
  12. rtl.unlock();
  13. }
  14. }
  15. public void addOne() {
  16. // 获取锁
  17. rtl.lock();
  18. try {
  19. value = 1 + get();
  20. } finally {
  21. // 保证锁能释放
  22. rtl.unlock();
  23. }
  24. }
  25. }

可重入函数: 多个线程可以同时调用该函数, 每个线程都能得到正确结果; 同时在一个线程内支持线程切换, 无论被切换多少次, 结果都是正确的. 即可重入函数是线程安全的

公平锁与非公平锁

ReentrantLock的两个构造函数, true表示构造一个公平锁

  1. //无参构造函数:默认非公平锁
  2. public ReentrantLock() {
  3. sync = new NonfairSync();
  4. }
  5. //根据公平策略参数创建锁
  6. public ReentrantLock(boolean fair){
  7. sync = fair ? new FairSync()
  8. : new NonfairSync();
  9. }

管程中谈到入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

用锁的最佳实践

出自Doug Lea《Java 并发编程:设计原则与模式》一书:

  • 永远只在更新对象的成员变量时加锁
  • 永远只在访问可变的成员变量时加锁
  • 永远不在调用其他对象的方法时加锁

示例代码: 存在活锁, A,B两账户相互转账,各自持有自己lock的锁,都一直在尝试获取对方的锁,形成了活锁。这个例子可以稍微改下,成功转账后应该跳出循环。加个随机重试时间避免活锁

  1. class Account {
  2. private int balance;
  3. private final Lock lock
  4. = new ReentrantLock();
  5. // 转账
  6. void transfer(Account tar, int amt){
  7. while (true) {
  8. if(this.lock.tryLock()) {
  9. try {
  10. if (tar.lock.tryLock()) {
  11. try {
  12. this.balance -= amt;
  13. tar.balance += amt;
  14. } finally {
  15. tar.lock.unlock();
  16. }
  17. }//if
  18. } finally {
  19. this.lock.unlock();
  20. }
  21. }//if
  22. }//while
  23. }//transfer
  24. }

Dubbo如何用管程实现异步转同步

Condition实现了管程模型里的条件变量
Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的

同步与异步
调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
同步是Java代码默认的处理方式, 让程序支持异步:
1.调用方创建一个子线程, 在子线程中执行方法调用, 这种调用称为异步调用;
2.方法实现的时候, 创建一个新的线程执行主要逻辑, 主线程直接return, 这种方法我们一般称为异步方法.

利用两个条件变量快速实现阻塞队列:**

  1. public class BlockedQueue<T>{
  2. final Lock lock =
  3. new ReentrantLock();
  4. // 条件变量:队列不满
  5. final Condition notFull =
  6. lock.newCondition();
  7. // 条件变量:队列不空
  8. final Condition notEmpty =
  9. lock.newCondition();
  10. // 入队
  11. void enq(T x) {
  12. lock.lock();
  13. try {
  14. while (队列已满){
  15. // 等待队列不满
  16. notFull.await();
  17. }
  18. // 省略入队操作...
  19. //入队后,通知可出队
  20. notEmpty.signal();
  21. }finally {
  22. lock.unlock();
  23. }
  24. }
  25. // 出队
  26. void deq(){
  27. lock.lock();
  28. try {
  29. while (队列已空){
  30. // 等待队列不空
  31. notEmpty.await();
  32. }
  33. // 省略出队操作...
  34. //出队后,通知可入队
  35. notFull.signal();
  36. }finally {
  37. lock.unlock();
  38. }
  39. }
  40. }

Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。
但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。

Dubbo的异步转同步:
1.通过对调用时等待结果的堆进行快照分析, 与DefaultFuture.get()有关
并发工具类(一) - 图1
2.分析get()方法

  1. public class DubboInvoker{
  2. Result doInvoke(Invocation inv){
  3. // 下面这行就是源码中108行
  4. // 为了便于展示,做了修改
  5. return currentClient
  6. .request(inv, timeout)
  7. .get();
  8. }
  9. }
  1. // 创建锁与条件变量
  2. private final Lock lock
  3. = new ReentrantLock();
  4. private final Condition done
  5. = lock.newCondition();
  6. // 调用方通过该方法等待结果
  7. Object get(int timeout){
  8. long start = System.nanoTime();
  9. lock.lock();
  10. try {
  11. while (!isDone()) {
  12. done.await(timeout);
  13. long cur=System.nanoTime();
  14. if (isDone() ||
  15. cur-start > timeout){
  16. break;
  17. }
  18. }
  19. } finally {
  20. lock.unlock();
  21. }
  22. if (!isDone()) {
  23. throw new TimeoutException();
  24. }
  25. return returnFromResponse();
  26. }
  27. // RPC结果是否已经返回
  28. boolean isDone() {
  29. return response != null;
  30. }
  31. // RPC结果返回时调用该方法
  32. private void doReceived(Response res) {
  33. lock.lock();
  34. try {
  35. response = res;
  36. if (done != null) {
  37. done.signalAll();
  38. }
  39. } finally {
  40. lock.unlock();
  41. }
  42. }

本质: 当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。
**
Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。

Semaphore: 如何快速实现一个限流器

Semaphore, 信号量
image.png
描述代码化:

  1. class Semaphore{
  2. // 计数器
  3. int count;
  4. // 等待队列
  5. Queue queue;
  6. // 初始化操作
  7. Semaphore(int c){
  8. this.count=c;
  9. }
  10. //
  11. void down(){
  12. this.count--;
  13. if(this.count<0){
  14. //将当前线程插入等待队列
  15. //阻塞当前线程
  16. }
  17. }
  18. void up(){
  19. this.count++;
  20. if(this.count<=0) {
  21. //移除等待队列中的某个线程T
  22. //唤醒线程T
  23. }
  24. }
  25. }

补充:
信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

如何使用信号量

  1. static int count;
  2. //初始化信号量
  3. static final Semaphore s
  4. = new Semaphore(1);
  5. //用信号量保证互斥
  6. static void addOne() {
  7. s.acquire();
  8. try {
  9. count+=1;
  10. } finally {
  11. s.release();
  12. }
  13. }

理解:
当T1和T2两个线程访问addOne()方法时,
1.由于acquire()是一个原子性操作, 假设T1线程将计数器-1, 计数器变为0; T2线程再-1变成-1, T2线程将阻塞, 进入等待队列;
2.T1执行完后release(), 将计数器加1变为0, 即将T2从等待队列中移除, T2将被唤醒接着执行.

快速实现一个限流器

Semaphore 可以允许多个线程访问一个临界区。

需求: 池化资源, 例如线程池, 连接池等, 在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的.
实现对象池, 一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。限流,指的是不允许多于 N 个线程同时进入临界区.
信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。

  1. class ObjPool<T, R> {
  2. final List<T> pool;
  3. // 用信号量实现限流器
  4. final Semaphore sem;
  5. // 构造函数
  6. ObjPool(int size, T t){
  7. pool = new Vector<T>(){};
  8. for(int i=0; i<size; i++){
  9. pool.add(t);
  10. }
  11. sem = new Semaphore(size);
  12. }
  13. // 利用对象池的对象,调用func
  14. R exec(Function<T,R> func) {
  15. T t = null;
  16. sem.acquire();
  17. try {
  18. t = pool.remove(0);
  19. return func.apply(t);
  20. } finally {
  21. pool.add(t);
  22. sem.release();
  23. }
  24. }
  25. }
  26. // 创建对象池
  27. ObjPool<Long, String> pool =
  28. new ObjPool<Long, String>(10, 2);
  29. // 通过对象池获取t,之后执行
  30. pool.exec(t -> {
  31. System.out.println(t);
  32. return t.toString();
  33. });

ReadWriteLock:如何快速实现一个完备的缓存?

什么是读写锁?

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。


快速实现一个缓存

用 ReadWriteLock 快速实现一个通用的缓存工具类。

  1. class Cache<K,V> {
  2. final Map<K, V> m =
  3. new HashMap<>();
  4. final ReadWriteLock rwl =
  5. new ReentrantReadWriteLock();
  6. // 读锁
  7. final Lock r = rwl.readLock();
  8. // 写锁
  9. final Lock w = rwl.writeLock();
  10. // 读缓存
  11. V get(K key) {
  12. r.lock();
  13. try { return m.get(key); }
  14. finally { r.unlock(); }
  15. }
  16. // 写缓存
  17. V put(K key, V value) {
  18. w.lock();
  19. try { return m.put(key, v); }
  20. finally { w.unlock(); }
  21. }
  22. }

实现缓存的按需加载

  1. class Cache<K,V> {
  2. final Map<K, V> m =
  3. new HashMap<>();
  4. final ReadWriteLock rwl =
  5. new ReentrantReadWriteLock();
  6. final Lock r = rwl.readLock();
  7. final Lock w = rwl.writeLock();
  8. V get(K key) {
  9. V v = null;
  10. //读缓存
  11. r.lock();
  12. try {
  13. v = m.get(key);
  14. } finally{
  15. r.unlock();
  16. }
  17. //缓存中存在,返回
  18. if(v != null) {
  19. return v;
  20. }
  21. //缓存中不存在,查询数据库
  22. w.lock();
  23. try {
  24. //再次验证
  25. //其他线程可能已经查询过数据库
  26. v = m.get(key);
  27. if(v == null){
  28. //查询数据库
  29. v=省略代码无数
  30. m.put(key, v);
  31. }
  32. } finally{
  33. w.unlock();
  34. }
  35. return v;
  36. }
  37. }

读写锁的升级与降级

如下面代码, ①处获取读锁,在②处如果缓存不存在则升级为写锁, 更新缓存后再释放写锁, 最后在③处释放读锁

  1. //读缓存
  2. r.lock();
  3. try {
  4. v = m.get(key);
  5. if (v == null) {
  6. w.lock();
  7. try {
  8. //再次验证并更新缓存
  9. //省略详细代码
  10. } finally{
  11. w.unlock();
  12. }
  13. }
  14. } finally{
  15. r.unlock();
  16. }

这称为锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个一定要注意。

但是允许锁的降级, 以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。

  1. class CachedData {
  2. Object data;
  3. volatile boolean cacheValid;
  4. final ReadWriteLock rwl =
  5. new ReentrantReadWriteLock();
  6. // 读锁
  7. final Lock r = rwl.readLock();
  8. //写锁
  9. final Lock w = rwl.writeLock();
  10. void processCachedData() {
  11. // 获取读锁
  12. r.lock();
  13. if (!cacheValid) {
  14. // 释放读锁,因为不允许读锁的升级
  15. r.unlock();
  16. // 获取写锁
  17. w.lock();
  18. try {
  19. // 再次检查状态
  20. if (!cacheValid) {
  21. data = ...
  22. cacheValid = true;
  23. }
  24. // 释放写锁前,降级为读锁
  25. // 降级是可以的
  26. r.lock();
  27. } finally {
  28. // 释放写锁
  29. w.unlock();
  30. }
  31. }
  32. // 此处仍然持有读锁
  33. try {use(data);}
  34. finally {r.unlock();}
  35. }
  36. }

补充: 写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。

如何解决缓存数据与源头数据的同步问题?

  • 超时机制, 为缓存中的数据设置生存时间
  • 源头数据发生变化时反馈给缓存
  • 操作数据时, 同时写数据库和缓存