Java锁
自旋锁:为了不放弃CPU执行事件,循环(使用CAS技术)尝试获取锁,直至成功。优点是没有阻塞,缺点是占用cpu。
悲观锁:假定会发生并发冲突,同步所有对数据的相关操作,从读数据就开始加锁。
乐观锁:假定没有冲突,在修改数据时如果发现数据和之前获取的不一致,则读最新的数据,修改后重试修改。(实现方式:版本号机制、CAS算法)
独享锁(写):给资源加上写锁,线程可以修改资源,其他线程不能再加锁;(单写)
共享锁(读):给资源加上读锁后只能读不能改,其他线程也只能加读锁,不能加写锁;(多读)
可重入锁、不可重入锁:线程拿到一把锁后,是否可以自由进入同一把锁所同步的其他代码。
- 公平锁、非公平锁:争抢锁的顺序,是否按照先来后到的顺序。
锁的特性:可重入、独享、乐观锁
锁的范围:类锁、对象锁、锁消除、锁粗化
- 锁消除:在同步控制中,当JVM检测到不可能存在共享数据竞争,这时就会对这些同步锁进行锁消除。
- 锁粗化:将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁。
默认情况下JVM锁会经历以下状态:
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
Lock
Lock接口的常用方法:
方法 | 描述 |
---|---|
lock | 获取锁的方法,若锁被其他线程获取,则等待(阻塞) |
lockInterruptibly | 在锁的获取过程中,可以中断线程 |
tryLock | 尝试非阻塞的获取锁,立即返回 |
unlock | 释放锁 |
ReentrantLock:独享锁;可重入锁;支持公平、非公平两种模式;
public class ReentrantDemo1 {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock(); // block until condition holds
try {
System.out.println("第一次获取锁");
System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
lock.lock();
System.out.println("第二次获取锁了");
System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
} finally {
lock.unlock();
}
System.out.println("当前线程获取锁的次数" + lock.getHoldCount());
// 如果不释放,此时其他线程是拿不到锁的
new Thread(() -> {
System.out.println(Thread.currentThread() + " 期望抢到锁");
lock.lock();
System.out.println(Thread.currentThread() + " 线程拿到了锁");
}).start();
}
}
ReentrantReadWriteLock:读写锁
维护一对关联锁,一个用于读操作,一个用于写操作;读锁可以有多个读线程持有,写锁是排他的。
适合读取线程比写入线程多的场景,改进互斥锁的性能,实例场景:缓存组件、集合的并发线程安全性改造。
// 读写锁(既保证了读数据的效率,也保证数据的一致性)
public class ReentrantReadWriteLockDemo2 {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public static void main(String[] args) {
final ReentrantReadWriteLockDemo2 readWriteLockDemo2 = new ReentrantReadWriteLockDemo2();
// 多线程同时读/写
new Thread(() -> {
readWriteLockDemo2.read(Thread.currentThread());
}).start();
new Thread(() -> {
readWriteLockDemo2.read(Thread.currentThread());
}).start();
new Thread(() -> {
readWriteLockDemo2.write(Thread.currentThread());
}).start();
}
// 多线程读,共享锁
public void read(Thread thread) {
readWriteLock.readLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行“读”操作");
}
System.out.println(thread.getName() + "“读”操作完毕");
} finally {
readWriteLock.readLock().unlock();
}
}
// 多线程写
public void write(Thread thread) {
readWriteLock.writeLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行“写”操作");
}
System.out.println(thread.getName() + "“写”操作完毕");
} finally {
readWriteLock.writeLock().unlock();
}
}
}
锁降级指的是写锁降级为读锁。持有当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。
写锁是线程独占的,读锁是共享的,所以写->读是降级。(读->写,是不能实现的)
// 缓存示例
public class CacheDataDemo {
// 创建一个map用于缓存
private Map<String, Object> map = new HashMap<>();
private static ReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
// 1 读取缓存里面的数据
// cache.query()
// 2 如果换成没数据,则取数据库里面查询 database.query()
// 3 查询完成之后,数据塞到塞到缓存里面 cache.put(data)
}
public Object get(String id) {
Object value = null;
// 首先开启读锁,从缓存中去取
rwl.readLock().lock();
try {
if (map.get(id) == null) {
// TODO database.query(); 全部查询数据库 ,缓存雪崩
// 必须释放读锁
rwl.readLock().unlock();
// 如果缓存中没有释放读锁,上写锁。如果不加锁,所有请求全部去查询数据库,就崩溃了
rwl.writeLock().lock(); // 所有线程在此处等待 1000 1 999 (在同步代码里面再次检查是否缓存)
try {
// 双重检查,防止已经有线程改变了当前的值,从而出现重复处理的情况
if (map.get(id) == null) {
// TODO value = ...如果缓存没有,就去数据库里面读取
}
rwl.readLock().lock(); // 加读锁降级写锁,这样就不会有其他线程能够改这个值,保证了数据一致性
} finally {
rwl.writeLock().unlock(); // 释放写锁@
}
}
} finally {
rwl.readLock().unlock();
}
return value;
}
}
Condition
synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待通知模式,Lock也可以借助Condition实现。一个Lock对象里面可以创建多个Condition。
Condition是需要配合Lock使用,提供多个等待集合,更精确的控制(底层是park/unpark机制)。Condition单独使用会抛出非法监视器状态异常(IllegalMonitorStateException)。
// condition 实现队列线程安全。
public class QueueDemo {
final Lock lock = new ReentrantLock();
// 指定条件的等待 - 等待有空位
final Condition notFull = lock.newCondition();
// 指定条件的等待 - 等待不为空
final Condition notEmpty = lock.newCondition();
// 定义数组存储数据
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 写入数据的线程,写入进来
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) // 数据写满了
notFull.await(); // 写入数据的线程,进入阻塞
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 唤醒指定的读取线程
} finally {
lock.unlock();
}
}
// 读取数据的线程,调用take
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 线程阻塞在这里,等待被唤醒
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 通知写入数据的线程,告诉他们取走了数据,继续写入
return x;
} finally {
lock.unlock();
}
}
}
AQS抽象队列同步器
AbstractQueuedSynchronizer的缩写,也叫抽象的队列式同步器。定义了一套多线程访问共享资源的同步器框架。
字如其名,他是一个抽象类,所以大部分同步类都是继承于它,然后重写部分方法即可。
比如说ReentrantLock/Semaphore/CountDownLatch都是AQS的具体实现类。
同步锁的本质 - 排队
- 同步的方式:独享锁-单个队列窗口,共享锁-多个队列窗口
- 抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)
- 没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待、放弃抢锁
- 唤醒阻塞线程的方式(叫号器):全部通知、通知下一个
自定义独享锁
/**
* @author :dukz
* @date :Created in 2020/5/15 16:15
* @description:自定义独享锁
*/
public class DukzLock implements Lock {
// 保存锁的持有者
volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 等待队列,保存正在等待的线程
volatile LinkedBlockingDeque<Thread> waiters = new LinkedBlockingDeque<>();
/**
* @Description: 尝试获取锁
* @Author: root
* @Date: 2020/5/15
* @return 返回是否获取成功
*/
@Override
public boolean tryLock() {
// 通过CAS方式获取锁
return owner.compareAndSet(null, Thread.currentThread());
}
@Override
public void lock() {
boolean isAdd = false;// 是否加入等待队列
while(!tryLock()){// 获取锁失败,因park方法可能会伪唤醒,所以用while循环
if(!isAdd){// 未加入等待队列,则加入
waiters.offer(Thread.currentThread());
isAdd = true;
}else{// 已加入队列,则阻塞当前线程
LockSupport.park();
}
}
// 获取锁成功,移除等待队列
waiters.remove(Thread.currentThread());
}
@Override
public void unlock() {
// 释放owner
if(owner.compareAndSet(Thread.currentThread(), null)){// 释放锁成功
// 唤醒等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()){
Thread thread = iterator.next();
LockSupport.unpark(thread);// 唤醒
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
/**
* @author :dukz
* @date :Created in 2020/5/15 16:33
* @description:自定义独享锁测试
*/
public class DukzLockDemo {
volatile int value = 0;
Lock lock = new DukzLock();
public void add() {
lock.lock();
try {
// TODO 很多业务操作
value++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
DukzLockDemo dl = new DukzLockDemo();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
dl.add();
}
}).start();
}
Thread.sleep(2000L);
System.out.println(dl.value);
}
}
AQS抽象队列同步器(AbstractQueuedSynchronizer)
- 提供了对资源占用、释放,线程的等待、唤醒等接口和具体实现;
- 可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semphore)
- acquire 、acquireShared : 定义了资源争用的逻辑, 如果没拿到, 则等待。
- tryAcquire 、tryAcquireShared : 实际执行占用资源的操作, 如何判定一个由使用者具体去实现。
- release 、reIeaseShared : 定义释放资源的逻辑, 释放之后, 通知后续节点进行争抢。
- tryRelease 、tryReleaseShared: 实际执行资源释放的操作, 具体的AQS 使用者去实现。
- isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
state:它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被 阻塞时会进入此队列)。这里 volatile 是核心关键词,具体 volatile 的语义,在此不述。state 的 访问方式有三种: getState() ,setState() ,compareAndSetState()
同步器的实现是AQS的核心
同步器的实现是 AQS 核心,以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失 败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放 锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意, 获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。
以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与 线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS 减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程 就会从 await()函数返回,继续后余动作。模拟AQS的实现
// 抽象队列同步器 // state, owner, waiters public class DukzAqs { // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。 // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。 // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。 // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。 // 1、 如何判断一个资源的拥有者 public volatile AtomicReference<Thread> owner = new AtomicReference<>(); // 保存 正在等待的线程 public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); // 记录资源状态 public volatile AtomicInteger state = new AtomicInteger(0); // 共享资源占用的逻辑,返回资源的占用情况 public int tryAcquireShared(){ throw new UnsupportedOperationException(); } public void acquireShared(){ boolean addQ = true; while(tryAcquireShared() < 0) { if (addQ) { // 没拿到锁,加入到等待集合 waiters.offer(Thread.currentThread()); addQ = false; } else { // 阻塞 挂起当前的线程,不要继续往下跑了 LockSupport.park(); // 伪唤醒,就是非unpark唤醒的 } } waiters.remove(Thread.currentThread()); // 把线程移除 } public boolean tryReleaseShared(){ throw new UnsupportedOperationException(); } public void releaseShared(){ if (tryReleaseShared()) { // 通知等待者 Iterator<Thread> iterator = waiters.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); LockSupport.unpark(next); // 唤醒 } } } // 独占资源相关的代码 public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式 throw new UnsupportedOperationException(); } public void acquire() { boolean addQ = true; while (!tryAcquire()) { if (addQ) { // 没拿到锁,加入到等待集合 waiters.offer(Thread.currentThread()); addQ = false; } else { // 阻塞 挂起当前的线程,不要继续往下跑了 LockSupport.park(); // 伪唤醒,就是非unpark唤醒的 } } waiters.remove(Thread.currentThread()); // 把线程移除 } public boolean tryRelease() { throw new UnsupportedOperationException(); } public void release() { // 定义了 释放资源之后要做的操作 if (tryRelease()) { // 通知等待者 Iterator<Thread> iterator = waiters.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); LockSupport.unpark(next); // 唤醒 } } } public AtomicInteger getState() { return state; } public void setState(AtomicInteger state) { this.state = state; } }
资源占用的流程