1.同步阻塞与异步非阻塞
1.1同步阻塞消息处理
- 同步Event提交, 客户端等待时间过长 (提交Event时长+接受Event创建thread时 长+业务处理时长+返回结果时长)会陷入阻塞, 导致二次提交Event耗时过长。
- 由于客户端提交的Event数量不多, 导致系统同时受理业务数量有限,也就是系统整体的吞吐量不高。
- 这种一个线程处理一个Event的方式,会导致出现频繁的创建开启与销毁,从而增加系统额外开销。
- 在业务达到峰值的时候,大量的业务处上理线程阻塞会导致频繁的CPU上下文切换,从而降低系统性能。
1.2异步飞阻塞消息处理
客户端提交Event后会得到一个相应的工单并且立即返回, Event则会被放置在Event 队列中。服务端有若干个工作线程, 不断地从Event队列中获取任务并且进行异步处 理, 最后将处理结果保存至另外一个结果集中,如果客户端想要获得处理结果,则可凭借工单 号再次查询。
两种方式相比较,你会发现异步非阻塞的优势非常明显,首先客户端不用等到结果 处 理结束之后才能返回,从而提高了系统的吞吐量和并发量;其次若服务端的线程数量在一 个可控的范围之内是不会导致太多的CPU上下文切换从而带来的额外开销的; 再次服务端 线程可以重复利用,这样就减少了不断创建线程带来的资源浪费。
2.单线程间通信
在1.2节中, 服务端有若干个线程会从队列中获取相应的Event进行异步处理, 那 么 这些线程又是如何从队列中获取数据的呢?换句话说就是如何知道队列里此时是否有数据 呢?比较笨的办法是不断地轮询:如果有数据则读取数据并处理,如果没有则等待若干 时 间再次轮询。还有一种比较好的方式就是通知机制:如果队列中有Event, 则通知工作的线程开始工作;没有Event,则工作线程休息并等待通知。
2.1初试wait和notify
在本节中, 我们将会学习线程之间如何进行通信, 首先实现一个EventQueue, 该Queue有如下三种状态:
- 队列满——最多可容纳多少个Event, 好比一个系统最多同时能够受理多少业务一样;
- 队列空一 当所有的Event都被处理并且没有新的Event被提交的时候, 此时队列将是空的状态;
- 有Event但是没有满—— 有新的Event被提交, 但是此时没有到达队列的上限。
EventQueue队列:
import java.util.LinkedList;
public class EventQueue {
private final int max;
static class Event{
}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEAULT_MAX_EVENT=10;
public EventQueue() {
this(DEAULT_MAX_EVENT);
}
public EventQueue(int max){
this.max = max;
}
public void offer(Event event) {
synchronized (eventQueue) {
if(eventQueue.size() >= max) {
try {
System.out.println("the queue is full");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("the new event is submitted");
eventQueue.addLast(event);
eventQueue.notify();
}
}
public Event take() {
synchronized (eventQueue) {
if ( eventQueue.isEmpty()) {
try {
System.out.println("the queue is empty");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notify();
System.out.println("the event " + event + " is handled");
return event;
}
}
}
上述代码中,在EventQueue中定义了一个队列,offer方法会提交一个Event至队尾,如果此时队列已经满了,那么提交的线程将会被阻塞,这是调用了wait方法的结果(后 文中会重点介绍wait方法) 。同样take方法会从队头获取数据,如果队列中没有可用数据, 那么工作线程就会被阻塞,这也是调用wait方法的直接结果。此外,还可以看到一个notify方法,该方法的作用是唤醒那些曾经执行 monitor的wait方法而进入阻塞的线程。
EventClient测试代码:
import java.util.concurrent.TimeUnit;
public class EventClient {
public static void main(String[] args) {
final EventQueue eventQueue = new EventQueue();
new Thread(
()->{
for(;;) {
eventQueue.offer(new EventQueue.Event());
}
}
,"Producer"
).start();
new Thread(
()->{
for(;;) {
eventQueue.take();
try {
TimeUnit.MICROSECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
,"Consumer"
).start();
}
}
其中, Producer线程模拟提交Event的客户端几乎没有任何的延迟, 而Consumer线程则用于模拟处理请求的工作线程(上面的EventQueue目前只支持一个线程的Producer和一个线程的Consumer, 也就是单线程间的通信, 多个线程的生产者与消费者的会在1.3节中讲到)。运行上面的代码,分析一下输出的结果,具体如下:
2.2wait和notify方法详解
看到这里想必读者对wait和notify方法有了一个大概的认识, wait和notify方法并不是Thread特有的方法, 而是Object中的方法, 也就是说在JDK中的每一个类都拥有这两个方法,那么这两个方法到底有什么神奇之处可以使线程阻塞又可以唤醒线程呢?我们先来说说wait方法, 下面是wait方法的三个重载方法。
public final void wait() throws InterruptedException
public final void wait(long timeout) throws InterruptedException
public final void wait(long timeout,int naos) throws InterruptedException
- wait方法的这三个重载方法都将调用wait(longtime out)这个方法,前文使用的wait() 方法等价于wait(0) , 0代表着永不超时。
- Object的wait(longtime out) 方法会导致当前线程进入阻塞, 直到有其他线程调用了Object的notify或者notifyAll方法才能将其唤醒, 或者阻塞时间到达了timeout时间而自动唤醒。
- 当前线程执行了该对象的wait方法之后, 将会放弃对该monitor的所有权并且进入与该对象关联的wait set中, 也就是说一旦线程执行了某个object的wait方法之后,它就会释放对该对象monitor的所有权, 其他线程也会有机会继续争抢该monitor的所有权。
public final native void notify();
- 唤醒单个正在执行该对象wait方法的线程。
- 如果有某个线程由于执行该对象的wait方法而进入阻塞则会被唤醒, 如果没有则会忽略。
- 被唤醒的线程需要重新获取对该对象所关联monitor的lock才能继续执行。
2.3关于wait和notify的注意事项
- wait方法是可中断方法, 这也就意味着, 当前线程一旦调用了wait方法进入阻塞状态, 其他线程是可以使用interrupt方法将其打断的; 根据3.7节的介绍, 可中断方法被打断后会收到中断异常InterruptedException, 同时interrupt标识也会被擦除。
- 线程执行了某个对象的wait方法以后, 会加人与之对应的wait set中, 每一个对象的 monitor都有一个与之关联的wait set(5.3.2节中会详细讲解wait set的知识) 。
- 当线程进入wait set之后, notify方法可以将其唤醒, 也就是从wait set中弹出, 同时中断wait中的线程也会将其唤醒。
- 必须在同步方法中使用wait和notify方法, 因为执行wait和notify的前提条件是必须持有同步方法的monitor的所有权, 运行下面任何一个方法都会抛出非法的monitor状态异常Illegal Monitor State Exception:
- 同步代码的monitor必须与执行wait notify方法的对象一致, 简单地说就是用哪个对象的monitor进行同步, 就只能用哪个对象进行wait和notify操作。运行下面代码中的任何一个方法, 同样都会抛出Illegal Monitor State Exception异常信息: ```java private final Object MUTEX = new Object();
private synchronized void testWait() { try { MUTEX.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
private synchronized void testNotify() { MUTEX.notify(); }
上述同步方法中monitor引用的是this, 而wait和notify方法使用的却是MUTEX的方法, 其实这并不难理解, 虽然是在同步方法中执行wait和notify方法, 但是wait和notify方法的执行并未以获取MUTEX的monitor为前提。
<a name="Eg2Hd"></a>
### 2.4 wait 和 sleep
从表面上看, wait和sleep方法都可以使当前线程进入阻塞状态, 但是两者之间存在着本质的区别,下面我们将总结两者的区别和相似之处。
- wait和sleep方法都可以使线程进入阻塞状态。
- wait和sleep方法均是可中断方法, 被中断后都会收到中断异常。
- wait是Object的方法, 而sleep是Thread特有的方法。
- wait方法的执行必须在同步方法中进行, 而sleep则不需要。
- 线程在同步方法中执行sleep方法时, 并不会释放monitor的锁, 而wait方法则会释放monitor的锁。
- sleep方法短暂休眠之后会主动退出阻塞, 而wait方法(没有指定wait时间) 则需要被其他线程中断后才能退出阻塞。
<a name="DMLoK"></a>
## 3.多线程通信
<a name="R1X5J"></a>
### 3.1生产者和消费者
<a name="JgrUJ"></a>
#### 3.1.1notifyAll方法
多线程间通信需要用到Object的notifyAll方法, 该方法与notify比较类似, 都可以唤醒由于调用了wait方法而阻塞的线程, 但是notify方法只能唤醒其中的一个线程, 而notifyAll方法则可以同时唤醒全部的阻塞线程, 同样被唤醒的线程仍需要继续争抢monitor的锁。
<a name="Y3agg"></a>
#### 3.1.2生产者和消费者
在2.1节中曾定义了一个EventQueue, 该队列在多个线程同时并发的情况下会出现数据不一致的情况, 读者可以自行增加Event Client中的线程数量进行测试, 在笔者的测试中出现了数据不一致的情况, 大致可分为两类:**其一是LinkedList中没有元素的时候仍旧调用了remove First方法, 其二是当LinkedList中的元素超过10个的时候仍旧执行了addLast方法, 下面通过图示的方法分别对其进行分析。**
<a name="IQXbx"></a>
##### (1)LinkedList为空时执行removeFirst方法
也许有读者会有疑问, EventQueue中的方法都增加了synchronized数据同步, 为何还会存在数据不一致的情况?假设EventQueue中的元素为空, 两个线程在执行take方法时分别调用wait方法进入了阻塞之中, 另外一个offer线程执行add Last方法之后唤醒了其中一个阻塞的take线程, 该线程顺利消费了一个元素之后恰巧再次唤醒了一个take线程, 这时就会导致执行空LinkedList的remove First方法, 执行过程如图所示<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/766178/1627440166898-746b3558-9b20-492c-aeb7-03b19572484e.png#clientId=ue6e61031-e9a5-4&from=paste&id=uae18e3c8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=517&originWidth=1166&originalType=binary&ratio=1&size=820952&status=done&style=none&taskId=u71e5aff0-0c21-4991-97e2-055d45027a8)
<a name="OKYBI"></a>
##### (2)LinkedList元素为10时执行add Last方法
假设某个时刻EventQueue中存在10个Event数据, 其中两个线程在执行offer方法的时候分别因为调用了wait方法而进入阻塞中, 另外的一个线程执行take方法消费了event元素并且唤醒了一个offer线程, 而该offer线程执行了add Last方法之后, queue中的元素为10, 并且再次执行唤醒方法, 恰巧另外一个offer线程也被唤醒, 因此可以绕开阀值检查eventQueue() .size>=max, 致使EventQueue中的元素超过10个, 执行过程如图所示<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/766178/1627440389631-48f62030-1f88-4496-8edb-67b7b5061618.png#clientId=ue6e61031-e9a5-4&from=paste&id=ubaf172b2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=513&originWidth=1167&originalType=binary&ratio=1&size=692815&status=done&style=none&taskId=ud92113a0-7336-45cf-81e6-88011a5d85d)
<a name="o7AiN"></a>
##### (3)改进
在分析完多线程情况下出现的问题之后,我们将对其进行改进。实际上在真实的开发中,绝大多数时候遇到的都是多线程间通信的情况,其中生产者消费者的例子就是最好的模型,示例如下:
```java
public void offer(Event event) {
synchronized (eventQueue) {
while(eventQueue.size()>=max) {
System.out.println("the queue is full.");
eventQUeue.wait();
}
System.out.println("the new event is submmited");
eventQueue.addLast(event);
eventQueue.notifyAll();
}
}
public Event take() {
synchronized (eventQueue) {
while(eventQueue.isEmpty()) {
eventQueue.wait();
}
}
Event event = eventQUeue.removeFirst();
this.eventQueue.notifyAll();
return event;
}
只需要将临界值的判断if更改为while, 将notify更改为notifyAll即可。
3.2线程休息室wait set
图是若干个线程调用了wait方法之后被加入与monitor关联的wait set中, 待另外一个线程调用该monitor的notify方法之后, 其中一个线程会从wait set中弹出, 至于是随机弹出还是以先进先出的方式弹出,虚拟机规范同样也没有给出强制的要求
而执行notifyAll则不需要考虑哪个线程会被弹出, 因为wait set中的所有wait线程都将被弹出,如图所示。
4.自定义显式锁BooleanLock
4.1synchronized关键字的缺陷
synchronized关键字提供了一种排他式的数据同步机制, 某个线程在获取monitor lock的时候可能会被阻塞,而这种阻塞有两个很明显的缺陷:第一,无法控制阻塞时长。第二,阻塞不可被中断。下面通过示例来进行分析,如下:
public class SynchedTest {
public synchronized void syncMethod() {
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
SynchedTest defect = new SynchedTest();
Thread t1 = new Thread(defect::syncMethod, "T1");
t1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(defect::syncMethod,"T2");
t2.start();
}
}
上面的代码中有一个同步方法syncMethod, 这里启动了两个线程分别调用该方法, 在该方法中线程会休眠1小时的时间,为了确保T1线程能够最先进入同步方法,在T1线程启动后主线程休眠了2毫秒的时间。T2线程启动执行syncMethod方法时会进入阻塞, T2什么时候能够获得sync Method的执行完全取决于T1何时对其释放, 如果T2计划最多1分钟获得执行权,否则就放弃,很显然这种方式是做不到的,这也就是前面所说的阻塞时长无法控制
第二个缺陷是T2若因争抢某个monitor的锁而进入阻塞状态, 那么它是无法中断的,虽然可以设置T2线程的interrupt标识, 但是synchronized阻塞不像sleep和wait方法一样能够捕获得到中断信号。下面将main方法的代码稍作修改,试图打断T2线程,来看看会怎样。
public static void main(String[] args) throws InterruptedException {
SynchedTest defect = new SynchedTest();
Thread t1 = new Thread(defect::syncMethod, "T1");
t1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(defect::syncMethod,"T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(2);
t2.interrupt();
System.out.println(t2.isInterrupted());
System.out.println(t2.getState());
}
4.2显式锁Boolean Lock
在本节中, 我们将利用前面所学的知识, 构造一个显式的BooleanLock, 使其在具备synchronized关键字所有功能的同时又具备可中断和lock超时的功能。
4.2.1定义Lock接口
import java.util.List;
import java.util.concurrent.TimeoutException;
public interface Lock {
void lock() throws InterruptedException;
void lock(long mills) throws InterruptedException, TimeoutException;
void unlock();
List<Thread> getBlockedThreads();
}
- lock() 方法永远阻塞, 除非获取到了锁, 这一点和synchronized非常类似, 但是该方法是可以被中断的, 中断时会抛出InterruptedException异常。
- lock(long mills) 方法除了可以被中断以外, 还增加了对应的超时功能。
- unlock() 方法可用来进行锁的释放。
- get Blocked Threads() 用于获取当前有哪些线程被阻塞。
4.2.2 实现BooleanLock
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
public class BooleanLock implements Lock{
private Thread currentThread;
private boolean locked=false;
private final List<Thread> blockedList = new ArrayList<>();
// 其中current Thread代表当前拥有锁的线程, locked是一个boolean开关, false代表当
// 前该锁没有被任何线程获得或者已经释放, true代表该锁已经被某个线程获得, 该线程就是
// current Thread; blocked List用来存储哪些线程在获取当前线程时进人了阻塞状态
@Override
public void lock() throws InterruptedException {
synchronized (this) { // ①Lock() 方法使用同步代码块的方式进行方法同步。
while(locked) { // ②如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait 释放对this monitor的所有权。
blockedList.add(currentThread());
this.wait();
}
blockedList.remove(currentThread()); // ③如果当前锁没有被其他线程获得,则该线程将尝试从阻塞队列中删除自己(注意:如
//果当前线程从未进入过阻塞队列, 删除方法不会有任何影响; 如果当前线程是从wait set中
//被唤醒的,则需要从阻塞队列中将自己删除)。
this.locked=true; // ④locked开关被指定为true。
this.currentThread = currentThread(); // ⑤记录获取锁的线程。
}
}
@Override
public void lock(long mills) throws InterruptedException, TimeoutException {
synchronized (this) {
if ( mills <= 0) {// ①如果mills不合法, 则默认调用 lock() 方法, 当然也可以抛出参数非法的异常,
// 一般来说,抛出异常是一种比较好的做法。
this.lock();
} else {
long remainingMills = mills;
long endMills = currentTimeMillis() + remainingMills;
while(locked) {
if ( remainingMills <=0 ) { // ②如果remaining Mills小于等于0 则意味着当前线程被其他线程唤醒或者在指定的
// wait时间到了之后还没有获得锁, 这种 情况下会抛出超时的异常。
throw new TimeoutException("can not get the lock during " + mills);
}
if ( !blockedList.contains(currentThread())) {
blockedList.add(currentThread());
this.wait(remainingMills); // ③等待remaining Mills的毫秒数, 该值最开始是由其他线程传入的, 但在多次wait的
// 过程中会重新计算。
remainingMills = endMills - currentTimeMillis(); // ④重新计算remaining Mills时间。
}
blockedList.remove(currentThread()); // ⑤获得该锁, 并且从block列表中 删除当前线程, 将locked的状态修改为true并且指
// 定获得锁的线程就是当前线程。
this.locked = true;
this.currentThread = currentThread();
}
}
}
}
@Override
public void unlock() {
synchronized (this) {
if ( currentThread == currentThread()) { // ①判断当前线程是否为获取锁的那个线程,只有加了锁的线程才有资格进行解锁。
this.locked = false; // ②将锁的locked状态修改为false。
this.notifyAll(); // ③通知其他在wait set中的线程, 你们可以再次尝试抢锁了, 这里使用notify和 notifyAll都可以。
}
}
}
@Override
public List<Thread> getBlockedThreads() {
return Collections.unmodifiableList(blockedList);
}
}
4.2.3 使用BooleanLock
1) 多个线程通过lock()方法争抢锁
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;
public class BooleanLockTest {
// 定义BooleanLock
private final Lock lock = new BooleanLock();
// 使用try...finally语句块确保lock每次都能被正确释放
public void syncMethod() {
try {
lock.lock();
int ramdomint = current().nextInt(10);
System.out.println(Thread.currentThread() + " get the lock");
TimeUnit.SECONDS.sleep(ramdomint);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BooleanLockTest blt = new BooleanLockTest();
//
IntStream.range(0,10).mapToObj(i -> new Thread(blt::syncMethod)).forEach(Thread::start);
}
}
2)可中断被堵塞的锁
public static void main(String[] args) throws InterruptedException {
BooleanLockTest blt = new BooleanLockTest();
//
// IntStream.range(0,10).mapToObj(i -> new Thread(blt::syncMethod)).forEach(Thread::start);
new Thread(blt::syncMethod, "T1").start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(blt::syncMethod,"T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(10);
t2.interrupt();
}
运行上面的程序,在T2线程启动10毫秒以后,主动将其中断,T2线程会收到中断信号, 也就是InterruptedException异常, 这样也就弥补了Synchronized同步方式不可被中断的缺陷。上述程序的运行结果如下:
但是, Boolean Lock还存在一个问题, 如果某个线程被中断, 那么它将有可能还存在于blockList中, 该问题的修复也非常简单, 可以对Boolean Lock的lock方法进行进一步的增强加以修复, 另外一个lock重载方法的实现思路与之类似, 如下代码所示:
@Override
public void lock() throws InterruptedException {
synchronized (this) { // ①Lock() 方法使用同步代码块的方式进行方法同步。
while(locked) { // ②如果当前锁已经被某个线程获得, 则该线程将加入阻塞队列, 并且使当前线程wait 释放对this monitor的所有权。
final Thread tempThread = currentThread();
try {
if ( !blockedList.contains(tempThread)) {
blockedList.add(tempThread);
}
this.wait();
} catch (InterruptedException e) {
// 如果当前线程在wait时被中断, 则从blocked List中将其删除, 避免内存泄漏
blockedList.remove(tempThread);
throw e;
}
}
blockedList.remove(currentThread()); // ③如果当前锁没有被其他线程获得,则该线程将尝试从阻塞队列中删除自己(注意:如
//果当前线程从未进入过阻塞队列, 删除方法不会有任何影响; 如果当前线程是从wait set中
//被唤醒的,则需要从阻塞队列中将自己删除)。
this.locked=true; // ④locked开关被指定为true。
this.currentThread = currentThread(); // ⑤记录获取锁的线程。
}
}
3) 阻塞的线程可超时
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;
public class BooleanLockTest1 {
// 定义BooleanLock
private final Lock lock = new BooleanLock();
public void syncMethodTimeoutable() {
try {
lock.lock(1000);
System.out.println(currentThread() + " get the lock");
int randomInt = current().nextInt(1);
TimeUnit.SECONDS.sleep(randomInt);
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
BooleanLockTest1 blt = new BooleanLockTest1();
new Thread(blt::syncMethodTimeoutable, "T1").start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(blt::syncMethodTimeoutable, "T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(10);
}
}