2.1 第一部分-线程安全
2.1.1 线程安全问题
案例:两个线程对初始值为0的静态变量一个做自增,一个做自增,一个做自减,结果是0吗?
@Slf4j(topic = "AddSub")
public class AddSubDemo {
static int counter = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter++;
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter--;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("counter = {}", counter);
}
}
结果是变化的。
对于静态变量i,自增和自减的字节码指令如下:
# i++
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i
# i--
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
isub // 自减
putstatic i // 将修改后的值存入静态变量i
Java的内存模型如下,完成静态变量的自增和自减,需要在主存和工作内存中进行数据交换:
如果是单线程顺序执行以上8行字节码指令,不会出现问题:
但是在双线程的情况下,会产生指令重排序,导致各种问题,比如脏读(读取更新未保存的数据):
拓展
i++
、i--
、++i
、--i
等操作都是非原子性的,因为JVM指令可并非一条。可以分析一下字节码指令(javap -v
)。
i++
的代码如下:
int i = 0;
System.out.println(i++);
字节码指令:
0 iconst_0 # 将常量0加载到栈顶
1 istore_1 # 弹出栈顶元素,并赋值给局部变量表中索引为1的位置
2 getstatic #2 <java/lang/System.out> # 获取系统类的静态属性
5 iload_1 # 将局部变量表中索引为1的元素加载到栈顶
6 iinc 1 by 1 # 直接把局部变量表中索引为1的元素值+1
9 invokevirtual #3 <java/io/PrintStream.println> # 调用流的打印方法
顺序:先将元素加载到操作数栈栈顶,然后更新局部变量表的元素的值。
++i
的代码如下:
int i = 0;
System.out.println(++i);
字节码指令:
0 iconst_0 # 将常量0加载到栈顶
1 istore_1 # 弹出栈顶元素,并赋值给局部变量表中索引为1的位置
2 getstatic #2 <java/lang/System.out> # 获取系统类的静态属性
5 iinc 1 by 1 # 直接把局部变量表中索引为1的元素值+1
8 iload_1 # 将局部变量表中索引为1的元素加载到栈顶
9 invokevirtual #3 <java/io/PrintStream.println> # 调用流的打印方法
顺序:先更新局部变量表的元素的值,再将元素加载到操作数栈栈顶。
2.1.2 synchronized解决方案
临界区:一段代码块内如果内存堆共享资源的多线程读写操作,称这段代码块为临界区。
竞态条件:多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。
应用互斥
为了避免临界区的竞态条件发生,有多种手段可以达到目的:
- 阻塞式的解决方案:
synchoronized
,Lock
- 非阻塞式的解决方案:原子变量
AtomicInteger
、AtomicLong
synchronized
语法;
synchronized (对象) {
临界区
}
解决:
@Slf4j(topic = "AddSub")
public class SafeAddSubDemo {
static Integer counter = 0;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
counter++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
counter--;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("counter = {}", counter);
}
}
这样就能保证输出结果为0了。
思考
synchronized
实际是用对象锁保证了临界区代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
思考如下问题:
如果把
synchronized(obj)
放在for循环的外面, 如何理解?
for循环也是一个原子操作, 表现出原子性。如果t1
synchronized(obj1)
而 t2synchronized(obj2)
会怎么运行?
因为t1,、t2拿到不是同一把对象锁,,所以他们仍然会发现安全问题, 必须要是同一把对象锁。如果t1
synchronized(obj)
而t2没有加会怎么样 ?
因为t2没有加锁,所以t2不需要获取t1的锁, 直接就可以执行下面的代码, 仍然会出现安全问题。
面向对象改进
@Slf4j(topic = "BetterAddAndSub")
public class BetterAddSubDemo {
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.increment();
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.decrement();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("counter = {}", room.getCounter());
}
}
class Room {
private int counter = 0;
public void increment() {
synchronized (this) {
counter++;
}
}
public void decrement() {
synchronized (this) {
counter--;
}
}
public int getCounter() {
synchronized (this) {
return counter;
}
}
}
2.1.3 八锁现象
方法上的synchronized
class Test {
public synchronized void test() {
// do something
}
// 等价于(锁住的是实例对象)
public void test() {
synchronized(this) {
// do something
}
}
public synchronized static void test() {
// do something
}
// 等价于(锁住的是类对象)
public static void test() {
synchronized(Test.class) {
// do something
}
}
}
八锁现象
(1)情况一:synchronized
方法锁住的是实例对象,两个方法使用同一个锁,谁先获取锁谁先执行。
结果:AB
@Slf4j(topic = "LockTest")
public class LockDemo1 {
public static void main(String[] args) throws InterruptedException {
Lock lock = new Lock();
new Thread(lock::printA, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(lock::printB, "B").start();
}
}
@Slf4j(topic = "Lock")
class Lock {
public synchronized void printA() { log.debug("A => [{}]", System.nanoTime()); }
public synchronized void printB() { log.debug("B => [{}]", System.nanoTime()); }
}
(2)情况二:synchronized
方法锁住的是实例对象,两个方法使用同一个锁,谁先获取锁谁先执行。
结果:AB
@Slf4j(topic = "LockTest1")
public class LockDemo2 {
@SneakyThrows
public static void main(String[] args) {
Lock2 lock = new Lock2();
new Thread(lock::printA, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(lock::printB, "B").start();
}
}
@Slf4j(topic = "Lock")
class Lock2 {
@SneakyThrows
public synchronized void printA() {
TimeUnit.SECONDS.sleep(2);
log.debug("A => [{}]", System.nanoTime());
}
public synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
(3)情况三:未加synchronized
的方法不需要等待锁。
结果:CAB
@Slf4j(topic = "LockTest3")
public class LockDemo3 {
@SneakyThrows
public static void main(String[] args) {
Lock3 lock = new Lock3();
new Thread(lock::printA, "A").start();
new Thread(lock::printB, "B").start();
new Thread(lock::printC, "C").start();
}
}
@Slf4j(topic = "Lock3")
class Lock3 {
@SneakyThrows
public synchronized void printA() {
TimeUnit.SECONDS.sleep(1); // 会让出CPU执行权
log.debug("A => [{}]", System.nanoTime());
}
public synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
public void printC() {
log.debug("C => [{}]", System.nanoTime());
}
}
(4)情况四:两个线程的方法锁住是各自的实例对象,不存在互斥,并且睡眠会让出CPU执行权。
结果:BA
@Slf4j(topic = "LockTest4")
public class LockDemo4 {
public static void main(String[] args) {
Lock4 lock1 = new Lock4();
Lock4 lock2 = new Lock4();
new Thread(lock1::printA, "A").start();
new Thread(lock2::printB, "B").start();
}
}
@Slf4j(topic = "Lock4")
class Lock4 {
@SneakyThrows
public synchronized void printA() {
TimeUnit.SECONDS.sleep(1);
log.debug("A => [{}]", System.nanoTime());
}
public synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
(5)情况5:static synchronized
方法锁住的是类对象,synchronized
方法锁住的是实例对象。两线程锁住的对象不同,不存在互斥。
结果:BA
@Slf4j(topic = "LockTest5")
public class LockDemo5 {
public static void main(String[] args) {
Lock5 lock5 = new Lock5();
new Thread(Lock5::printA, "A").start();
new Thread(lock5::printB, "B").start();
}
}
@Slf4j(topic = "Lock5")
class Lock5 {
@SneakyThrows
public static synchronized void printA() {
TimeUnit.SECONDS.sleep(1);
log.debug("A => [{}]", System.nanoTime());
}
public synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
(6)情况六:synchronized
方法锁住的是类对象,两线程使用同一个锁,谁先获取谁先执行。
结果:AB
@Slf4j(topic = "LockTest6")
public class LockDemo6 {
public static void main(String[] args) {
new Thread(Lock6::printA, "A").start();
new Thread(Lock6::printB, "B").start();
}
}
@Slf4j(topic = "Lock6")
class Lock6 {
@SneakyThrows
public static synchronized void printA() {
TimeUnit.SECONDS.sleep(1);
log.debug("A => [{}]", System.nanoTime());
}
public static synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
(7)情况七:两个线程使用的一个是类对象,一个是实例对象,不存在互斥。
结果:BA
@Slf4j(topic = "LockTest7")
public class LockDemo7 {
public static void main(String[] args) {
Lock7 lock7 = new Lock7();
new Thread(Lock7::printA, "A").start();
new Thread(lock7::printB, "B").start();
}
}
@Slf4j(topic = "Lock7")
class Lock7 {
@SneakyThrows
public static synchronized void printA() {
TimeUnit.SECONDS.sleep(1);
log.debug("A => [{}]", System.nanoTime());
}
public synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
(8)情况八:两个线程的方法锁住的都是类对象,谁先获取谁先执行。
结果:AB
@Slf4j(topic = "LockTest8")
public class LockDemo8 {
public static void main(String[] args) {
new Thread(Lock8::printA, "A").start();
new Thread(Lock8::printB, "B").start();
}
}
@Slf4j(topic = "Lock8")
class Lock8 {
@SneakyThrows
public static synchronized void printA() {
TimeUnit.SECONDS.sleep(1);
log.debug("A => [{}]", System.nanoTime());
}
public static synchronized void printB() {
log.debug("B => [{}]", System.nanoTime());
}
}
2.1.4 变量的线程安全
成员变量和静态变量:
- 如果它们没有共享,则线程安全
- 如果它们被共享了:
- 如果只有读操作,则线程安全
- 如果有读写操作,则这段代码是临界区,需要考虑线程安全
局部变量:
- 局部变量是线程安全的
- 局部变量的引用未必是线程安全的
- 如果该对象没有逃离方法的作用范围,它是线程安全的
- 如果该对象逃离了方法的作用范围,需要考虑线程安全
成员变量案例分析
有如下类:
class UnsafeOperation {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
// { 临界区,会产生竞态条件
method2();
method3();
// } 临界区
}
}
private void method2() { list.add("1"); }
private void method3() { list.remove(0); }
}
执行如下代码则会出现线程安全问题:
@Slf4j(topic = "UnsafeArrayList")
public class UnsafeArrayListDemo {
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args) {
UnsafeOperation operation = new UnsafeOperation();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> operation.method1(LOOP_NUMBER), "Thread" + (i + 1)).start();
}
}
}
分析:
- 两个线程中的
method2
引用的都是同一个对象中的list成员变量。 method3
和method2
分析相同。
将原类中的成员变量修改为局部变量则不会有线程安全问题:
class SafeOperation {
public void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
private void method2(ArrayList<String> list) { list.add("1"); }
private void method3(ArrayList<String> list) { list.remove(0); }
}
分析:
list
是局部变量,每个线程调用时会创建其不同实例,没有共享。method2
的参数是从method1
中传递过来的,与method1
中引用同一个对象。method3
的参数分析与method2
相同。
private或final的重要性
在上述的SafeOperation
中,如果把method2
和method3
的方法修改为public
会不会导致线程安全问题?
情况一:有其他线程调用method2
和method3
。
结果:不会导致线程安全问题,因为一个线程调用method1
中的局部变量list
与其他线程传给method2
和method3
的参数不同。
情况二:为SafeOperation
添加子类,子类覆盖method3
方法。
class SubSafeOperation extends SafeOperation{
@Override
public void method3(ArrayList<String> list) {
new Thread(() -> list.remove(0)).start();
}
}
结果:由于method3
的访问权限为public
,此时子类重写父类的方法,子类和父类共享list
对象,在子类中新开线程来操作list
对象,此时就会出现线程安全问题。
所以不想子类重写父类的方法的,需要将父类中的方法设置为private
或者final
,这便是开闭原则中的【闭】。
2.1.5 习题
(1)卖票问题
售票窗口有1000张票,模拟1000个人买票,查看最后剩余票数和售出票数之和是否为1000。
@Slf4j(topic = "ExerciseSell")
public class ExerciseSellDemo {
// Random为线程安全
static Random random = new Random();
// 随机数1~5
public static int randomAmount() {
return random.nextInt(5) + 1;
}
public static void main(String[] args) throws InterruptedException {
// 售票窗口,总共1000张票
TicketWindow window = new TicketWindow(1000);
// 线程集合,模拟多人买票
List<Thread> threadList = new ArrayList<>();
// 卖出的票数统计,Vector线程安全
List<Integer> amountList = new Vector<>();
// 模拟1000个人买票
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(() -> {
// 买票
int amount = window.sell(randomAmount());
// 睡眠
try {
Thread.sleep(randomAmount());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 统计买票数
amountList.add(amount);
});
threadList.add(thread);
thread.start();
}
for (Thread thread : threadList) {
thread.join();
}
// 查看余票数量和售票数量之和是否为1000
log.debug("余票 => [{}]", window.getCount());
log.debug("售票 => [{}]", amountList.stream().mapToInt(i -> i).sum());
}
}
/**
* 售票窗口
*/
class TicketWindow {
// 票总数
private int count;
public TicketWindow(int count) {
this.count = count;
}
// 获取余票数量
public int getCount() {
return count;
}
// 售票
public int sell(int amount) {
if (this.count >= amount) {
this.count -= amount;
return amount;
} else {
return 0;
}
}
}
为了测出异常结果,使用windows测试脚本:
for /L %n in (1,1,10) do java -cp ".;C:\Java\maven\localRepository\org\projectlombok\lombok\1.18.12\lombok-1.18.12.jar;C:\Java\maven\localRepository\ch\qos\logback\logback-classic\1.1.2\logback-classic-1.1.2.jar;C:\Java\maven\localRepository\ch\qos\logback\logback-core\1.1.2\logback-core-1.1.2.jar;C:\Java\maven\localRepository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar" top.parak.safe.ExerciseSellDemo
多次测试,终于发现错误结果:
分析原因,就在于临界区TicketWindow
的sell()
方法,多个线程对类的成员变量count
进行写操作,而这个方法并未受到保护,于是出现线程安全问题。只需要在方法头上标识synchronized
即可,锁住实例对象,则线程安全。
(2)转账问题
两个账户余额都为1000,互相转账1000次,查看最后两个账户余额之和是否为2000。
@Slf4j(topic = "ExerciseTransfer")
public class ExerciseTransferDemo {
// Random为线程安全
static Random random = new Random();
// 随机数1~100
public static int randomAmount() {
return random.nextInt(100) + 1;
}
public static void main(String[] args) throws InterruptedException {
// 模拟两个账户
Account k1 = new Account(1000);
Account k2 = new Account(1000);
// 互相转账1000次
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
k1.transfer(k2, randomAmount());
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
k2.transfer(k1, randomAmount());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
// 查看转账2000次后的余额
log.debug("k1 => [{}]", k1.getMoney());
log.debug("k2 => [{}]", k2.getMoney());
}
}
/**
* 账户
*/
class Account {
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public void setMoney(int money) {
this.money = money;
}
public void transfer(Account target, int amount) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
运行结果:
2021-04-08 14:47:33.703 [main] DEBUG ExerciseTransfer - k1 => [1]
2021-04-08 14:47:33.706 [main] DEBUG ExerciseTransfer - k2 => [145]
发现k1
和k2
的余额之和不为2000,所以Account
是线程不安全的。
由于转账需要操作自己和对方的money
,所以需要直接锁住两个类的money
,即锁住Account
类。
改进如下:
public void transfer(Account target, int amount) {
synchronized (Account.class) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
2.2 第二部分-Monitor
2.2.1 Monitor
Java对象头
普通对象
数组对象
其中Mark Word结构为:
Normal
:一般状态,没有加任何锁,Mark Word前面30位保存的是对象的信息,最后2位为状态(01),倒数第三位表示是否使用偏向锁(未使用:0)Biased
:偏向状态,使用偏向锁,Mark Word前面30位保存当前线程的ID,最后2位为状态(01),倒数第三位表示是否使用偏向锁(使用:1)Lightweight
:使用轻量级锁,Mark Word前30位保存的是锁记录的指针,最后2位为状态(00)Heavyweight
:使用重量级锁,Mark Word前30位保存的是Monitor的地址指针,最后2位状态(10)
Monitor
Monitor被翻译为监视器或管程。
每个Java对象都可以关联一个Monitor对象,如果使用synchronized
给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针。
Monitor结构如下:
- 刚开始
Monitor
中Owner
为null。 - 当
Thread-2
执行synchronized(obj)
就会将Monitor
的所有者Owner
置为Thread-2
,Monitor
中只能有一个Owner
。 - 当
Thread-2
上锁的过程中,如果Thread-3
、Thread-4
、Thread-5
也来执行synchronized(obj)
,就会进入EntryList
BLOCKED。 Thread-2
执行完同步代码块的内容,然后唤醒EntryList
中等待的线程来竞争锁,竞争的是非公平的。- 图中的
WaitSet
中的Thread-0
、Thread-1
是之前获得过锁,但条件不满足进入WAITING
状态的线程。
注意:
- synchronized必须是进入同一个对象的Monitor才有上述的效果。
- 不加synchronized的对象不会关联监视器,不遵从以上规则。
2.2.2 synchronized原理
Demo:
static final Object lock = new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}
字节码如下:
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: getstatic #2 // 从类中获取静态变量lock (synchronized开始)
3: dup // 复制操作数栈的栈顶数值
4: astore_1 // 将复制的引用插入到局部变量表索引为1的位置
5: monitorenter // 进入lock对象的MarkWord的monitor
6: getstatic #3 // 从类中获取静态变量i
9: iconst_1 // 将i放入常量池
10: iadd // 让i自增
11: putstatic #3 // 将类中的静态变量i值设置为新值
14: aload_1 // 导出局部变量表中索引为1的位置的对象,即lock引用
15: monitorexit // 退出lock对象的MarkWord的monitor,唤醒EntryList
16: goto 24 // 直接goto24的return
19: astore_2 // 将异常对象插入到局部变量表索引为2的位置
20: aload_1 // 加载局部变量表中索引为1的位置的对象,即lock引用
21: monitorexit // 退出lock对象的MarkWord的monitor,唤醒EntryList
22: aload_2 // 加载局部变量表中索引为2的位置的对象,即异常
23: athrow // throw Exception
24: return
Exception table:
from to target type
6 16 19 any // 同步代码块6-16,共享一个异常
19 22 19 any // 同步代码块19-22,共享一个异常
使用synchronized
时JVM会根据环境找到对象的monitor
,根据monitor
的状态进行加解锁的判断。如果成功加锁就成为该monitor
的唯一持有者,monitor在被释放前不能再被其他线程获取。
同步代码块使用monitorenter
和monitorexit
这两个字节码指令获取和释放monitor
。这两个字节码指令都需要一个引用类型的参数指明要锁定和解锁的对象,对于普通方法,锁时当前实例对象;对于静态方法,所是当前类的Class对象;对于同步方法块,锁是synchronized
括号中的对象。
不公平的原因:
所有收到锁请求的线程首先自旋,如果通过自旋也没有获取锁将被放入WaitSet,该做法已经对于进入队列的线程不公平。
为了防止WaitSet尾部的元素被大量线程进行CAS访问影响性能,Owner线程会在释放锁时将WaitSet的部分线程移动到EntryList并且指定某个线程为OnDeck线程,该行为叫做竞争切换,牺牲了公平性但提高了性能。
2.2.3 轻量级锁
使用场景:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。
轻量级锁对使用者是透明的,即语法仍然是synchronized
。
假设有两个方法同步块,利用同一个对象加锁:
static final Object obj = new Object();
public static vod method1() {
synchronized (obj) {
// 同步块A
method2();
}
}
public static void method2() {
synchronized (obj) {
// 同步块B
}
}
加锁流程:
栈帧中创建锁记录(Lock Record)对象:每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word
让锁记录中Object reference指向锁对象,并尝试用CAS替换Object的Mark Word,将Mark Word的值存入锁记录
如果CAS替换成功,对象头中存储了锁记录地址和状态,表示由该线程给对象加锁,这时图示如下:
如果CAS替换失败,有两种情况:
- 如果是其他线程已经持有了该Object的轻量级锁,这时表明有竞争,进入锁膨胀过程
- 如果是自己执行了synchronized锁重入,那么再添加一条Lock Record作为重入的计数。在上面的代码中
当退出synchronized代码块(解锁时)如果有取值位null的锁记录,表示有重入,这是重置锁记录,表示重入计数减一
当退出synchronized代码块(解锁时)锁记录的值不为null,这时使用CAS将Mark Word的值恢复给对象头。
- 成功,则解锁成功
- 失败,说明轻量级锁进行了锁膨胀或已经升级位重量级锁,进入重量级锁解锁流程
2.2.4 重量级锁
如果在尝试加轻量级锁的过程中,CAS操作无法成功,这时一种情况就是有其他线程为此对象加上了轻量级锁(有竞争),这是需要进行锁膨胀,将轻量级锁变为重量级锁。
static Object obj = new Object();
public static void method1() {
synchronized (obj) {
// 同步块
}
}
当Thread-1进行轻量级加锁时,Thread-0已经对该对象加了轻量级锁
这时Thread-1加轻量级锁失败,进入锁膨胀流程(因为Thread-1线程加轻量级锁失败,轻量级锁没有阻塞队列的概念,所以此时就要为对象申请Monitor锁(重量级锁),让Object指向重量级锁,然后Thread-1自己进入了Monitor的EntryList变成BLOCKED状态。)
- 即为Object对象申请Monitor锁,让Object指向重量级锁地址
- 然后自己进入Monitor的EntryList BLOCKED
- 当Thread-0退出同步块解锁时,使用CAS将Mark Word的值恢复给对象头,肯定恢复失败,因为对象的对象头中存储的是重量级锁的地址。这时会进入重量级解锁流程,即按照Monitor地址找到Monitor对象,设置Owner为null,唤醒EntryList中BLOCKED线程Thread-1。
自旋锁优化
重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。
同步对性能最大的影响是阻塞,挂起和恢复线程的操作都需要转入内核态完成。需要应用上共享数据的锁定只会持续很短的时间,为了这段时间去挂起和恢复线程并不值得。如果机器有多个处理器核心,我们可以让后面请求锁的线程稍等一会,但不放弃处理器的执行时间,看看持有锁的线程是否很快释放锁。为了让线程等待只需让线程执行一个忙循环,这项技术就是自旋锁。
自旋不能替代阻塞,虽然避免了线程切换开销,但要占用处理器时间,如果锁被占用的时间很短,自旋的效果就会非常好,繁殖只会白白消耗处理器资源。如果自旋超过了限定的次数仍然没有成功获得锁,就应挂起线程,自旋默认限定次数是10。
自旋锁在JDK1.4中引入,默认关闭;在JDK1.6中改为默认开启。
JDK1.6对自旋锁进行了优化,自旋时间不再固定,而是由前一次的自旋时间及锁拥有者的状态决定。如果在同一个锁上,自旋刚刚成功获得过锁且持有锁的线程正在运行,虚拟机会认为这次自旋也很有可能成功,进而允许自旋持续更久。如果自旋很少成功,以后获取锁时将可能直接省略掉自旋,避免浪费处理器资源。
有了自适应自旋,随着程序运行时间的增长,虚拟机的程序锁的状况预测就会越来越精准。
JDK1.7之后不能控制是否开启自旋功能。
2.2.5 偏向锁
轻量级锁在没有竞争时,每次重入仍然需要执行CAS操作。
Java 6中引入了偏向锁来做进一步优化:只有第一次使用CAS将线程ID设置到对象的Mark Word头,之后发现这个线程ID是自己的就表示没有竞争,不用重新CAS。以后只要不发生竞争,这个对象就归该线程所有。
例如:
static final Object obj = new Object();
public static void m1() {
synchronized (obj) {
// 同步块A
m2();
}
}
public static void m2() {
synchronized (obj) {
// 同步块B
m3();
}
}
public static void m3() {
synchronized (obj) {
// 同步块C
}
}
一个对象创建时:
- 如果开启了偏向锁(默认开启),那么对象创建后,Mark Word值为0x05即最后三位为101,这时它的thread_id、epoch、age都为0。
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加VM参数
-XX:BiasedLockingStartupDelay=0
来禁用延迟。 - 如果没有开启偏向锁,那么对象创建后,Mark Word值为0x01即最后3位为001,这时它的hashcode、age都为0,第一次用到hashcode时才会赋值。
- JVM禁用偏向锁:
-XX:-UseBiasedLocking
代码如下:
@Slf4j(topic = "Biased")
public class BiasedDemo {
public static void main(String[] args) throws InterruptedException {
Biased biased = new Biased();
log.debug("Before Biased Lock");
log.debug(ClassLayout.parseInstance(biased.toPrintable());
TimeUnit.SECONDS.sleep(3);
log.debug("After Biased Lock");
log.debug(ClassLayout.parseInstance(biased).toPrintable());
}
}
class Biased {
}
运行结果:
2021-04-09 19:14:30.487 [main] DEBUG Biased - Before Biased Lock
2021-04-09 19:14:32.083 [main] DEBUG Biased - top.parak.safe.Biased object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 70 b8 01 f8 (01110000 10111000 00000001 11111000) (-134104976)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
2021-04-09 19:14:35.096 [main] DEBUG Biased - After Biased Lock
2021-04-09 19:14:35.097 [main] DEBUG Biased - top.parak.safe.Biased object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 05 00 00 00 (00000101 00000000 00000000 00000000) (5)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 70 b8 01 f8 (01110000 10111000 00000001 11111000) (-134104976)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
因为偏向锁有延迟,所以在第二次创建对象前延时5S。
第一次创建对象时,可以看到,对象头的第一个字节为00000001
,即没有加锁,
第二次创建对象时,可以看到,对象头的第一个字节为00000101
,即加偏向锁。
下面使用工具类将繁琐的ClassLayout.parseInstance(obj).toPrintable()
的详细信息输出做一步简化,只输出关于Mark Word锁那一行的部分:
import org.openjdk.jol.info.ClassLayout;
/**
* @author KHighness
* @since 2021-04-09
*/
public class ClassLayoutUtil {
private final static int FIRST_LINE_START = 185;
private final static int LINE_LENGTH = 53;
public static String printSimple(ClassLayout instance) {
return instance.toPrintable().substring(FIRST_LINE_START, FIRST_LINE_START + LINE_LENGTH);
}
}
(1)撤销偏向锁-hashcode方法
使用偏向锁则进入Biased状态,使用hashcode()
方法则进入Normal状态。
Normal状态下:Mark Word(32) = hashcode(25) + age(4) + biased(1) + 01(2)
Biased状态下:Mark Word(32) = thread_id(23) + epoch(2) + biased(1) + 01(2)
@Slf4j(topic = "HashcodeBiased")
public class HashcodeBiasedDemo {
/**
* -XX:BiasedLockingStartupDelay=0 禁用偏向锁延迟
*/
public static void main(String[] args) {
HashcodeBiased biased = new HashcodeBiased();
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
log.debug(String.valueOf(biased.hashCode()));
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 无锁
}
}
class HashcodeBiased {
}
运行结果:
2021-04-09 23:03:39.378 [main] DEBUG HashcodeBiased - 00 00 00 (00000101 00000000 00000000 00000000) (5)
2021-04-09 23:03:39.380 [main] DEBUG HashcodeBiased - 1268650975
2021-04-09 23:03:39.381 [main] DEBUG HashcodeBiased - df 13 9e (00000001 11011111 00010011 10011110) (-1642
可以看到第一行为101
,第三行为001
,说明对象的偏向锁被撤销。
(2)撤销偏向锁-其他线程使用对象
当有其他线程使用偏向锁对象时,会将偏向锁升级为轻量级锁。
@Slf4j(topic = "OtherThreadBiased")
public class OtherThreadBiasedDemo {
/**
* -XX:BiasedLockingStartupDelay=0 禁用偏向锁延迟
*/
public static void main(String[] args) {
OtherThreadBiased biased = new OtherThreadBiased();
new Thread(() -> {
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
synchronized (biased) {
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
}
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
synchronized (OtherThreadBiased.class) {
OtherThreadBiased.class.notify();
}
}, "t1").start();
new Thread(() -> {
synchronized (OtherThreadBiased.class) {
try {
OtherThreadBiased.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
synchronized (biased) {
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 偏向锁
}
log.debug(ClassLayoutUtil.printSimple(ClassLayout.parseInstance(biased))); // 无锁
}, "t2").start();
}
}
class OtherThreadBiased {
}
运行结果:
2021-04-09 23:20:08.722 [t1] DEBUG OtherThreadBiased - 05 00 00 00 (00000101 00000000 00000000 00000000) (5)
2021-04-09 23:20:08.724 [t1] DEBUG OtherThreadBiased - 05 80 71 20 (00000101 10000000 01110001 00100000) (54
2021-04-09 23:20:08.725 [t1] DEBUG OtherThreadBiased - 05 80 71 20 (00000101 10000000 01110001 00100000) (54
2021-04-09 23:20:08.726 [t2] DEBUG OtherThreadBiased - 05 80 71 20 (00000101 10000000 01110001 00100000) (54
2021-04-09 23:20:08.727 [t2] DEBUG OtherThreadBiased - a0 f2 c0 20 (10100000 11110010 11000000 00100000) (54
2021-04-09 23:20:08.728 [t2] DEBUG OtherThreadBiased - 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
(3)撤销偏向锁-调用wait/notify
因为wait/notify只有重量级锁才有。
(4)批量重定向
如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程T1的对象仍有机会重新偏向T2,重偏向会重置对象的Thread ID。
当撤销偏向锁阈值超过20次后,JVM会这样觉得,我是不是偏向错了呢,于是会再给这些对象加锁时重新偏向至加锁线程。
(5)批量撤销
当撤销偏向锁阈值超过40次后,JVM会这样觉得,自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。
(6)锁消除
- 线程同步的代价是相当高的,同步的后果是降低并发性和性能。
- 在动态编译同步块的时候,JIT编译器可以借助逃逸分析来判断同步块所使用的锁对象是否只能够被一个线程访问而没有被发布到其他线程。
- 如果没有,那么JIT编译器在编译这个同步块的时候就会取消对这部分代码的同步。
2.2.6 synchronized总结
JDK1.6对synchronized做了很多优化,引入了自适应自旋锁、锁消除、锁粗化、偏向锁和轻量级锁等提高锁的效率,锁一共有四个状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁,状态会随竞争情况升级。锁可以升级但不能降级,这种只能升级不能降级的锁策略是为了提高锁获得和释放的效率。
对比如下:
锁状态 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
偏向锁(01) | 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 | 如果线程间存在锁竞争,会带来额外的所撤销的消耗。 | 适用于只有一个线程访问同步块场景。 |
轻量级锁(00) | 竞争的线程不会阻塞,提高了程序的响应速度。 | 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 | 追求响应时间。 同步块执行速度非常快。 |
重量级锁(10) | 线程竞争不使用自旋,不会消耗CPU。 | 线程阻塞,响应时间缓慢。 | 追求吞吐量。 同步块执行速度较长。 |
自旋锁
同步对性能最大的影响是阻塞,挂起和恢复线程的操作都需要转入内核态完成。许多应用上共享数据的锁定只会持续很短的时间,为了这段时间去挂起和恢复线程并不值得。如果机器有多个处理核心,我们可以让后面请求锁的线程稍等一会,但不放弃处理器的执行时间,看看持有锁的线程是否很快会释放锁。为了让线程等待只需让线程执行一个忙循环。
自旋锁在JDK1.4中引入,默认关闭,在JDK1.6中默认开启。自旋不能替代阻塞,虽然避免了线程切换开销,但要占用处理器时间,如果锁被占用的时间很短,自旋的效果就会非常好,反之只会白白消耗处理器资源。如果自旋超过限定次数仍然没有获取到锁,那么就应该挂起线程,自旋默认次数是10。
自适应自旋
JDK1.6对自选苏军哦进行了优化,自旋时间不再固定,而是由前一次的自旋时间及锁拥有者的状态决定。
如果在同一个锁上,自旋刚刚成功获得过锁且持有锁的线程正在运行,虚拟机会认为这次自旋也很有可能成功,进而允许自旋持续更久。如果自旋很少成功,以后获取锁时可能直接省略掉自旋,避免浪费处理器资源。
有了自适应自旋,随着程序运行时间的增长,虚拟机对程序锁的状况预测就会越来越精准。
锁消除
即时编译器对检测不可能存在共享数据竞争的锁进行消除。
主要判定依据来源于逃逸分析,如果判断一段代码中堆上的所有数据都只被一个线程加锁,就可以当作栈上的数据对待,认为它们是线程私有的而无须同步。
锁粗化
原则需要将同步块的作用范围限制得尽量小,只在共享数据的实际作用域中进行同步,这是为了使等待锁的线程尽快拿到锁。
但如果一系列的连续操作都对同一个对象反复加锁和解锁,甚至加锁操作是出现在循环体之外的,即使没有线程竞争也会导致不必要的性能消耗。因此如果虚拟机探测到有一串零碎的操作都对同一个对象加锁,将会把同步的范围扩展到整个操作序列的外部。
偏向锁
偏向锁是为了在没有竞争的情况下减少锁开销,锁会偏向于第一个获得它的线程,如果在执行过程中锁一直没有被其他线程获取,则持有偏向锁的线程将不需要进行同步。
当锁对象第一次被线程获取时,虚拟机会将对象头MarkWord中的偏向模式设置为1,锁标志状态设置为01,同时使用CAS把获取到锁的线程ID记录在对象的MarkWord中。如果CAS成功,持有偏向锁的线程以后每次进入锁相关的同步块都不再进行任何同步操作。
一旦有其他线程尝试获取锁,偏向模式立即结束,根据锁对象是否处于锁定状态决定是否撤销锁偏向,后续同步按照轻量级锁那样执行。
轻量级锁
轻量级锁时为了没有在竞争的前提下减少重量级锁使用操作系统互斥量产生的性能消耗。
在代码即将进入同步块时,如果同步对象没有被锁定,虚拟机将在当前线程的栈帧中建立一个锁记录i空间,存储锁对象目前MarkWord的拷贝。然后虚拟机使用CAS尝试把对象的MarkWord更新为指向锁记录的指针,如果更新成功即代表该线程拥有了锁,锁标志位将转变为00,标识处于轻量级锁状态。
如果更新失败就意味着至少存在一条线程与当前线程竞争。虚拟机检查对象的MarkWord是否指向当前线程的栈帧,如果是则说明当前线程已经拥有了锁,直接进入同步块继续执行,否则说明锁对象已经被其他线程抢占。如果出现两条以上线程争用同一个锁,轻量级锁将进行锁膨胀,成为重量级锁,锁标志状态标为10,此时MarkWord存储的就是指向重量级锁的指针,后面等待锁的线程也必须阻塞。
解锁同样通过CAS进行,如果对象MarkWord仍然指向线程的锁记录,就用CAS把对象的当前MarkWord和线程复制的MarkWord替换回来。加入替换成功同步过程就顺利完成了,如果失败则说明有其他线程尝试过获取该锁,就要在释放锁的同事唤醒被挂起的线程。
2.3 第三部分-模式
2.3.1 wait notify
wait/notify原理
- Owner:获得锁的线程。初始化为null,当有线程占有该monitor的时候,Owner标记为该线程的唯一标识,当线程释放monitor的时候,Owner又恢复为null。
- EntryList:有资格获取资源的线程会被移动到该队列中。
WaitSet:如果Owner线程被wait方法阻塞,则转移至WaitSet队列。
Owner
线程发现条件不满足,调用wait
方法,即可进入WaitSet
变成WAITING
状态BLOCKED
和WAITING
的线程都处于阻塞状态,不占用CPU时间片BLOCKED
线程会在Owner
线程加锁时唤醒WAITING
线程会在Owner
线程调用notify
或notifyAll
时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList
重新竞争。
API介绍
obj.wait()
让进入object监视器的线程到WaitSet
等待obj.notify()
在object上正在WaitSet
等待的线程中挑一个唤醒obj.notifyAll()
让object上正在WaitSet
等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于object对象的方法,必须获得此对象的锁,才能调用这几个方法。
案例:
@Slf4j(topic = "WaitNotify")
public class WaitNotifyDemo {
final static Object obj = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...");
}
}, "t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...");
}
}, "t2").start();
TimeUnit.SECONDS.sleep(1);
log.debug("唤醒obj上其他线程");
synchronized (obj) {
obj.notify(); // (1)
obj.notifyAll(); // (2)
}
}
}
注释(2),即调用obj.notify()
:
2021-04-10 19:53:10.709 [t1] DEBUG WaitNotify - 执行...
2021-04-10 19:53:10.711 [t2] DEBUG WaitNotify - 执行...
2021-04-10 19:53:11.717 [main] DEBUG WaitNotify - 唤醒obj上其他线程
2021-04-10 19:53:11.717 [t1] DEBUG WaitNotify - 其它代码...
注释(1),即调用obj.notifyAll()
:
2021-04-10 19:54:13.288 [t1] DEBUG WaitNotify - 执行...
2021-04-10 19:54:13.289 [t2] DEBUG WaitNotify - 执行...
2021-04-10 19:54:14.296 [main] DEBUG WaitNotify - 唤醒obj上其他线程
2021-04-10 19:54:14.296 [t2] DEBUG WaitNotify - 其它代码...
2021-04-10 19:54:14.296 [t1] DEBUG WaitNotify - 其它代码...
可以看到notify
只能唤醒一个线程,而notifyAll
唤醒了所有线程。
sleep与wait
Thread#sleep()
与Object#wait()
相同点:
- 线程进入了
sleep
和wait
都进入TIMED_WAITING
状态。
不同点:
sleep
不需要强制和synchronized
配合使用,但是wait
一定要和synchronized
配合使用sleep
在睡眠的同时不会释放对象锁,但是wait
在等待时会释放对象锁。
案例:
@Slf4j(topic = "SleepAndWait")
public class SleepAndWaitDemo {
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (lock) {
log.debug("获得锁");
try {
Thread.sleep(20_000); // (1)
lock.wait(20_000); // (2)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
TimeUnit.SECONDS.sleep(1);
synchronized (lock) {
log.debug("获得锁");
}
}
}
注释(2),即调用Thread.sleep()
:
2021-04-10 20:23:40.382 [t1] DEBUG SleepAndWait - 获得锁
2021-04-10 20:24:00.386 [main] DEBUG SleepAndWait - 获得锁
注释(1),即调用obj.wait()
:
2021-04-10 20:26:40.335 [t1] DEBUG SleepAndWait - 获得锁
2021-04-10 20:26:41.335 [main] DEBUG SleepAndWait - 获得锁
可以看到sleep情况下主线程20S后才拿到锁,而wait情况下主线程立即可以拿到锁。
wait/notify正确姿势
// 一个线程
synchronized (lock) {
while (condition) { // 防止虚假唤醒
lock.wait();
}
// do something
}
// 另一个线程
synchronized (lock) {
lock.notifyAll();
}
2.3.2 同步模式之保护性暂停
Guarded Suspension,用在一个线程等待另一个线程的执行结果。
- 有一个结果需要从一个线程传递到另一个线程,让他们关联到一个Guarded Object
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK中,join的实现、future的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
示例:一个线程需要等待另一个线程的下载结果,并且设置超时时间。
@Slf4j(topic = "GuardedObject")
public class GuardedObjectDemo {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
// 获取结果线程
new Thread(() -> {
log.debug("等待结果");
List<String> response = (List<String>) guardedObject.getResponse(5000);
if (response != null) {
log.debug("结果大小:{}", response.size());
} else {
log.debug("下载失败");
}
}, "receive").start();
// 下载内容线程
new Thread(() -> {
log.debug("执行下载");
List<String> download = ParaKDownloader.download("https://www.parak.top");
guardedObject.setResponse(download);
}, "download").start();
}
}
class GuardedObject {
// 结果
private Object response;
// 获取结果
public Object getResponse(long timeout) {
synchronized (this) {
// 开始时间
long start = System.currentTimeMillis();
// 经历时间
long pass = 0;
// 没有结果
while (response == null) {
// 此轮循环应该等待的时间
long wait = timeout - pass;
if (wait <= 0) {
break;
}
try {
this.wait(wait);
} catch (InterruptedException e) {
e.printStackTrace();
}
pass = System.currentTimeMillis() - start;
}
return response;
}
}
// 产生结果
public void setResponse(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}
}
class ParaKDownloader {
public static List<String> download(String url) {
HttpURLConnection connection = null;
List<String> res = new ArrayList<>();
String line = null;
try {
connection = (HttpURLConnection) new URL(url).openConnection();
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
while ((line = reader.readLine()) != null) {
res.add(line);
}
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
}
join原理
看一看Thread#join()
的源代码,即是同步模式之保护性暂停的应用。
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis(); // 开始时间
long now = 0; // 当前时间
if (millis < 0) { // 参数校验
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) { // 一直等待,直到线程结束
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now; // 这轮需要等待的时间
if (delay <= 0) {
break;
}
wait(delay); // 等待
now = System.currentTimeMillis() - base; // 经过时间
}
}
}
扩展
如果需要在多个类中间使用GuardedObject
对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类。
比如一个小区有一个信箱,每个居民只需要在信箱里根据ID取到自己的信件,而每一封信件都由不同的邮递员送达。
public class MailBoxesDemo {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(1);
for (Integer id : MailBoxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
// 居民
@Slf4j(topic = "People")
class People extends Thread {
@Override
public void run() {
Mail mail = MailBoxes.createMail();
log.debug("开始收信: {}", mail.getId());
String content = (String) mail.getContent(5000);
log.debug("收到信件: {}", content);
}
}
// 邮递员
@Slf4j(topic = "Postman")
class Postman extends Thread{
private int id;
private String content;
public Postman(int id, String content) {
this.id = id;
this.content = content;
}
@Override
public void run() {
Mail mail = MailBoxes.getMail(id);
log.debug("送信 [id={}, content={}]", id, content);
mail.setContent(content);
}
}
// 信箱
class MailBoxes {
private static Map<Integer, Mail> boxes = new Hashtable<>();
private static int id = 0;
private static synchronized int generateId() {
return ++id;
}
public static Mail getMail(int id) {
return boxes.remove(id);
}
public static Mail createMail() {
int id = generateId();
Mail mail = new Mail(id);
boxes.put(id, mail);
return mail;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
// 信件
class Mail {
// 信件ID
private final int id;
public Mail(int id) {
this.id = id;
}
public int getId() {
return id;
}
// 信件内容
private Object content;
// 获取内容
public Object getContent(long timeout) {
synchronized (this) {
// 开始时间
long start = System.currentTimeMillis();
// 经历时间
long pass = 0;
// 没有结果
while (content == null) {
// 此轮循环应该等待的时间
long wait = timeout - pass;
if (wait <= 0) {
break;
}
try {
this.wait(wait);
} catch (InterruptedException e) {
e.printStackTrace();
}
pass = System.currentTimeMillis() - start;
}
return content;
}
}
// 设置内容
public void setContent(Object content) {
synchronized (this) {
this.content = content;
this.notifyAll();
}
}
}
2.3.3 异步模式之生产者/消费者
模型描述如下:
- 消费队列可以平衡生产和消费的资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK中各种阻塞队列,采用的就是这种模式
实现:
public class MessageQueueDemo {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
// 3个生产者
for (int i = 0; i < 3; i++) {
final int id = i + 1;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + id).start();
}
// 1个消费者
new Thread(() -> {
while (true) {
// 1S消费一个线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.get();
}
}, "消费者").start();
}
}
// 消息队列类
@Slf4j(topic = "MessageDemo")
class MessageQueue {
// 消息队列集合
private LinkedList<Message> list;
// 消息队列容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
this.list = new LinkedList<>();
}
// 获取消息
public Message get() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.debug("消费消息 <= {}", message.toString());
list.notifyAll();
return message;
}
}
// 存入消息
public void put(Message message) {
synchronized (list) {
// 检查队列是否已满
while (list.size() == capacity) {
try {
log.debug("队列已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.debug("生产消息 => {}", message.toString());
list.notifyAll();
}
}
}
// 消息
class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message[" +
"id=" + id +
", value=" + value +
']';
}
}
2.3.4 park unpark
park/unpark
是LockSupport
类中的方法:
// 暂停当前线程
LockSupport.park();
// 恢复线程运行
LockSupport.unpark(Thread thread);
park & unpark
与wait & notify
区别:
wai
t、notify
和notifyAll
必须配合Object Monitor一起使用,而park
和unpark
不必。park/unpark
是以线程为单位类【阻塞】和【唤醒】线程,而notify
只能随机唤醒一个等待线程,notifyAll
是唤醒所有等待线程,就不那么【精确】。park & unpark
可以先unpark
,而wait & notify
不能先notify
。
示例:
@Slf4j(topic = "ParkUnpark")
public class ParkUnparkDemo {
public static void main(String[] args) {
Scanner sc = new Scanner(System.in);
String str = null;
while (!(str = sc.nextLine()).equals("")) {
String[] s = str.split(" ");
int time1 = Integer.parseInt(s[0]);
int time2 = Integer.parseInt(s[1]);
// park
Thread t1 = new Thread(() -> {
log.debug("start...");
try {
TimeUnit.SECONDS.sleep(time1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("park...");
LockSupport.park();
log.debug("continue...");
}, "t1");
t1.start();
// unpark
try {
TimeUnit.SECONDS.sleep(time2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// unpark既可以在park之前调用,也可以在park之后调用
log.debug("unpark...");
LockSupport.unpark(t1);
}
}
}
运行结果:
1 2
2021-04-18 20:20:53.213 [t1] DEBUG ParkUnpark - start...
2021-04-18 20:20:54.217 [t1] DEBUG ParkUnpark - park...
2021-04-18 20:20:55.226 [main] DEBUG ParkUnpark - unpark...
2021-04-18 20:20:55.226 [t1] DEBUG ParkUnpark - continue...
2 1
2021-04-18 20:20:57.756 [t1] DEBUG ParkUnpark - start...
2021-04-18 20:20:58.771 [main] DEBUG ParkUnpark - unpark...
2021-04-18 20:20:59.762 [t1] DEBUG ParkUnpark - park...
2021-04-18 20:20:59.762 [t1] DEBUG ParkUnpark - continue...
第一次输入,time1
= 1, time2
= 2,于是先执行park
,后执行unpark
,最终执行了continue
;
第一次输入,time1
= 2, time2
= 1,于是先执行unpark
,后执行park
,最终也执行了continue
。
证明unpark
的执行比较随意。
park/unpark原理
每个线程都有自己的Parker对象,由三部分组成:_countrer
、_cond
和_mutex
打个比喻
- 线程就像一个旅人,Parker就像为他随身携带的背包,
_cond
就好比背包中的帐篷,_counter
就好比背包中的备用干粮(0为耗尽,1为充足)
- 调用
park
就是要看需不需要停下来歇息- 如果备用干粮耗尽(
_counter
= 0),那么钻进帐篷歇息。 - 如果备用干粮充足(
_counter
= 1),那么就不需停留,继续前进。
- 如果备用干粮耗尽(
- 调用
unpark
,就好比向背包中补充干粮- 如果这时线程还在敞篷,就唤醒让他继续前进。
- 如果这时线程还在运行,那么下次他调用
park
时,仅是消耗掉备用干粮,不需停留继续前进。- 因为背包空间有限,多次调用
unpark
仅会补充一份备用干粮。
- 因为背包空间有限,多次调用
图示如下:
(1)先调用park
再调用unpark
- 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为0,这时,获得_mutex
互斥锁 - 线程进入
_cond
条件变量阻塞 - 设置
_counter = 0
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为1 - 唤醒
_cond
条件变量中的Thread_0
Thread_0
恢复运行- 设置
_counter
为0
(2)先调用unpark
再调用park
- 调用
Unsafe.unpark(Thread)0
方法,设置_counter
为1 - 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为1,这时线程无需阻塞,继续运行 - 设置
_counter
为0
2.3.5 线程状态转换
假设有线程t:
(1)情况1:NEW
==> RUNNABLE
当调用t.start()
方法时,由NEW
=> RUNNABLE
(2)情况2:RUNNABLE
<==> WAITING
t
线程用synchronized(obj)
获取了对象锁后
- 调用
obj.wait()
方法时,t线程从RUNNABLE
==>WAITING
- 调用
obj.notify()
、obj.notifyAll()
、t.interrupt()
时- 竞争锁成功,t线程从
WAITING
==>RUNNABLE
- 竞争锁失败,t线程从
WAITING
==>BLOCKED
- 竞争锁成功,t线程从
(3)情况3:RUNNABLE
<==> WAITING
- 当前线程调用
t.join()
方法时,当前线程从RUNNABLE
==>WAITING
- 注意是当前线程在
t
线程对象的监视器上等待
- 注意是当前线程在
t
线程运行结束,或调用了当前线程的interrupt()
时,当前线程从WAITING
==>RUNNABLE
(4)情况4:RUNNABLE
<==> WAITING
- 当前线程调用
LockSupport.park()
方法会让当前线程从RUNNABLE
==>WAITING
- 调用
LockSupport.unpark(目标线程)
或者调用了线程的interrupt()
,会让目标线程从WAITING
==>RUNNABLE
(5)情况5:RUNNABLE
<==> WAITING
t
线程用synchronized(obj)
获取了对象锁后
- 调用
obj.wait(long n)
方法时,t
线程从RUNNABLE
==>TIMED_WAITING
t
线程等待时间超过了n毫秒,或调用obj.notify()
,obj.notifyAll()
,t.interrupt()
时- 竞争时成功,
t
线程从TIMED_WAITING
==>RUNNABLE
- 竞争锁失败,
t
线程从TIMED_WAITING
==>BLOCKED
- 竞争时成功,
(6)情况6:RUNNABLE
<==> TIMED_WAITING
- 当前线程调用
t.join(long n)
方法时,当前线程从RUNNABLE
==>TIMED_WAITING
- 注意是当前线程在
t
线程对象的监视器上等待
- 注意是当前线程在
- 当前线程等待时间超过了n毫秒,或者
t
线程运行结束,或者调用了当前线程的interrupt()
时,当前线程从TIMED_WAITING
==>RUNNABLE
(7)情况7:RUNNABLE
<==> TIMED_WAITING
- 当前线程调用
Thread.sleep(long n)
,当前线程从RUNNABLE
==>TIMED_WAITING
- 当前线程等待时间超过了n毫秒,当前线程从
TIMED_WAITING
==>RUNNABLE
(8)情况8:RUNNABLE
<==> TIMED_WAITING
- 当前线程调用
LockSupport.parkNanos(long nanos)
或LockSupport.parkUntil(long millons)
,当前线程从RUNNABLE
==>TIMED_WAITING
- 调用
LockSupport.unpark(目标线程)
或者调用了线程的interrupt()
,或时等待超时,会让目标线程从TIMED_WAITING
==>RUNNABLE
(9)情况9:RUNNABLE
<==> BLOCKED
t
线程用synchronized(obj)
获取了对象锁时如果竞争失败,从RUNNABLE
==>BLOCKED
- 持
obj
锁线程的同步代码块执行完毕,会唤醒该对象上所有BLOCKED
的线程重新竞争,如果其中t
线程竞争成功,从BLOCKED
==>RUNNABLE
,其他失败的线程仍然BLOCKED
(10)情况10:RUNNABLE
<==> TERMINATED
当先线程所有代码运行完毕,进入TERMINATED
2.4 第四部分-Lock
2.4.1 多把锁
多把不相干的锁
一件大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低。
例如:
public class MultiLockDemo {
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
try {
bigRoom.study();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "小南").start();
new Thread(() -> {
try {
bigRoom.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "小女").start();
}
}
@Slf4j(topic = "BigRoom")
class BigRoom {
public void sleep() throws InterruptedException {
synchronized (this) {
log.debug("sleep2个小时");
TimeUnit.SECONDS.sleep(2);
}
}
public void study() throws InterruptedException {
synchronized (this) {
log.debug("sleep1个小时");
TimeUnit.SECONDS.sleep(1);
}
}
}
解决方法是准备多个房间(多个对象锁),让小南和小女获得不同的锁:
@Slf4j(topic = "BigRoom")
class BigRoom {
/* 书房 */
private final Object studyRoom = new Object();
/* 卧室 */
private final Object sleepRoom = new Object();
public void sleep() throws InterruptedException {
synchronized (sleepRoom) {
log.debug("sleep2个小时");
TimeUnit.SECONDS.sleep(2);
}
}
public void study() throws InterruptedException {
synchronized (studyRoom) {
log.debug("sleep1个小时");
TimeUnit.SECONDS.sleep(1);
}
}
}
锁的粒度细分:
- 好处:增强程序的并发性。
- 坏处:如果一个线程需要同时获得多把锁,就容易发生死锁。
2.4.2 活跃性
- 因为某种原因,使得代码一直无法执行完毕,这样的现象叫做活跃性。
- 活跃性相关的一系列问题都可以用
ReentrantLock
进行解决。
2.4.2.1 死锁
一个线程需要同时获得多把锁,这时就容易发生死锁。
如:线程1获取A对象锁,线程2获取B对象锁,此时线程1又想获取B对象锁,线程2又想获取A对象锁,它们都等着对象释放锁,此时就称为死锁。
例如:
@Slf4j(topic = "DeadLock")
public class DeadLockDemo {
public static void main(String[] args) {
final Object A = new Object();
final Object B = new Object();
new Thread(() -> {
synchronized (A) {
log.debug("lock A");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B) {
log.debug("lock B");
}
}
}, "t1").start();
new Thread(() -> {
synchronized (B) {
log.debug("lock B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (A) {
log.debug("lock A");
}
}
}, "t2").start();
}
}
运行结果:
2021-04-22 22:48:27.708 [t2] DEBUG DeadLock - lock B
2021-04-22 22:48:27.708 [t1] DEBUG DeadLock - lock A
发生死锁的必要条件
- 互斥条件:在一段时间内,一种资源只能被一个进程所使用。
- 请求和保持条件:进程已经拥有了至少一种资源,同时又去申请其他资源。因为其他资源被别的进程所使用,该进程进入阻塞状态,并且不释放自己已有的资源。
- 不可剥夺条件:进程对已获得的资源在未使用完成前不能被强占,只能在进程使用完后自己释放。
- 循环等待条件:发生死锁时,必然存在一个进程——资源的循环链。
定位死锁的方法
先运行上述死锁示例,保证系统中存在死锁进程。
- jps进程id + jstack定位死锁
PS > jps
13616 Jps
14432 Launcher
13796 DeadLockDemo // 死锁进程
16676
26428 RemoteMavenServer36
PS > jstack 13796
2021-04-22 11:15:43
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.221-b11 mixed mode):
......
Found one Java-level deadlock: // 发现一个Java级别的死锁
=============================
// 死锁的引用情况
"t2":
waiting to lock monitor 0x000000001d390b08 (object 0x000000076cfd4e58, a java.lang.Object),
which is held by "t1"
"t1":
waiting to lock monitor 0x000000001d393028 (object 0x000000076cfd4e68, a java.lang.Object),
which is held by "t2"
Java stack information for the threads listed above:
===================================================
"t2":
at top.parak.share.DeadLockDemo.lambda$main$1(DeadLockDemo.java:39) // 发生死锁的位置
- waiting to lock <0x000000076cfd4e58> (a java.lang.Object)
- locked <0x000000076cfd4e68> (a java.lang.Object)
at top.parak.share.DeadLockDemo$$Lambda$2/885951223.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"t1":
at top.parak.share.DeadLockDemo.lambda$main$0(DeadLockDemo.java:26) // 发生死锁的位置
- waiting to lock <0x000000076cfd4e68> (a java.lang.Object)
- locked <0x000000076cfd4e58> (a java.lang.Object)
at top.parak.share.DeadLockDemo$$Lambda$1/1854778591.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
- jconsole工具检测死锁
powershell输入jconsole
,连接本地进程,选择线程界面
死锁举例:哲学家就餐问题
有五位哲学家,围坐在圆桌旁:
- 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭接着思考。
- 吃饭时要用两根筷子吃,桌子上共有五根筷子,每位哲学家左右手边各有一根筷子。
- 如果筷子被身边的人拿着,自己就得等待。
当每位哲学家即线程持有一根筷子时,他们都在等待另一个线程释放锁,因此造成了死锁。
代码:
public class PhilosophersEatingDemo {
private final static int NUM = 5;
public static void main(String[] args) {
Chopstick[] c = new Chopstick[NUM];
for (int i = 0; i < NUM; i++) {
c[i] = new Chopstick(String.valueOf(i));
}
new Philosopher("苏格拉底", c[0], c[1]).start();
new Philosopher("柏拉图", c[1], c[2]).start();
new Philosopher("亚里士多德", c[2], c[3]).start();
new Philosopher("郝拉克利特", c[3], c[4]).start();
new Philosopher("阿基米德", c[4], c[0]).start();
}
}
// 哲学家
@Slf4j(topic = "Philosopher")
class Philosopher extends Thread {
final Chopstick left; // 左手筷子
final Chopstick right; // 右手筷子
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
public void run() {
while (true) {
// 尝试获得左手筷子
synchronized (left) {
// 尝试获得右手筷子
synchronized (right) {
eat();
}
}
}
}
// 吃饭
private void eat() {
log.debug("eating...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 筷子
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子[" +
"name='" + name + '\'' +
']';
}
}
运行结果:
2021-04-22 14:15:57.623 [亚里士多德] DEBUG Philosopher - eating...
2021-04-22 14:15:57.623 [苏格拉底] DEBUG Philosopher - eating...
2021-04-22 14:15:58.632 [亚里士多德] DEBUG Philosopher - eating...
2021-04-22 14:15:58.632 [苏格拉底] DEBUG Philosopher - eating...
2021-04-22 14:15:59.643 [柏拉图] DEBUG Philosopher - eating...
2021-04-22 14:16:00.647 [柏拉图] DEBUG Philosopher - eating...
jconsole检测死锁:
---------------------------------------------------------------------
名称: 苏格拉底
状态: top.parak.share.Chopstick@798c4fad【筷子1】上的BLOCKED, 拥有者: 柏拉图
总阻止数: 8, 总等待数: 6
堆栈跟踪:
top.parak.share.Philosopher.run(PhilosophersEatingDemo.java:45)
- 已锁定 top.parak.share.Chopstick@6ea242ba【筷子0】
---------------------------------------------------------------------
名称: 柏拉图
状态: top.parak.share.Chopstick@6057af47【筷子2】上的BLOCKED, 拥有者: 亚里士多德
总阻止数: 3, 总等待数: 2
堆栈跟踪:
top.parak.share.Philosopher.run(PhilosophersEatingDemo.java:45)
- 已锁定 top.parak.share.Chopstick@798c4fad【筷子1】
---------------------------------------------------------------------
名称: 亚里士多德
状态: top.parak.share.Chopstick@6796caf6【筷子3】上的BLOCKED, 拥有者: 郝拉克利特
总阻止数: 8, 总等待数: 2
堆栈跟踪:
top.parak.share.Philosopher.run(PhilosophersEatingDemo.java:45)
- 已锁定 top.parak.share.Chopstick@6057af47【筷子2】
---------------------------------------------------------------------
名称: 郝拉克利特
状态: top.parak.share.Chopstick@16b56e05【筷子4】上的BLOCKED, 拥有者: 阿基米德
总阻止数: 2, 总等待数: 0
堆栈跟踪:
top.parak.share.Philosopher.run(PhilosophersEatingDemo.java:45)
- 已锁定 top.parak.share.Chopstick@6796caf6【筷子3】
---------------------------------------------------------------------
名称: 阿基米德
状态: top.parak.share.Chopstick@6ea242ba【筷子0】上的BLOCKED, 拥有者: 苏格拉底
总阻止数: 1, 总等待数: 0
堆栈跟踪:
top.parak.share.Philosopher.run(PhilosophersEatingDemo.java:45)
- 已锁定 top.parak.share.Chopstick@16b56e05【筷子4】
大家都拿着一根筷子,等着另一根筷子,造成死锁,解决得看后面的ReentrantLock
。
2.4.2.2 活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
例如:
@Slf4j(topic = "LiveLock")
public class LiveLockDemo {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到0,退出循环
while (count > 0) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过20退出循环
while (count < 20) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
解决:在线程执行时,中途给予不同的间隔时间,让某个线程先结束即可。
死锁与活锁的区别:
- 死锁是因为线程互相持有对象想要的锁,并且都不释放,最后到时线程阻塞,停止运行的现象。
- 活锁是因为线程间修改了对方的结束条件,从而导致代码一直在运行,却一直运行不完的现象。
2.4.2.3 饥饿
某些线程因为优先级太低,始终得不到CPU调度执行,也不能够结束。
2.4.3 ReentrantLock
特点(synchronized不具备的):
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与synchronized
一样,都支持可重入。
两者比较如下:
Lock | synchronized | |
---|---|---|
层次方面 | (1)Lock 是一个接口,是在类级别上的实现;(2)JDK层次的实现。 |
(1)synchronized 是Java关键字;(2)JVM层次定义的。 |
灵活性方面 | Lock 接口提供的lock() 和unlock() 方法,可以随时获得锁、释放锁,非常灵活。 |
释放锁、获得锁是被动的。释放锁只有两种情况: (1)同步代码块执行完毕; (2)抛出异常,同步器执行 monitor.exit() 释放锁。 |
锁的状态方面 | (1)Lock 可以判断锁的状态,它会提供tryLock() 方法来告诉我们是否获得锁成功;(2) tryLock() 有返回值,用来尝试获取锁,如果获取成功,则返回false;获取失败,返回false,这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。 |
(1)在锁的状态方面,synchronized 完全是被动的,没法判断锁的状态; (2) synchronized 在拿不到锁时,则会阻塞在那里,一直等待。 |
锁的类型方面 | 基于Lock 接口,有多种锁的实现,如:(1)可重入锁: ReentrantLock (2)可重入读写锁: ReentrantReadWriteLock 等。针对可重入锁,还有公平锁和非公平锁之分。 |
对于synchronized 来说,它只是一个JVM层次的关键字,并不是一个接口,没有具体实现。 |
ReentrantLock
语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
2.4.3.1 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
例如:
@Slf4j(topic = "ReentrantLock")
public class ReentrantLockDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("enter main");
m1();
} finally {
lock.unlock();
}
}
public static void m1() {
lock.lock();
try {
log.debug("enter m1");
m2();
} finally {
lock.unlock();
}
}
public static void m2() {
lock.lock();
try {
log.debug("enter m2");
} finally {
lock.unlock();
}
}
}
运行结果:
2021-04-22 22:49:46.320 [main] DEBUG ReentrantLock - enter main
2021-04-22 22:49:46.322 [main] DEBUG ReentrantLock - enter m1
2021-04-22 22:49:46.322 [main] DEBUG ReentrantLock - enter m2
2.4.3.2 可打断
synchoronized
和reentrantLock.lock()
的锁,是不可被打断的,也就是说别的线程已经获得了锁,我的线程就需要一直等待下去,不能中断。
可被中断的锁,通过reentrantLock.lockInterruptibly()
获取的锁对象,可以通过调用阻塞线程的interrupt()
方法。
如果某个线程处于阻塞状态,可以调用其interrupt()
方法让其停止阻塞,获得锁失败。处于阻塞状态的线程,被打断了就不用阻塞了,直接停止就行。
可中断的锁,在一定程度上可以被动的减少死锁的概率,之所以被动,是因为我们需要手动调用阻塞线程的interrupt()
方法。
例如:
@Slf4j(topic = "LockInterruptibly")
public class LockInterruptiblyDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
// 如果没有竞争那么此方法就会获取lock对象锁
// 如果有竞争就进入阻塞队列,可以被其他线程用interrupt方法打断
log.debug("尝试获得锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("未获取锁");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("打断t1线程");
t1.interrupt();
}
}
运行结果:
2021-04-22 22:57:01.129 [t1] DEBUG LockInterruptibly - 尝试获得锁
2021-04-22 22:57:02.148 [main] DEBUG LockInterruptibly - 打断t1线程
2021-04-22 22:57:02.149 [t1] DEBUG LockInterruptibly - 未获取锁
java.lang.InterruptedException
2.4.3.3 锁超时
防止无限制等待,减少死锁。
reentrantLock.tryLock()
会返回锁是否成功。如果成功则返回true,反之则返回false。tryLock(long timeout, TimeUnit unit)
可以设置指定等待时间,其中timeout为最长等待时间,unit为时间单位。
例如:
@Slf4j(topic = "TryLock")
public class TryLockDemo {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("尝试获取锁");
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) { // 尝试等待2S,获取锁
log.debug("获取不到锁");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("获取不到锁");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获取到锁");
t1.start();
TimeUnit.SECONDS.sleep(1);
lock.unlock();
log.debug("释放了锁");
}
}
运行结果:
2021-04-23 00:46:54.944 [main] DEBUG TryLock - 获取到锁
2021-04-23 00:46:54.946 [t1] DEBUG TryLock - 尝试获取锁
2021-04-23 00:46:55.949 [main] DEBUG TryLock - 释放了锁
2021-04-23 00:46:55.949 [t1] DEBUG TryLock - 获取到锁
解决哲学家就餐的问题
尝试获取筷子,获取不到就放下。
public class PhilosophersEatingDemo {
private final static int NUM = 5;
public static void main(String[] args) {
Chopstick[] c = new Chopstick[NUM];
for (int i = 0; i < NUM; i++) {
c[i] = new Chopstick(String.valueOf(i));
}
new Philosopher("苏格拉底", c[0], c[1]).start();
new Philosopher("柏拉图", c[1], c[2]).start();
new Philosopher("亚里士多德", c[2], c[3]).start();
new Philosopher("郝拉克利特", c[3], c[4]).start();
new Philosopher("阿基米德", c[4], c[0]).start();
}
}
// 哲学家
@Slf4j(topic = "Philosopher")
class Philosopher extends Thread {
final Chopstick left; // 左手筷子
final Chopstick right; // 右手筷子
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
public void run() {
while (true) {
if (left.tryLock()) { // 尝试获得左手筷子
try {
if (right.tryLock()) { // 尝试获得右手筷子
try {
eat();
} finally {
right.unlock(); // 释放右手筷子
}
}
} finally {
left.unlock(); // 释放左手筷子
}
}
}
}
// 吃饭
private void eat() {
log.debug("eating...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 筷子
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子[" + "name='" + name + ']';
}
}
2.4.3.4 公平锁
- 可以把竞争的线程放在一个先进先出的阻塞队列上
- 只要持有锁的线程执行完了,唤醒阻塞队列中的下一个线程获取锁即可
- 先进入队列的线程先获取到锁
synchronized是不公平锁。一个线程持有锁,其他线程进入阻塞队列;当这个线程释放了锁,那么阻塞队列的线程就会一起去争抢,而不是按照先来先得的顺序。
ReentrantLock默认是不公平的,但是可以切换为公平锁,看一眼带参构造方法:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
构造一个公平锁new ReentrantLock(true)
,公平锁没有必要,会降低并发度。
2.4.3.5 条件变量
synchronized中也有条件变量,就是waitSet
,当条件不满足时进入waitSet
,当条件不满足时进入waitSet
等待。ReentrantLock
的条件变量比synchronized
强大之处在于,它是支持多个条件变量的。
synchronized
让那些不满足条件的线程都在一间休息室等消息ReentrantLock
支持多间休息室,有专门等的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用流程:
await
前需要获得锁await
执行后,会释放锁,进入conditionObject
等待await
的线程被唤醒(或打断、或超时)重新竞争lock锁- 竞争lock锁成功后,从await后继续执行
示例:
@Slf4j(topic = "CorrectPostureStep4")
public class CorrectPostureStep4Demo {
static boolean hasCigarette = false;
static boolean hasTakeAway = false;
static ReentrantLock ROOM = new ReentrantLock();
// 等烟的休息室
static Condition waitCigaretteSet = ROOM.newCondition();
// 等外卖的休息室
static Condition waitTakeAwaySet = ROOM.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
ROOM.lock();
try {
String name = Thread.currentThread().getName();
log.debug("有烟吗 ? [{}]", hasCigarette);
while (!hasCigarette) { // 防止虚假唤醒
log.debug("没有烟 => [{}]先歇会...", name);
waitCigaretteSet.await();
}
log.debug("再看看有烟吗 ? [{}]", hasCigarette);
if (hasCigarette) {
log.debug("有烟了 => [{}]开始干活ing", name);
} else {
log.debug("依然没有烟 => [{}]不干活了", name);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
ROOM.unlock();
}
}, "FlowerK").start();
new Thread(() -> {
ROOM.lock();
try {
String name = Thread.currentThread().getName();
log.debug("外卖是否送到 ? [{}]", hasTakeAway);
if (!hasTakeAway) {
log.debug("外卖还未送到 => [{}]先歇会...", name);
waitTakeAwaySet.await();
}
log.debug("再看看外卖是否送到 ? [{}]", hasTakeAway);
if (hasTakeAway) {
log.debug("外卖已经送到 => [{}]开始干活ing", name);
} else {
log.debug("外卖仍然未到 => [{}]不干活了", hasTakeAway);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
ROOM.unlock();
}
}, "RubbishK").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
ROOM.lock();
try {
hasTakeAway = true;
waitTakeAwaySet.signal();
log.debug("外卖已送到");
} finally {
ROOM.unlock();
}
}, "美团外卖").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
ROOM.lock();
try {
hasCigarette = true;
waitCigaretteSet.signal();
log.debug("烟已送到");
} finally {
ROOM.unlock();
}
}, "饿了么").start();
}
}
2.4.4 同步模式之顺序控制
2.4.4.1 固定输出
有两个线程,线程A打印1,线程B打印2。
要求:程序必须先打印2再打印1。
wait/notify版本
@Slf4j(topic = "OrderWaitNotify")
public class OrderWaitNotifyDemo {
private static final Object lock = new Object();
private static boolean bRunned = false; // 表示t2是否运行
public static void main(String[] args) {
Thread a = new Thread(() -> {
synchronized (lock) {
while (!bRunned) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
log.debug("1");
}, "A");
Thread b = new Thread(() -> {
synchronized (lock) {
log.debug("2");
bRunned = true;
lock.notify();
}
}, "B");
a.start();
b.start();
}
}
await/signal版本
@Slf4j(topic = "OrderReentrant")
public class OrderReentrantDemo {
private static final ReentrantLock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
private static boolean bRunned = false;
public static void main(String[] args) {
Thread a = new Thread(() -> {
lock.lock();
try {
if (!bRunned) {
condition.await();
}
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "A");
Thread b = new Thread(() -> {
lock.lock();
try {
log.debug("2");
bRunned = true;
condition.signal();
} finally {
lock.unlock();
}
}, "B");
a.start();
b.start();
}
}
park/unpark版本
@Slf4j(topic = "OrderParkUnpark")
public class OrderParkUnparkDemo {
public static void main(String[] args) {
Thread a = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "A");
Thread b = new Thread(() -> {
log.debug("2");
LockSupport.unpark(a);
}, "B");
a.start();
b.start();
}
}
2.4.4.2 交替输出
线程A输出a五次,线程B输出b五次,线程C输出c五次。
要求:程序交替输出5次abc
。
wait/notify版本
public class AlternateWaitNotifyDemo {
public static void main(String[] args) {
AlternateWaitNotify alternateWaitNotify = new AlternateWaitNotify(1, 5);
new Thread(() -> alternateWaitNotify.print("a", 1, 2), "A").start();
new Thread(() -> alternateWaitNotify.print("b", 2, 3), "B").start();
new Thread(() -> alternateWaitNotify.print("c", 3, 1), "C").start();
}
}
@Slf4j(topic = "AlternateWaitNotify")
class AlternateWaitNotify {
// 当前标记
private int flag;
// 循环次数
private final int loopNumber;
public AlternateWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
/**
* 输出内容 等待标记 下个标记
* a 1 2
* b 2 3
* c 3 1
*
* @param str 输出内容
* @param waitFlag 等待标记
* @param nextFlag 下个标记
*/
public void print(String str, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (waitFlag != this.flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug(str);
this.flag = nextFlag;
this.notifyAll();
}
}
}
}
await/signal版本
public class AlternateAwaitSignalDemo {
public static void main(String[] args) {
AlternateAwaitSignal alternateAwaitSignal = new AlternateAwaitSignal(5);
Condition a = alternateAwaitSignal.newCondition();
Condition b = alternateAwaitSignal.newCondition();
Condition c = alternateAwaitSignal.newCondition();
new Thread(() -> alternateAwaitSignal.print("A", a, b)).start();
new Thread(() -> alternateAwaitSignal.print("B", b, c)).start();
new Thread(() -> alternateAwaitSignal.print("C", c, a)).start();
}
}
@Slf4j(topic = "AlternateAwaitSignal")
class AlternateAwaitSignal extends ReentrantLock {
private final int loopNumber;
public AlternateAwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
/**
* @param str 打印内容
* @param curr 当前休息室
* @param next 下个休息室
*/
public void print(String str, Condition curr, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
curr.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}
park/unpark版本
public class AlternateParkUnparkDemo {
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) {
AlternateParkUnpark alternateParkUnpark = new AlternateParkUnpark(5);
t1 = new Thread(() -> alternateParkUnpark.print("a", t2), "t1");
t2 = new Thread(() -> alternateParkUnpark.print("b", t3), "t2");
t3 = new Thread(() -> alternateParkUnpark.print("c", t1), "t3");
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
@Slf4j(topic = "AlternateParkUnpark")
class AlternateParkUnpark {
private final int loopNumber;
public AlternateParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void print(String str, Thread next) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
log.debug(str);
LockSupport.unpark(next);
}
}
}