Java锁

  • 自旋锁:为了不放弃CPU执行事件,循环(使用CAS技术)尝试获取锁,直至成功。优点是没有阻塞,缺点是占用cpu。

  • 悲观锁:假定会发生并发冲突,同步所有对数据的相关操作,从读数据就开始加锁。

  • 乐观锁:假定没有冲突,在修改数据时如果发现数据和之前获取的不一致,则读最新的数据,修改后重试修改。(实现方式:版本号机制、CAS算法)

  • 独享锁(写):给资源加上写锁,线程可以修改资源,其他线程不能再加锁;(单写)

  • 共享锁(读):给资源加上读锁后只能读不能改,其他线程也只能加读锁,不能加写锁;(多读)

  • 可重入锁、不可重入锁:线程拿到一把锁后,是否可以自由进入同一把锁所同步的其他代码。

  • 公平锁、非公平锁:争抢锁的顺序,是否按照先来后到的顺序。

锁的特性:可重入、独享、乐观锁
锁的范围:类锁、对象锁、锁消除、锁粗化

  • 锁消除:在同步控制中,当JVM检测到不可能存在共享数据竞争,这时就会对这些同步锁进行锁消除。
  • 锁粗化:将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁。

默认情况下JVM锁会经历以下状态:
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

Lock

Lock接口的常用方法:

方法 描述
lock 获取锁的方法,若锁被其他线程获取,则等待(阻塞)
lockInterruptibly 在锁的获取过程中,可以中断线程
tryLock 尝试非阻塞的获取锁,立即返回
unlock 释放锁
  • ReentrantLock:独享锁;可重入锁;支持公平、非公平两种模式;

    1. public class ReentrantDemo1 {
    2. private static final ReentrantLock lock = new ReentrantLock();
    3. public static void main(String[] args) {
    4. lock.lock(); // block until condition holds
    5. try {
    6. System.out.println("第一次获取锁");
    7. System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
    8. lock.lock();
    9. System.out.println("第二次获取锁了");
    10. System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
    11. } finally {
    12. lock.unlock();
    13. }
    14. System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
    15. // 如果不释放,此时其他线程是拿不到锁的
    16. new Thread(() -> {
    17. System.out.println(Thread.currentThread() + " 期望抢到锁");
    18. lock.lock();
    19. System.out.println(Thread.currentThread() + " 线程拿到了锁");
    20. }).start();
    21. }
    22. }
  • ReentrantReadWriteLock:读写锁

维护一对关联锁,一个用于读操作,一个用于写操作;读锁可以有多个读线程持有,写锁是排他的。
适合读取线程比写入线程多的场景,改进互斥锁的性能,实例场景:缓存组件、集合的并发线程安全性改造。

  1. // 读写锁(既保证了读数据的效率,也保证数据的一致性)
  2. public class ReentrantReadWriteLockDemo2 {
  3. ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  4. public static void main(String[] args) {
  5. final ReentrantReadWriteLockDemo2 readWriteLockDemo2 = new ReentrantReadWriteLockDemo2();
  6. // 多线程同时读/写
  7. new Thread(() -> {
  8. readWriteLockDemo2.read(Thread.currentThread());
  9. }).start();
  10. new Thread(() -> {
  11. readWriteLockDemo2.read(Thread.currentThread());
  12. }).start();
  13. new Thread(() -> {
  14. readWriteLockDemo2.write(Thread.currentThread());
  15. }).start();
  16. }
  17. // 多线程读,共享锁
  18. public void read(Thread thread) {
  19. readWriteLock.readLock().lock();
  20. try {
  21. long start = System.currentTimeMillis();
  22. while (System.currentTimeMillis() - start <= 1) {
  23. System.out.println(thread.getName() + "正在进行“读”操作");
  24. }
  25. System.out.println(thread.getName() + "“读”操作完毕");
  26. } finally {
  27. readWriteLock.readLock().unlock();
  28. }
  29. }
  30. // 多线程写
  31. public void write(Thread thread) {
  32. readWriteLock.writeLock().lock();
  33. try {
  34. long start = System.currentTimeMillis();
  35. while (System.currentTimeMillis() - start <= 1) {
  36. System.out.println(thread.getName() + "正在进行“写”操作");
  37. }
  38. System.out.println(thread.getName() + "“写”操作完毕");
  39. } finally {
  40. readWriteLock.writeLock().unlock();
  41. }
  42. }
  43. }

锁降级指的是写锁降级为读锁。持有当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。
写锁是线程独占的,读锁是共享的,所以写->读是降级。(读->写,是不能实现的)

  1. // 缓存示例
  2. public class CacheDataDemo {
  3. // 创建一个map用于缓存
  4. private Map<String, Object> map = new HashMap<>();
  5. private static ReadWriteLock rwl = new ReentrantReadWriteLock();
  6. public static void main(String[] args) {
  7. // 1 读取缓存里面的数据
  8. // cache.query()
  9. // 2 如果换成没数据,则取数据库里面查询 database.query()
  10. // 3 查询完成之后,数据塞到塞到缓存里面 cache.put(data)
  11. }
  12. public Object get(String id) {
  13. Object value = null;
  14. // 首先开启读锁,从缓存中去取
  15. rwl.readLock().lock();
  16. try {
  17. if (map.get(id) == null) {
  18. // TODO database.query(); 全部查询数据库 ,缓存雪崩
  19. // 必须释放读锁
  20. rwl.readLock().unlock();
  21. // 如果缓存中没有释放读锁,上写锁。如果不加锁,所有请求全部去查询数据库,就崩溃了
  22. rwl.writeLock().lock(); // 所有线程在此处等待 1000 1 999 (在同步代码里面再次检查是否缓存)
  23. try {
  24. // 双重检查,防止已经有线程改变了当前的值,从而出现重复处理的情况
  25. if (map.get(id) == null) {
  26. // TODO value = ...如果缓存没有,就去数据库里面读取
  27. }
  28. rwl.readLock().lock(); // 加读锁降级写锁,这样就不会有其他线程能够改这个值,保证了数据一致性
  29. } finally {
  30. rwl.writeLock().unlock(); // 释放写锁@
  31. }
  32. }
  33. } finally {
  34. rwl.readLock().unlock();
  35. }
  36. return value;
  37. }
  38. }

Condition

synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待通知模式,Lock也可以借助Condition实现。一个Lock对象里面可以创建多个Condition。
Condition是需要配合Lock使用,提供多个等待集合,更精确的控制(底层是park/unpark机制)。Condition单独使用会抛出非法监视器状态异常(IllegalMonitorStateException)。
image.png

  1. // condition 实现队列线程安全。
  2. public class QueueDemo {
  3. final Lock lock = new ReentrantLock();
  4. // 指定条件的等待 - 等待有空位
  5. final Condition notFull = lock.newCondition();
  6. // 指定条件的等待 - 等待不为空
  7. final Condition notEmpty = lock.newCondition();
  8. // 定义数组存储数据
  9. final Object[] items = new Object[100];
  10. int putptr, takeptr, count;
  11. // 写入数据的线程,写入进来
  12. public void put(Object x) throws InterruptedException {
  13. lock.lock();
  14. try {
  15. while (count == items.length) // 数据写满了
  16. notFull.await(); // 写入数据的线程,进入阻塞
  17. items[putptr] = x;
  18. if (++putptr == items.length) putptr = 0;
  19. ++count;
  20. notEmpty.signal(); // 唤醒指定的读取线程
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. // 读取数据的线程,调用take
  26. public Object take() throws InterruptedException {
  27. lock.lock();
  28. try {
  29. while (count == 0)
  30. notEmpty.await(); // 线程阻塞在这里,等待被唤醒
  31. Object x = items[takeptr];
  32. if (++takeptr == items.length) takeptr = 0;
  33. --count;
  34. notFull.signal(); // 通知写入数据的线程,告诉他们取走了数据,继续写入
  35. return x;
  36. } finally {
  37. lock.unlock();
  38. }
  39. }
  40. }

AQS抽象队列同步器

AbstractQueuedSynchronizer的缩写,也叫抽象的队列式同步器。定义了一套多线程访问共享资源的同步器框架。
字如其名,他是一个抽象类,所以大部分同步类都是继承于它,然后重写部分方法即可。
比如说ReentrantLock/Semaphore/CountDownLatch都是AQS的具体实现类。

同步锁的本质 - 排队

  • 同步的方式:独享锁-单个队列窗口,共享锁-多个队列窗口
  • 抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)
  • 没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待、放弃抢锁
  • 唤醒阻塞线程的方式(叫号器):全部通知、通知下一个

自定义独享锁

/**
 * @author :dukz
 * @date :Created in 2020/5/15 16:15
 * @description:自定义独享锁
 */
public class DukzLock implements Lock {
    // 保存锁的持有者
    volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 等待队列,保存正在等待的线程
    volatile LinkedBlockingDeque<Thread> waiters = new LinkedBlockingDeque<>();


    /**
    * @Description: 尝试获取锁
    * @Author: root
    * @Date: 2020/5/15
    * @return 返回是否获取成功
    */
    @Override
    public boolean tryLock() {
        // 通过CAS方式获取锁
        return owner.compareAndSet(null, Thread.currentThread());
    }

    @Override
    public void lock() {
        boolean isAdd = false;// 是否加入等待队列
        while(!tryLock()){// 获取锁失败,因park方法可能会伪唤醒,所以用while循环
            if(!isAdd){// 未加入等待队列,则加入
                waiters.offer(Thread.currentThread());
                isAdd = true;
            }else{// 已加入队列,则阻塞当前线程
                LockSupport.park();
            }
        }
        // 获取锁成功,移除等待队列
        waiters.remove(Thread.currentThread());
    }

    @Override
    public void unlock() {
        // 释放owner
        if(owner.compareAndSet(Thread.currentThread(), null)){// 释放锁成功
            // 唤醒等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()){
                Thread thread = iterator.next();
                LockSupport.unpark(thread);// 唤醒
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
/**
 * @author :dukz
 * @date :Created in 2020/5/15 16:33
 * @description:自定义独享锁测试
 */
public class DukzLockDemo {
    volatile int value = 0;

    Lock lock = new DukzLock();

    public void add() {
        lock.lock();
        try {
            // TODO  很多业务操作
            value++;
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DukzLockDemo dl = new DukzLockDemo();

        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    dl.add();
                }
            }).start();
        }
        Thread.sleep(2000L);
        System.out.println(dl.value);
    }
}

AQS抽象队列同步器(AbstractQueuedSynchronizer)

  • 提供了对资源占用、释放,线程的等待、唤醒等接口和具体实现;
  • 可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semphore)

image.png

  • acquire 、acquireShared : 定义了资源争用的逻辑, 如果没拿到, 则等待。
  • tryAcquire 、tryAcquireShared : 实际执行占用资源的操作, 如何判定一个由使用者具体去实现。
  • release 、reIeaseShared : 定义释放资源的逻辑, 释放之后, 通知后续节点进行争抢。
  • tryRelease 、tryReleaseShared: 实际执行资源释放的操作, 具体的AQS 使用者去实现
  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
  • state:它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被 阻塞时会进入此队列)。这里 volatile 是核心关键词,具体 volatile 的语义,在此不述。state 的 访问方式有三种: getState() ,setState() ,compareAndSetState()

    同步器的实现是AQS的核心

    同步器的实现是 AQS 核心,以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失 败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放 锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意, 获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。
    CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与 线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS 减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程 就会从 await()函数返回,继续后余动作。

    模拟AQS的实现

    // 抽象队列同步器
    // state, owner, waiters
    public class DukzAqs {
      // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
      // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
      // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
      // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
      // 1、 如何判断一个资源的拥有者
      public volatile AtomicReference<Thread> owner = new AtomicReference<>();
      // 保存 正在等待的线程
      public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
      // 记录资源状态
      public volatile AtomicInteger state = new AtomicInteger(0);
    
      // 共享资源占用的逻辑,返回资源的占用情况
      public int tryAcquireShared(){
          throw new UnsupportedOperationException();
      }
    
      public void acquireShared(){
          boolean addQ = true;
          while(tryAcquireShared() < 0) {
              if (addQ) {
                  // 没拿到锁,加入到等待集合
                  waiters.offer(Thread.currentThread());
                  addQ = false;
              } else {
                  // 阻塞 挂起当前的线程,不要继续往下跑了
                  LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
              }
          }
          waiters.remove(Thread.currentThread()); // 把线程移除
      }
    
      public boolean tryReleaseShared(){
          throw new UnsupportedOperationException();
      }
    
      public void releaseShared(){
          if (tryReleaseShared()) {
              // 通知等待者
              Iterator<Thread> iterator = waiters.iterator();
              while (iterator.hasNext()) {
                  Thread next = iterator.next();
                  LockSupport.unpark(next); // 唤醒
              }
          }
      }
    
      // 独占资源相关的代码
      public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
          throw new UnsupportedOperationException();
      }
    
      public void acquire() {
          boolean addQ = true;
          while (!tryAcquire()) {
              if (addQ) {
                  // 没拿到锁,加入到等待集合
                  waiters.offer(Thread.currentThread());
                  addQ = false;
              } else {
                  // 阻塞 挂起当前的线程,不要继续往下跑了
                  LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
              }
          }
          waiters.remove(Thread.currentThread()); // 把线程移除
      }
    
      public boolean tryRelease() {
          throw new UnsupportedOperationException();
      }
    
      public void release() { // 定义了 释放资源之后要做的操作
          if (tryRelease()) {
              // 通知等待者
              Iterator<Thread> iterator = waiters.iterator();
              while (iterator.hasNext()) {
                  Thread next = iterator.next();
                  LockSupport.unpark(next); // 唤醒
              }
          }
      }
    
      public AtomicInteger getState() {
          return state;
      }
    
      public void setState(AtomicInteger state) {
          this.state = state;
      }
    }
    

    资源占用的流程

    image.png