汪文君——多线程与架构设计
Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时间变慢。
Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,就会抛出OutOfMemory异常。
Xss 是指设定每个线程的堆栈大小。这个就要依据程序,看一个线程大约需要占用多少内存,可能会有多少线程同时运行等。
以上三个参数的设置都是默认以Byte为单位的,也可以在数字后面添加[k/K]或者[m/M]来表示KB或者MB。而且,超过机器本身的内存大小也是不可以的,否则就等着机器变慢而不是程序变慢了。
并发和并行:
并发是两个队列交替使用一台咖啡机,并行是两个队列同时使用两台咖啡机,如果串行,一个队列使用一台咖啡机,那么哪怕前面那个人便秘了去厕所呆半天,后面的人也只能死等着他回来才能去接咖啡,这效率无疑是最低的。
并发和并行都可以是很多个线程,就看这些线程能不能同时被(多个)cpu执行,如果可以就说明是并行,而并发是多个线程被(一个)cpu 轮流切换着执行。
一. 多线程基础
1. 快速认识线程
1.1描述
进程:对计算机来说 每一个任务就是一个进程(Process),在每个进程内至少要有一个线程(Thread)在运行。
线程:是程序执行的一个路径,每个线程都有自己的局部变量表、程序计数器(指向正在执行的指令指针)以及各自的生命周期。
现代操作系统中一般不止一个线程在运行,当启动了一个Java虚拟机(JVM)时,从操作系统开始就会创建一个新的进程(JVM进程),JVM进程中会派生或创建很多线程。
1.2 尝试并行运行
通过 【匿名内部类的方式创建线程,并重写其中的run方法】
run() :写入需 并行运行的逻辑
start(): 运行线程(开始执行run()中逻辑)
package com.zoro.concurrent.chapter01;
import java.util.concurrent.TimeUnit;
/**
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class TryConcurrency {
public static void main(String[] args) {
//看新闻
new Thread() {
@Override
public void run() {
browseNews();
}
}.start();
//听歌
enjoyMusic();
}
/**
* 看新闻
*/
public static void browseNews() {
for (; ; ) {
System.out.println("看新闻");
sleep(1);
}
}
/**
* 听歌
*/
public static void enjoyMusic() {
for (; ; ) {
System.out.println("听歌");
sleep(1);
}
}
/**
* 睡眠
*
* @param seconds 时长
*/
private static void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//输出如下:
听歌
看新闻
看新闻
听歌
看新闻
听歌
看新闻
听歌
看新闻
听歌
……
Java8可替换为①②:
①===> new _Thread(() -> browseNews()).start();
②===>_ _new _Thread(TryConcurrency::browseNews).start();
1.3 线程生命周期
5个主要阶段java.lang.Thread.State
- NEW
- RUNNABLE
- RUNNING
- BLOCKED
TERMINATED
```java
/**
* Thread state for a thread which has not yet started.
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
1.3.1 NEW
1.3.2 RUNNABLE
1.3.3 RUNNING
1.3.4 BLOCKED
1.3.5 TERMINATED
1.4 start() : 模板模式在Thread中的应用
java.lang.Thread#start()
Causes this thread to begin execution; the Java Virtual Machine calls the
run
method of this thread. 执行线程时,JVM会调用线程的run(),run()是被JNI方法start0()执行的。
/**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
* <p>
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
* <p>
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* @exception IllegalThreadStateException if the thread was already
* started.
* @see #run()
* @see #stop()
*/
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
结论:
- Thread被构造后的NEW状态,事实上threadStatus这个内部属性为0;
- 不能两次启动Thread,否则会出现IllegalThreadStateException异常;
- 线程启动后会被加入一个ThreadGroup中,TODO
- 一个线程生命周期结束(TERMINATED),再次调用start方法是不允许的,即TERMINATED是不能再回到RUNNABLE/RUNNING状态的;
1.4.1 模板模式
严格来讲:创建线程仅有一种方式; 线程的执行单元是run()方法,实现线程执行单元有两种方法,即:继承Thread并重写run(),实现Runnable接口实现自己的业务逻辑;简例:
```java package com.zoro.concurrent.chapter01;
/**
- 模板方法-例 *
- @author yx.jiang
@date 2021/7/27 10:26 / public class TemplateMethod { /*
- 打印 *
@param message 信息 */ public final void print(String message) { System.out.println(“##############”); wrapPrint(message); System.out.println(“##############”); }
/**
- 转换-打印 *
@param message 信息 */ protected void wrapPrint(String message) {
}
public static void main(String[] args) { new TemplateMethod() {
@Override
protected void wrapPrint(String message) {
System.out.println("*" + message + "*");
}
}.print(“Hello Thread”);
new TemplateMethod() {
@Override
protected void wrapPrint(String message) {
System.out.println("-" + message + "-");
}
}.print(“Hello Thread”); } }
//输出
#
Hello Thread
#
#
-Hello Thread-
#
<a name="qTI03"></a>
##### 售票(extends Thread)
TODOindex的线程安全问题
```java
package com.zoro.concurrent.chapter01;
/**
* 售票窗口-extends Thread
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class TicketWindow extends Thread {
private final String name;
private static final int MAX = 50;
private static int index = 1;
public TicketWindow(String name) {
this.name = name;
}
@Override
public void run() {
while (index <= MAX) {
System.out.println("柜台: " + name + ",当前号码是:" + (index++));
}
}
/**
* index存在线程安全问题
*
* {@code 共享资源很多、 共享资源要经过一系列复杂运算。。。}
* 显然不可能用{@code static}一个个修饰,并且static修饰的变量生命周期很长【常量池、静态成员变量、全局变量——1.8后由 永久代——>堆】
*/
public static void main(String[] args) {
TicketWindow tw1 = new TicketWindow("一号");
tw1.start();
TicketWindow tw2 = new TicketWindow("二号");
tw2.start();
TicketWindow tw3 = new TicketWindow("三号");
tw3.start();
TicketWindow tw4 = new TicketWindow("四号");
tw4.start();
}
}
1.4.2 策略模式
注:共享资源index仍存在线程安全问题
售票 implements Runnable
Thread类中的run是不能共享的,而Runnable很容易实现,同一Runnable构造不同的Thread实例;
package com.zoro.concurrent.chapter01;
/**
* 售票窗口——implements Runnable
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class TicketWindowRunnable implements Runnable {
private static final int MAX = 50;
private static int index = 1;
@Override
public void run() {
while (index <= MAX) {
System.out.println(Thread.currentThread().getName() + " 的号码是:" + (index++));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* index存在线程安全问题
*/
public static void main(String[] args) {
TicketWindowRunnable ticketWindowRunnable = new TicketWindowRunnable();
Thread t1 = new Thread(ticketWindowRunnable, "1号");
Thread t2 = new Thread(ticketWindowRunnable, "2号");
Thread t3 = new Thread(ticketWindowRunnable, "3号");
Thread t4 = new Thread(ticketWindowRunnable, "4号");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
2. Thread构造函数
2.1 线程的名称
2.1.1 默认名称
com.zoro.concurrent.chapter01.ThreadConstructor#defaultThreadName
2.1.2 自定义名称
com.zoro.concurrent.chapter01.ThreadConstructor#costumeThreadName
package com.zoro.concurrent.chapter01;
import java.util.stream.IntStream;
/**
* Thread的构造方法
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class ThreadConstructor {
private static final String PREFIX = "ALEX-";
public static void main(String[] args) {
// defaultThreadName();
costumeThreadName();
}
/**
* 自定义线程名
*/
public static void costumeThreadName() {
IntStream.range(0, 5).mapToObj(ThreadConstructor::createThread).forEach(Thread::start);
/**
* 输出结果如下:
* ALEX-0
* ALEX-1
* ALEX-2
* ALEX-3
* ALEX-4
*/
}
/**
* 创建线程
* @param intName 数字
* @return 自定义名称的线程
*/
public static Thread createThread(int intName) {
return new Thread(() -> System.out.println(Thread.currentThread().getName()), PREFIX + intName);
}
/**
* 默认线程名
*/
private static void defaultThreadName() {
IntStream.range(0, 5).boxed()
.map(i -> new Thread(
() -> System.out.println(Thread.currentThread().getName()))
)
.forEach(Thread::start);
/**
* 输出结果如下:
* Thread-0
* Thread-1
* Thread-4
* Thread-3
* Thread-2
*/
}
}
2.1.3 修改线程名称
2.2 线程的父子关系
Thread的所有构造方法最终会去调用静态init()方法,可看出每一个新创建的线程都有一个父线程。
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
……
}
线程最初状态为NEW ,没有执行start()之前,它只能算是Thread的实例,并不意味着一个新的线程被创建,因此currentThread()代表的将会是创建它的那个线程。
- 一个线程的创建肯定是另一个线程完成的。
-
2.3 Thread、ThreadGroup
main线程所在的ThreadGroup称为“main”;
- 构造一个线程时未显示指定ThreadGroup,那么它将会和父线程同属于一个ThreadGroup;
2.4 Thread、JVM虚拟机栈
2.4.1 Thread 与 Stacksize
栈内存通过-xss
设置;一般地,stacksize越大,递归深度越深;stacksize越小,创建的线程数越多;
2.4.2 JVM内存结构
- 程序计数器线程私有
- java虚拟机栈线程私有
- 本地方法栈线程私有
- 堆内存
- 堆是GC的重要区域(含:新生代【Eden区,From Survivor区,To Survivor区】、老年代);
- 方法区
- 主要用于存储:已被虚拟机加载的类信息、常量、静态变量、即时编译器(JIT)编译后的代码等数据;
- 在HotSpot中,方法区还会被细分为持久代和代码缓存区,代码缓存区用于存储编译后的本地代码(和硬件相关)以及JIT(Just In Time)编译器生成的代码。不同的JVM通常有不同的实现。
- Java8元空间
- 持久代被彻底删除,取而代之的是元空间。
- 元空间是堆内存的一部分,JVM为每个类加载器分配一块内存列表,进行线性分配,块的大小取决于类加载器的类型,sun/反射/代理对应的类加载器块会小一点,之前版本会单独卸载回收某个类,现在则是GC过程中发现某个类加载器已经具备回收的条件,则会将整个类加载器相关的元空间全部回收,以减少内存碎片,节省GC扫描和压缩的时间。
2.4.3 Thread与虚拟机栈
2.5 守护线程
JVM中没有一个 非守护线程,则JVM的进程会退出。
main线程是守护线程。3. Thread API
3.1 sleep
3.1.1 介绍
休眠时,不会放弃monitor锁的所有权。是个可中断(interrupt)方法3.1.2 TimeUnit代替Thread.sleep
Thread.sleep能完成的TimeUnit同样能完成,而且更加清晰,对sleep提供了更好的封装。3.2 yield
3.2.1 介绍
提醒调度器我愿意放弃当前的cpu资源,如果CPU的资源不紧张,就会忽略这个提醒。3.2.2 yield和sleep
- sleep会导致当前线程暂停指定的时间,没有CPU时间片的消耗。
- yield只是堆CPU调度器的一个提示,如果CPU调度器没有忽略这个提示,它会导致线程上下文的切换。
- sleep会导致线程短暂的block,会在给定的时间内释放CPU资源。
- yield会使RUNNING状态的Thread进入RUNNABLE状态(如果CPU调度器没有忽略这个提示)。
- sleep几乎百分之百的完成了指定时间的休眠,而yield的提示并不能一定担保。
一个线程sleep另一个线程interrupt会捕获到中断信号,而yield则不会。
3.3 设置线程优先级
thread.setPriority(int)
取值1<= num <=10之间。对于root用户,它会hint操作系统你想要设置的优先级,否则它会被忽略。
- 如果CPU比较忙,设置优先级可能会获取更多的CPU时间片,但是闲时优先级的高低几乎不会有任何作用。
main线程的优先级为5,线程start前设置方能生效,子线程的优先级不能大于父线程的优先级。
3.4 获取线程ID
3.5 获取当前线程
3.6 设置线程上下文类加载器
3.7 interrupt
3.7.1 interrupt
3.7.2 isInterrupted
3.7.3 interrupted
3.7.4 interrupted注意事项
3.8 线程join
3.8.1 join详解
join方法会使当前线程永远的等待下去,直到被另一个线程中断,或者join的线程执行结束;
package com.zoro.concurrent.chapter01;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Thread-join
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class ThreadJoin {
/**
* ①join输出结果:
* <p>thread1.join,thread2.join;main线程阻塞直至thread1、thread2执行完毕!</p>
* ……
* 1 # 8
* 2 # 8
* 2 # 9
* 1 # 9
* main # 0
* main # 1
* main # 2
* ……
*
* ② 倘若注释掉 thread.join
* 那么输出内容为:
* <p>thread1,thread2,main交替输出</p>
* ……
* 1 # 4
* 2 # 4
* main # 4
* 1 # 5
* 2 # 5
* main # 5
* 1 # 6
* 2 # 6
* main # 6
* ……
*/
public static void main(String[] args) throws InterruptedException{
List<Thread> threadList = IntStream.range(1, 3).mapToObj(ThreadJoin::createThread).collect(Collectors.toList());
threadList.forEach(Thread::start);
/*for (Thread thread : threadList) {
thread.join();
}*/
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " # " + i);
shortSleep();
}
}
/**
* 创建线程
* @param seq 线程名
* @return
*/
private static Thread createThread(int seq) {
return new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " # " + i);
shortSleep();
}
}, String.valueOf(seq));
}
/**
* 短暂睡眠
*/
private static void shortSleep() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.8.2 join结合实战
查询各个航班公司的航班信息【接口或调用方式不同】,可通过多线程‘并行’查询。
package com.zoro.concurrent.chapter03;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* 航班查询-例
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class FightQueryExample {
/**
* 合作的航空公司
*/
public static List<String> fightCompany = Arrays.asList("CSA","CEA", "HNA");
public static void main(String[] args) {
List<String> results = search("北京", "上海");
System.out.println("=======查询结果=======");
results.forEach(System.out::println);
}
/**
* 查询并汇总
*
* @param origin 出发地
* @param destination 目的地
* @return 各航空公司航班汇总数据
*/
public static List<String> search(String origin, String destination) {
final List<String> result = new ArrayList<>();
//创建【各个航空公司】航班数据的 查询线程
List<FightQueryTask> taskList = fightCompany.stream()
.map(airLine -> createSearchTask(airLine, origin, destination))
.collect(Collectors.toList());
//启用线程
taskList.forEach(FightQueryTask::start);
taskList.forEach(task -> {
try {
task.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//在此之前,当前线程会被阻塞,获取每一个查询线程的结果,并加入到result中
taskList.stream().map(FightQueryTask::get).forEach(result::addAll);
return result;
}
public static FightQueryTask createSearchTask(String airline, String origin, String destination){
return new FightQueryTask(airline, origin, destination);
}
}
/**
* 输出结果:
* [ CSA ]-query 从 北京 到 上海
* [ HNA ]-query 从 北京 到 上海
* [ CEA ]-query 从 北京 到 上海
* 航班:[ HNA ] 列表查询成功
* 航班:[ CSA ] 列表查询成功
* 航班:[ CEA ] 列表查询成功
* =======查询结果=======
* [ CSA ]-2
* [ CEA ]-6
* [ HNA ]-2
*/
3.9关闭线程
废弃的方法 stop(), 关闭时不会释放掉monitor的锁。
3.9.1 正常关闭
- 线程结束生命周期 正常结束
- 捕获中断信号关闭线程 ```java package com.zoro.concurrent.chapter03;
import java.util.concurrent.TimeUnit;
/**
- 线程中断退出 *
- @author yx.jiang
@date 2021/7/27 10:26 */ public class InterruptThreadExit {
/**
- 以下两种方式均会导致线程的正常结束,输出如下: *
- 开始工作~
- 系统即将中断
工作完毕 */ public static void main(String[] args) throws InterruptedException { Thread thread = new Thread() {
/*@Override
public void run() {
System.out.println("开始工作~");
while (!isInterrupted()) {
//working
}
System.out.println("工作完毕");
}*/
@Override
public void run() {
System.out.println("开始工作~");
for (;;){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
System.out.println("工作完毕");
}
};
thread.start(); TimeUnit.SECONDS.sleep(1); System.out.println(“系统即将中断”); thread.interrupt(); } } ```
- 使用volatile开关控制
由于线程的interrupt表示可能被擦除,或者逻辑单元中不会调用任何可中断方法,所以使用volatile修饰的开关flag关闭线程也是一种做法。
package com.zoro.concurrent.chapter03;
import java.util.concurrent.TimeUnit;
/**
* 线程中断退出
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class FlagThreadExit {
static class MyTask extends Thread{
/**
* 已关闭
*/
private volatile boolean closed = false;
@Override
public void run() {
System.out.println("开始工作~");
while (!closed && !isInterrupted()){
//正在运行
}
System.out.println("工作完毕");
}
public void setClosed(boolean closed) {
this.closed = closed;
}
}
/**
* 输出分别为:
* ①中断-interrupt
* 开始工作~
* 系统即将中断
* 已中断
* 工作完毕
*
* ②关闭-closed
* 开始工作~
* 系统即将关闭
* 系统已关闭
* 工作完毕
*/
public static void main(String[] args) throws InterruptedException {
MyTask myTask = new MyTask();
myTask.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("系统即将中断");
myTask.interrupt();
System.out.println("已中断");
// System.out.println("系统即将关闭");
// myTask.setClosed(true);
// System.out.println("系统已关闭");
}
}
3.9.2 异常退出
一个线程的执行单元(Thread/Runnable中的run())中不允许抛出checked异常,如在线程运行过程中需要捕获checked异常来判断是否有运行下去的必要,可以通过将checked异常封装为unchecked异常(RuntimeException)抛出进而结束线程的生命周期。
3.9.3 进程假死
线程虽然存在,但没有任何输出、不进行任何作业
阻塞、死锁等。
通过jconsole、jstack、jvisualvm等工具进行死锁的判断。
4. 线程安全与数据同步
4.1 数据同步
4.1.1 数据不一致问题
将号码最大值改为500。
上文例子中的线程安全问题
- 某个号码被略过
- 某个号码出现多次
- 号码超出最大值500
4.2 synchronized关键字
在jdk1.5前,要解决上述问题需使用synchronized关键字(提供了一种排他机制)。4.2.1 什么是synchronized
- synchronized关键字提供了一种锁的机制,能够确保共享变量的互斥访问,从而防止数据不一致问题的出现。
- synchronized关键字包括monitor enter 和 monitor exit两个JVM指令,它能够保证在任何时候任何线程执行到monitor enter成功之前都必须从主内存中获取数据,而不是从缓存中,在monitor exit运行成功之后,共享变量被更新后的值必须刷入主内存。
- synchronized的指令严格遵守java happens-before 规则,一个monitor exit 指令之前必定要有一个monitor enter。
4.2.2 synchronized的用法
synchronized可以用于对代码块或方法进行修饰,而不能够用于对class以及变量进行修饰。
- 同步方法
[default|public|private|protected] synchronized [static] type method()
。示例如下:
_public synchronized void _sync(){
……
}
_public static synchronized void _staticSync(){
……
}
- 同步代码块
private final _Object MUTEX = _new _Object();
_public void _sync(){
_synchronized (MUTEX) {
……
}
}
改写叫号程序:
无论运行多少次,不再出现数据不一致的问题。
package com.zoro.concurrent.chapter04;
/**
* 售票窗口——implements Runnable
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class TicketWindowRunnable implements Runnable {
private static final int MAX = 50;
private static int index = 1;
private static final Object MUTEX = new Object();
@Override
public void run() {
synchronized (MUTEX) {
while (index <= MAX) {
System.out.println(Thread.currentThread().getName() + " 的号码是:" + (index++));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* index存在线程安全问题
*/
public static void main(String[] args) {
TicketWindowRunnable ticketWindowRunnable = new TicketWindowRunnable();
Thread t1 = new Thread(ticketWindowRunnable, "1号");
Thread t2 = new Thread(ticketWindowRunnable, "2号");
Thread t3 = new Thread(ticketWindowRunnable, "3号");
Thread t4 = new Thread(ticketWindowRunnable, "4号");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
4.3 深入synchronized关键字
4.3.1 线程堆栈分析
synchronization 关键字提供了一种互斥机制(在同一时刻只能有一个线程访问同步资源),准确的讲:是某线程获取了与mutex关联的monitor锁。(将synchronized(mutex)称为锁)
package com.zoro.concurrent.chapter04;
import java.util.concurrent.TimeUnit;
/**
* mutex锁
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class Mutex {
private static final Object MUTEX = new Object();
/**
* 使用资源
*/
public void accessResource() {
synchronized (MUTEX) {
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
final Mutex mutex = new Mutex();
for (int i = 0; i < 5; i++) {
new Thread(mutex::accessResource).start();
}
}
}
Thread-1 拥有资源
Thead-0、Thead-2、Thead-3、Thead-4 正在BLOCKED
使用jstack命令打印进程的堆栈信息:jstack <pid>
如: jstack 32776
32776对应 jconsole 中的 pid
可以看到仅Thread-0拥有,而其他线程均在阻塞等待。
4.3.2 JVM指令分析
使用 jdk 命令 javap 对 Mutex class 进行反汇编,输出了大量的JVM指令,可以发现monitor enter 和 monitor exit是成对出现的(有时会出现一个monitor enter对应多个monitor exit),但是每一个monitor exit前必有对应的monitor enter。
- 在src目录下键入如下命令并运行:
javap -c com.zoro.concurrent.chapter04.Mutex
如若在 com.zoro.concurrent.chapter04
下 直接键入 javap -c Mutex
- 亦可通过idea插件jclasslib bytecode viewer查看
查看之前先编译build
Compiled from "Mutex.java"
public class com.zoro.concurrent.chapter04.Mutex {
public com.zoro.concurrent.chapter04.Mutex();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
public void accessResource();
Code:
0: getstatic #2 // Field MUTEX:Ljava/lang/Object;
3: dup
4: astore_1
5: monitorenter
6: getstatic #3 // Field java/util/concurrent/TimeUnit.MINUTES:Ljava/util/concurrent/TimeUnit;
9: ldc2_w #4 // long 10l
12: invokevirtual #6 // Method java/util/concurrent/TimeUnit.sleep:(J)V
15: goto 23
18: astore_2
19: aload_2
20: invokevirtual #8 // Method java/lang/InterruptedException.printStackTrace:()V
23: aload_1
24: monitorexit
25: goto 33
28: astore_3
29: aload_1
30: monitorexit
31: aload_3
32: athrow
33: return
Exception table:
from to target type
6 15 18 Class java/lang/InterruptedException
6 25 28 any
28 31 28 any
public static void main(java.lang.String[]);
Code:
0: new #9 // class com/zoro/concurrent/chapter04/Mutex
3: dup
4: invokespecial #10 // Method "<init>":()V
7: astore_1
8: iconst_0
9: istore_2
10: iload_2
11: iconst_5
12: if_icmpge 42
15: new #11 // class java/lang/Thread
18: dup
19: aload_1
20: dup
21: invokevirtual #12 // Method java/lang/Object.getClass:()Ljava/lang/Class;
24: pop
25: invokedynamic #13, 0 // InvokeDynamic #0:run:(Lcom/zoro/concurrent/chapter04/Mutex;)Ljava/lang/Runnable;
30: invokespecial #14 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V
33: invokevirtual #15 // Method java/lang/Thread.start:()V
36: iinc 2, 1
39: goto 10
42: return
static {};
Code:
0: new #16 // class java/lang/Object
3: dup
4: invokespecial #1 // Method java/lang/Object."<init>":()V
7: putstatic #2 // Field MUTEX:Ljava/lang/Object;
10: return
}
选片段,着重分析:
①获取到MUTEX引用,然后执行②monitorenter JVM指令,休眠结束后goto至③monitorexit的位置(astore
public void accessResource();
Code:
0: getstatic ①获取MUTEX
3: dup
4: astore_1
5: monitorenter ②执行monitorenter JVM指令
6: getstatic #3 // Field java/util/concurrent/TimeUnit.MINUTES:Ljava/util/concurrent/TimeUnit;
9: ldc2_w #4 // long 10l
12: invokevirtual #6 // Method java/util/concurrent/TimeUnit.sleep:(J)V
15: goto 23 ③跳转到23行
18: astore_2
19: aload_2
20: invokevirtual #8 // Method java/lang/InterruptedException.printStackTrace:()V
23: aload_1 ④
24: monitorexit ⑤执行monitorexit JVM指令
25: goto 33
28: astore_3
29: aload_1
30: monitorexit
31: aload_3
32: athrow
33: return
- Monitorenter
每个对象都与一个monitor相关联,一个monitor的lock的锁只能被一个线程在同一时间获得,在一个线程尝试获得与对象关联monitor的所有权会发生如下的几件事:
- 如果monitor的计数器为0,则意味着monitor的lock还没被获得,某个线程获得后立即对该计数器加一,自此该线程就是这个monitor的所有者了。
- 如果一个已经拥有该monitor所有权的线程重入,就会导致monitor的计数器再次累加,
- 如果monitor已经被其他线程所拥有,则其他线程尝试获取该monitor的所有权时,会被陷入阻塞状态直到monitor计数器变为0,才能再次尝试获取对monitor的所有权。
- Monitorexit
释放对monitor的所有权,想要释放对某个对象关联的monitor的所有权的前提是,你曾经获取了所有权。释放monitor所有权就是将monitor的计数器减一,如果计数器的结果为0,就意味着该线程不再拥有该monitor的所有权,也就是解锁。同时被该monitor block的线程将再次尝试获取对该monitor的所有权。
4.3.3 使用synchronized要注意的问题
与monitor关联的对象不能为空
private final Object mutex = null;
public void syncMethod() {
synchronized (mutex){
//TODO
}
}
synchronized作用域太大
对整个线程的执行单元进行了synchronized同步,从而丧失了并发能力,synchronized应尽可能地只作用于共享资源(数据)的读写作用域。
public class Task implements Runnable{
@Override
public synchronized void run() {
//TODO
}
}
- 不同的synchronized企图锁相同的方法
构造了5个线程,也构造了5个Runnable实例,Runnable作为线程执行逻辑单元传递给Thread,synchronized根本互斥不了与之对应的作用域,线程之间进行monitor lock的争抢只能发生在与monitor关联的同一个引用上,下面的代码每一个线程争抢的monitor关联的引用都是彼此独立的,so不能起到互斥作用。
public static class Task implements Runnable{
private final Object MUTEX = new Object();
@Override
public synchronized void run() {
//TODO
synchronized (MUTEX) {
//TODO
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(Task::new).start();
}
}
多个锁的交叉导致死锁
private final Object MUTEX_READ = new Object();
private final Object MUTEX_WRITE = new Object();
public void read () {
synchronized (MUTEX_READ) {
synchronized (MUTEX_WRITE) {
//...
}
}
}
public void write () {
synchronized (MUTEX_WRITE) {
synchronized (MUTEX_READ) {
//...
}
}
}
4.4 This Monitor 和 Class Monitor
4.4.1 this monitor
```java package com.zoro.concurrent.chapter04;
import java.util.concurrent.TimeUnit;
/**
- this monitor
- synchronized 修饰同一个对象的不同方法 *
- @author yx.jiang
@date 2021/7/27 10:26 */ public class ThisMonitor01 { public synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " 进入method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void method2() {
System.out.println(Thread.currentThread().getName() + " 进入method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ThisMonitor01 thisMonitor01 = new ThisMonitor01();
new Thread(thisMonitor01::method1, "T1").start();
new Thread(thisMonitor01::method2, "T2").start();
} }
//输出: T1 进入method1
`jconsole` 查看pid和线程信息<br /><br /><br /><br />`jstack pid` 分析线程的堆栈信息:<br /><br />修改为: **synchronized修饰method1; method2中synchronized同步代码块,锁的是this**
```java
package com.zoro.concurrent.chapter04;
import java.util.concurrent.TimeUnit;
/**
* this monitor
* synchronized修饰method1; method2中synchronized同步代码块,锁的是this
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class ThisMonitor02 {
public synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " 进入method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void method2() {
System.out.println(Thread.currentThread().getName() + " 进入method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ThisMonitor02 thisMonitor01 = new ThisMonitor02();
new Thread(thisMonitor01::method1, "T1").start();
new Thread(thisMonitor01::method2, "T2").start();
}
}
4.4.2 class monitor
package com.zoro.concurrent.chapter04;
import java.util.concurrent.TimeUnit;
/**
* class monitor
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class ClassMonitor01 {
public static synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " 进入method1 ");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static synchronized void method2() {
System.out.println(Thread.currentThread().getName() + " 进入method2 ");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(ClassMonitor01::method1, "T1").start();
new Thread(ClassMonitor01::method2, "T2").start();
}
}
//输出
T1 进入method1
更改method02为同步代码块,锁ClassMonitor02.class
package com.zoro.concurrent.chapter04;
import java.util.concurrent.TimeUnit;
/**
* class monitor
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class ClassMonitor02 {
public static synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " 进入method1 ");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void method2() {
synchronized (ClassMonitor02.class) {
System.out.println(Thread.currentThread().getName() + " 进入method2 ");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new Thread(ClassMonitor02::method1, "T1").start();
new Thread(ClassMonitor02::method2, "T2").start();
}
}
4.5 程序死锁的原因及诊断
4.5.1 程序死锁
- 交叉锁可导致程序出现死锁
线程A持有资源R1的锁等待获取R2的锁,线程B持有R2的锁等待获取R1的锁.
- 内存不足
两个线程T1,T2,执行某个任务,T1获取10Mb内存,T2获取了20Mb内存,如果每个执行单元都需要30Mb的内存,但是剩余的可用内存刚好为20Mb,那么两个线程有可能都在等彼此能够释放内存资源
- 一问一答式的数据交换
一问一答,由于某种原因导致服务端错过了客户端的请求, 此时客户端和服务端都在等待双方发送数据
- 数据库锁
不论是表级别的还是行级别的锁, 如: 某个线程执行了for update语句退出了事务,其他线程在访问该数据库时都将陷入死锁
- 文件锁
某线程获取了文件锁意外退出,其他读取该文件的线程将会进入死锁直到系统释放文件句柄资源
- 死循环引起的死锁
查看线程堆栈信息不会发现死锁的现象,但是程序不工作,CPU占有率居高不下, 这种死锁又称为系统假死.
致命且难排查的死锁现象
4.5.2 程序死锁举例
- 交叉锁引起的死锁
jstack 或 jconsole 查看; 一般交叉锁引起的死锁线程都会进入BLOCKED状态, CPU资源占用不高, 容易借助工具发现jstack-1 PID
会直接发现死锁的信息;
- 死循环引起的死锁(假死)
因为工作线程并未BLOCKED, 而是始终处于RUNNABLE状态, CPU居高不下, 甚至都不能正常运行你的命令
可使用jstack, jconsole, jvisualvm, jProfiler(收费)进行诊断.
5. 线程通信
5.1 同步阻塞和异步非阻塞
5.1.1 同步阻塞消息处理
例: 客户端提交Event至服务器, 服务器收到客户端请求之后开辟线程处理客户请求, 经过较复杂的业务计算后将结果返给客户端.
- 同步Event提交, 客户端等待时间过长(提交Event时长 + 接受Event创建Thread时长 + 业务处理时长 + 返回结果时长) 会陷入阻塞, 导致二次提交Event耗时过长.
- 由于客户端提交的Event数量不多, 导致系统同时受理业务数量有限, 也就是系统整体的吞吐量不高
- 这种一个线程处理一个Event的方式, 会导致出现频繁的创建开启与销毁, 从而增加系统额外开销.
- 在业务达到峰值的时候, 大量的业务处理线程阻塞会导致频繁的CPU上下文切换, 从而降低系统性能
5.1.2 异步非阻塞消息处理
客户端提交Event后会得到一个相应的工单并且立即返回, Event则会被放置在Event队列中, 服务端有若干个工作线程, 不断的从Event队列中获取任务并且进行异步处理, 最后将处理结果保存至另外一个结果集中,如果客户端想要获得处理结果, 则可凭借工单号再次查询5.2 单线程通信
5.2.1 初识notify和wait
EventQueue
```java package com.zoro.concurrent.chapter05;
import java.util.LinkedList;
/**
- 【线程间通信】事件队列
- 队列满 - 最多容纳多少Event,好比一个系统最多同时能够受理多少业务一样
- 队列空 - 当所有的Event都被处理并且没有新的Event被提交的时候,此时队列是空的状态
- 有Event但没有满 - 有新的event被提交,但是此时没有达到队列的上限 *
- @author yx.jiang
@date 2021/7/27 10:26 */ public class EventQueue { private final int max;
static class Event {
}
private final LinkedList
eventQueue = new LinkedList<>(); private static final int DEFAULT_MAX_EVENT = 10;
public EventQueue() {
this(DEFAULT_MAX_EVENT);
}
public EventQueue(int max) {
this.max = max;
}
/**
- 提交事件到队列尾 *
@param event 事件 */ public void offer(Event event) { synchronized (eventQueue) {
if (eventQueue.size() >= max) {
console("队列满了。");
try {
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console("事件已提交。");
eventQueue.addLast(event);
eventQueue.notify();
} }
/**
- 从对头获取数据 *
@return event */ public Event take() { synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
console("队列空了。");
try {
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notify();
console("事件 " + event + " 已处理");
return event;
} }
private void console(String message) { System.out.printf(“%s:%s \n”, Thread.currentThread().getName(), message); } }
<a name="Lp3yn"></a>
##### EventClient
```java
package com.zoro.concurrent.chapter05;
/**
* 事件客户端
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class EventClient {
/**
* 输出如下:
* Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@6f497ac0 已处理
* Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@2b474a9c 已处理
* Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@3a867800 已处理
* Consumer:队列空了。
* Producer:事件已提交。
* Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@4f032d53 已处理
* Consumer:队列空了。
* Producer:事件已提交。
* Consumer:事件 com.zoro.concurrent.chapter05.EventQueue$Event@6bb8d109 已处理
* Consumer:队列空了。
* Producer:事件已提交。
*/
public static void main(String[] args) {
final EventQueue eventQueue = new EventQueue();
new Thread(() -> {
for (; ; ) {
eventQueue.offer(new EventQueue.Event());
}
}, "Producer").start();
new Thread(() -> {
for (; ; ) {
eventQueue.take();
}
}, "Consumer").start();
}
}
5.2.2 wait和notify详解
Object中的方法
- Object的wait方法必须拥有该对象的monitor,也就是wait方法必须在同步方法中使用
- 当前线程执行了该对象的wait方法后, 将会放弃对该monitor的所有权并且进入与该对象关联的wait set中, 也就是说一旦线程执行了某个object的wait方法之后, 它就会释放对该对象monitor的所有权, 其他线程也会有机会继续争抢该monitor的所有权
5.2.3 wait,notify注意事项
- wait方法是可中断方法, [被打断后会收到中断异常interruptException, 同时interrupt标识也会被擦除]
- 线程执行了某个对象的wait方法后,会加入与之对应的wait set 中, 每一个对象的monitor都有与之对应的wait set
- 当线程进入wait set 之后, notify可将其唤醒, 也就是从wait set中弹出, 同时中断wait中的线程也会将其唤醒
- 必须在同步方法中使用wait和notify方法,因为执行wait和notify的前提是必须持有同步方法的monitor的所有权
错误示例: 需在同步代码块中
private void _testWait () {
_try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void _testNotify() {
_this.notify();
}
- 同步代码块的monitor必须与执行wait notify方法的对象一致,简单的说就是用哪个对象的monitor进行同步,就只能用哪个对象进行wait和notify操作.
错误示例: 应当用this
private final _Object MUTEX = _new _Object();
_private synchronized void _testWait() {
_try {
MUTEX.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
_private synchronized void _testNotify() {
MUTEX.notify();
}
5.2.4 wait和sleep
- 都可以使线程进入阻塞状态
- 均是可中断方法, 被中断后都会收到中断异常
- wait是Object的方法, sleep是Thread的方法
- wait需要在同步代码块中执行, 而sleep不需要
- 线程在同步方法中执行sleep时,并不会释放monitor锁, 而wait方法则会释放monitor的锁
sleep方法短暂休眠后会主动退出阻塞, 而wait(未指定wait时间)则需要被其他线程notify/中断后才能退出阻塞
5.3 多线程间通信
EventQueue 和 EventClient 在多线程下会出现以下问题:
LinkedList 为空时执行removeFirst方法
- LinkedList元素为10时执行addList方法
5.3.1 改进:
将临界值的判断if更改改为while, 将notify 更改为 notifyAll ```java package com.zoro.concurrent.chapter05;
import java.util.LinkedList;
/**
- 【线程间通信】事件队列
- 队列满 - 最多容纳多少Event,好比一个系统最多同时能够受理多少业务一样
- 队列空 - 当所有的Event都被处理并且没有新的Event被提交的时候,此时队列是空的状态
- 有Event但没有满 - 有新的event被提交,但是此时没有达到队列的上限 *
- @author yx.jiang
@date 2021/7/27 10:26 */ public class EventQueueSafe { private final int max;
static class Event {
}
private final LinkedList
eventQueue = new LinkedList<>(); private static final int DEFAULT_MAX_EVENT = 10;
public EventQueueSafe() {
this(DEFAULT_MAX_EVENT);
}
public EventQueueSafe(int max) {
this.max = max;
}
/**
- 提交事件到队列尾 *
@param event 事件 */ public void offer(Event event) { synchronized (eventQueue) {
while (eventQueue.size() >= max) {
console("队列满了。");
try {
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console("事件已提交。");
eventQueue.addLast(event);
eventQueue.notifyAll();
} }
/**
- 从对头获取数据 *
@return event */ public Event take() { synchronized (eventQueue) {
while (eventQueue.isEmpty()) {
console("队列空了。");
try {
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notifyAll();
console("事件 " + event + " 已处理");
return event;
} }
private void console(String message) { System.out.printf(“%s:%s \n”, Thread.currentThread().getName(), message); } }
<a name="YkSuU"></a>
#### 5.3.2 线程休息室wait set
<a name="AwQgt"></a>
### 5.4 自定义显式锁BooleanLock
> BooleanLock类似java.utils包下的Lock
<a name="Fa7Bn"></a>
#### 5.4.1 synchronized 的缺陷
1. 无法控制阻塞时长
2. 阻塞不可被中断
```java
package com.zoro.concurrent.chapter05;
import java.util.concurrent.TimeUnit;
/**
* synchronized缺陷
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class SynchronizedDefect {
//同步方法
public synchronized void syncMethod() {
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 1.
* T1线程先进入同步方法(t1启动后主线程休眠了2ms), T2线程启动执行{@link SynchronizedDefect#syncMethod}时会进入阻塞,
* T2何时能获取{@link SynchronizedDefect#syncMethod}的执行权, 取决于T1何时释放; 如果T2计划最多等待1min获得执行权, 否则就放弃
* 这种方式是无法做到的, 即: 阻塞时长无法控制
*
* 2.
* TT2因争抢某个monitor的锁而进入阻塞状态, 它是无法中断的, 虽然可以设置tt2线程的interrupt标识, 但是synchronized不像sleep和wait
* 那样可以获得中断信号
*/
public static void main(String[] args) throws InterruptedException {
//----------------------1.阻塞时长无法控制-------------------------
SynchronizedDefect defect = new SynchronizedDefect();
Thread t1 = new Thread(defect::syncMethod, "T1");
//确保t1启动
t1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(defect::syncMethod, "T2");
t2.start();
//-----------------------2.阻塞不可被中断-----------------------
SynchronizedDefect defect2 = new SynchronizedDefect();
Thread tt1 = new Thread(defect2::syncMethod, "T1");
//确保tt1启动
tt1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread tt2 = new Thread(defect2::syncMethod, "T2");
tt2.start();
//确保tt2启动
TimeUnit.MILLISECONDS.sleep(2);
tt2.interrupt();
//true
System.out.println(tt2.isInterrupted());
//BLOCKED
System.out.println(tt2.getState());
}
}
5.4.2 显示锁BooleanLock
1. Lock接口
package com.zoro.concurrent.chapter05.lock;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* 锁
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public interface Lock {
/**
* 永远阻塞,除非获取到了锁, 和synchronized类似, 但是该方法是可以被中断的
* 中断时会抛出{@link InterruptedException}
*/
void lock() throws InterruptedException;
/**
* 除了可以被中断外, 还增加了对应的超时功能
*
* @param mills 超时时长
*/
void lock(long mills) throws InterruptedException, TimeoutException;
/**
* 释放锁
*/
void unLock();
/**
* 获取当前有哪些线程被阻塞
* @return 被阻塞的线程
*/
List<Thread> getBlockedThreads();
}
2. 实现BooleanLock
package com.zoro.concurrent.chapter05.lock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
/**
* 显式BooleanLock
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class BooleanLock implements Lock {
/**
* 当前拥有锁的线程
*/
private Thread currentThread;
/**
* false: 当前该锁没有被任何线程获得或者已经释放
* true: 该锁已被某个线程({@link BooleanLock#currentThread)获得
*/
private boolean locked = false;
/**
* 存储 [在获取当前线程时被阻塞的线程]
*/
private final List<Thread> blockedList = new ArrayList<>();
@Override
public void lock() throws InterruptedException {
//使用同步代码块的方式进行同步
synchronized (this) {
//如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait释放对this monitor的所有权
while (locked) {
blockedList.add(currentThread());
this.wait();
}
//如果当前锁没有被其他线程获得, 则该线程将尝试从阻塞队列中删除自己
// (注: 如果当前线程从未进如阻塞队列, 删除方法无任何影响; 如果当前线程是从wait set中被唤醒的, 则需要从阻塞队列中将自己删除)
blockedList.remove(currentThread());
//指定locked开关为true
this.locked = true;
//记录获取锁的线程
this.currentThread = currentThread();
}
}
@Override
public void lock(long mills) throws InterruptedException, TimeoutException {
synchronized (this) {
//mills不合法,默认调用lock()方法, 亦可抛出异常
if (mills <= 0) {
this.lock();
} else {
long remainingMills = mills;
long endMills = currentTimeMillis() + remainingMills;
while (locked) {
//如果remainingMills<=0, 意味着当前线程被其他线程唤醒或者在指定的wait时间到了之后还没获得锁, 这时抛出超时异常
if (remainingMills <= 0) {
throw new TimeoutException("经" + mills + "ms未能获取锁");
}
if (!blockedList.contains(currentThread())) {
blockedList.add(currentThread);
}
//等待remainingMills毫秒后,该值起初是由其他线程传入的, 但在多次wait过程中会重新计算
this.wait(remainingMills);
//重新计算remainingMills[剩余时间]
remainingMills = endMills - currentTimeMillis();
}
//获得该锁, 并且从block列表中删除当前线程, 将locked的状态改为true并指定获得锁的线程就是当前线程
blockedList.remove(currentThread());
this.locked = true;
this.currentThread = currentThread();
}
}
}
@Override
public void unLock() {
synchronized (this) {
//判断当前线程是否为获取锁的那个线程, 止呕加了锁的线程才能解锁
if (currentThread == currentThread()) {
//设置锁的状态为false
this.locked = false;
//通知其他在wait set 中的线程, 可以再次尝试获取锁了, 亦可使用 this.notify();
this.notifyAll();
}
}
}
@Override
public List<Thread> getBlockedThreads() {
return Collections.unmodifiableList(blockedList);
}
}
fixed: 某个线程被中断, 他将有可能还存在于blockList中:
package com.zoro.concurrent.chapter05.lock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
/**
* 显式BooleanLock
* <p>fixed: 某个线程被中断, 他将有可能还存在于blockList中 </p>
*
* @author yx.jiang
* @date 2021/7/27 10:26
*/
public class BooleanLockFixed implements Lock {
/**
* 当前拥有锁的线程
*/
private Thread currentThread;
/**
* false: 当前该锁没有被任何线程获得或者已经释放
* true: 该锁已被某个线程({@link BooleanLockFixed#currentThread)获得
*/
private boolean locked = false;
/**
* 存储 [在获取当前线程时被阻塞的线程]
*/
private final List<Thread> blockedList = new ArrayList<>();
@Override
public void lock() throws InterruptedException {
//使用同步代码块的方式进行同步
synchronized (this) {
//如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait释放对this monitor的所有权
while (locked) {
//暂存当前线程
final Thread tempThread = currentThread();
try {
blockedList.add(tempThread);
this.wait();
} catch (InterruptedException e) {
//⚠如果当前线程在wait时被中断, 则从blockedList中将其删除, 避免内存泄露
blockedList.remove(tempThread);
//继续抛出中断异常
throw e;
}
}
//如果当前锁没有被其他线程获得, 则该线程将尝试从阻塞队列中删除自己
// (注: 如果当前线程从未进如阻塞队列, 删除方法无任何影响; 如果当前线程是从wait set中被唤醒的, 则需要从阻塞队列中将自己删除)
blockedList.remove(currentThread());
//指定locked开关为true
this.locked = true;
//记录获取锁的线程
this.currentThread = currentThread();
}
}
@Override
public void lock(long mills) throws InterruptedException, TimeoutException {
synchronized (this) {
//mills不合法,默认调用lock()方法, 亦可抛出异常
if (mills <= 0) {
this.lock();
} else {
long remainingMills = mills;
long endMills = currentTimeMillis() + remainingMills;
while (locked) {
final Thread tempThread = currentThread();
try {
//如果remainingMills<=0, 意味着当前线程被其他线程唤醒或者在指定的wait时间到了之后还没获得锁, 这时抛出超时异常
if (remainingMills <= 0) {
throw new TimeoutException("经" + mills + "ms未能获取锁");
}
if (!blockedList.contains(tempThread)) {
blockedList.add(currentThread);
}
//等待remainingMills毫秒后,该值起初是由其他线程传入的, 但在多次wait过程中会重新计算
this.wait(remainingMills);
//重新计算remainingMills[剩余时间]
remainingMills = endMills - currentTimeMillis();
} catch (InterruptedException e) {
blockedList.remove(tempThread);
throw e;
}
}
//获得该锁, 并且从block列表中删除当前线程, 将locked的状态改为true并指定获得锁的线程就是当前线程
blockedList.remove(currentThread());
this.locked = true;
this.currentThread = currentThread();
}
}
}
@Override
public void unLock() {
synchronized (this) {
//判断当前线程是否为获取锁的那个线程, 止呕加了锁的线程才能解锁
if (currentThread == currentThread()) {
//设置锁的状态为false
this.locked = false;
Optional.of(currentThread().getName() + " 释放锁.").ifPresent(System.out::println);
//通知其他在wait set 中的线程, 可以再次尝试获取锁了, 亦可使用 this.notify();
this.notifyAll();
}
}
}
@Override
public List<Thread> getBlockedThreads() {
return Collections.unmodifiableList(blockedList);
}
}
3. 使用BooleanLock
- 多个线程通过lock()方法争抢锁
- 可中断被阻塞的线程
- 阻塞的线程可超时 ```java package com.zoro.concurrent.chapter05.lock;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.IntStream;
import static java.lang.Thread.currentThread; import static java.util.concurrent.ThreadLocalRandom.current;
/**
- lock-test *
- @author yx.jiang
@date 2021/7/27 10:26 / public class BooleanLockTest { /*
定义{@link BooleanLock} */ private final Lock lock = new BooleanLock();
/**
使用try..finally语句块确保lock每次都能被正确释放 */ public void syncMethod() { try {
//加锁
lock.lock();
int randomInt = current().nextInt(10);
System.out.println(currentThread() + "获得锁.");
TimeUnit.SECONDS.sleep(randomInt);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
lock.unLock();
} }
public void syncMethodTimeoutable() { try {
//加锁
lock.lock(1000);
int randomInt = current().nextInt(10);
System.out.println(currentThread() + "获得锁.");
TimeUnit.SECONDS.sleep(randomInt);
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
} finally {
//释放锁
lock.unLock();
} }
public static void main(String[] args) { BooleanLockTest booleanLockTest = new BooleanLockTest(); // multiThreadGrapeTest(booleanLockTest); // interruptBlockedThreadTest(booleanLockTest); lockTimeoutReleaseTest(booleanLockTest); }
/**
- 阻塞线程可超时
- 输出如下:
- Thread[Thread-0,5,main]获得锁.
- java.util.concurrent.TimeoutException: 经1000ms未能获取锁
- at com.zoro.concurrent.chapter05.lock.BooleanLock.lock(BooleanLock.java:65)
- at com.zoro.concurrent.chapter05.lock.BooleanLockTest.syncMethodTimeoutable(BooleanLockTest.java:43)
- at java.lang.Thread.run(Thread.java:748)
- Thread-0 释放锁. *
@param booleanLockTest 测试 */ private static void lockTimeoutReleaseTest(BooleanLockTest booleanLockTest) { try {
new Thread(booleanLockTest::syncMethodTimeoutable).start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(booleanLockTest::syncMethodTimeoutable, "T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} }
/**
- <2>可中断被阻塞的线程
- 输出如下:
- Thread[T1,5,main]获得锁.
- java.lang.InterruptedException
- at java.lang.Object.wait(Native Method)
- at java.lang.Object.wait(Object.java:502)
- at com.zoro.concurrent.chapter05.lock.BooleanLock.lock(BooleanLock.java:40)
- at com.zoro.concurrent.chapter05.lock.BooleanLockTest.syncMethod(BooleanLockTest.java:27)
- at java.lang.Thread.run(Thread.java:748) *
@param booleanLockTest 测试 */ private static void interruptBlockedThreadTest(BooleanLockTest booleanLockTest) { try {
new Thread(booleanLockTest::syncMethod, "T1").start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(booleanLockTest::syncMethod, "T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(2);
t2.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
} }
/**
- <1>多线程通过lock()方法争抢锁
- 输出如下:
- Thread[Thread-0,5,main]获得锁.
- Thread-0 释放锁.
- Thread[Thread-9,5,main]获得锁.
- Thread-9 释放锁.
- Thread[Thread-1,5,main]获得锁.
- Thread-1 释放锁.
- Thread[Thread-8,5,main]获得锁.
- Thread-8 释放锁.
- Thread[Thread-2,5,main]获得锁.
- Thread-2 释放锁.
- Thread[Thread-7,5,main]获得锁.
- Thread-7 释放锁.
- Thread[Thread-3,5,main]获得锁.
- Thread-3 释放锁.
- Thread[Thread-6,5,main]获得锁.
- Thread-6 释放锁.
- Thread[Thread-5,5,main]获得锁.
- Thread-5 释放锁.
- Thread[Thread-4,5,main]获得锁.
- Thread-4 释放锁. *
- @param booleanLockTest 测试
*/
private static void multiThreadGrapeTest(BooleanLockTest booleanLockTest) {
IntStream.range(0, 10)
} }.mapToObj(i -> new Thread(booleanLockTest::syncMethod))
.forEach(Thread::start);
```