synchronized 中等待和唤醒线程示例
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo1 {
static Object lock = new** Object();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备获取锁!");<br /> **synchronized** (lock) {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "获取锁成功!");<br /> **try** {<br /> lock.wait();<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "释放锁成功!");<br /> }<br /> }
**public** **static** **class** **T2** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备获取锁!");<br /> **synchronized** (lock) {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "获取锁成功!");<br /> lock.notify();<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + " notify!");<br /> **try** {<br /> TimeUnit.SECONDS.sleep(5);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备释放锁!");<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "释放锁成功!");<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> TimeUnit.SECONDS.sleep(5);<br /> T2 t2 = **new** T2();<br /> t2.setName("t2");<br /> t2.start();<br /> }<br />}
输出:
1:1563530109234,t1准备获取锁!
2:1563530109234,t1获取锁成功!
3:1563530114236,t2准备获取锁!
4:1563530114236,t2获取锁成功!
5:1563530114236,t2 notify!
6:1563530119237,t2准备释放锁!
7:1563530119237,t2释放锁成功!
8:1563530119237,t1释放锁成功!
代码结合输出的结果我们分析一下:
- 线程 t1 先获取锁,然后调用了 wait()方法将线程置为等待状态,然后会释放 lock 的锁
- 主线程等待 5 秒之后,启动线程 t2,t2 获取到了锁,结果中 1、3 行时间相差 5 秒左右
- t2 调用 lock.notify()方法,准备将等待在 lock 上的线程 t1 唤醒,notify()方法之后又休眠了 5 秒,看一下输出的 5、8 可知,notify()方法之后,t1 并不能立即被唤醒,需要等到 t2 将 synchronized 块执行完毕,释放锁之后,t1 才被唤醒
wait()方法和 notify()方法必须放在同步块内调用(synchronized 块内),否则会报错
Condition 使用简介
在了解 Condition 之前,需要先了解一下重入锁 ReentrantLock,可以移步到:JUC 中的 ReentranLock。
任何一个 java 对象都天然继承于 Object 类,在线程间实现通信的往往会应用到 Object 的几个方法,比如 wait()、wait(long timeout)、wait(long timeout, int nanos)与 notify()、notifyAll()几个方法实现等待/通知机制,同样的, 在 java Lock 体系下依然会有同样的方法实现等待/通知机制。
从整体上来看Object 的 wait 和 notify/notify 是与对象监视器配合完成线程间的等待/通知机制,而 Condition 与 Lock 配合完成等待通知机制,前者是 java 底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上不同外,在功能特性上还是有很多的不同:Condition 能够支持不响应中断,而通过使用 Object 方式不支持
- Condition 能够支持多个等待队列(new 多个 Condition 对象),而 Object 方式只能支持一个
- Condition 能够支持超时时间的设置,而 Object 不支持
Condition 由 ReentrantLock 对象创建,并且可以同时创建多个,Condition 接口在使用前必须先调用 ReentrantLock 的 lock()方法获得锁,之后调用 Condition 接口的 await()将释放锁,并且在该 Condition 上等待,直到有其他线程调用 Condition 的 signal()方法唤醒线程,使用方式和 wait()、notify()类似。
示例代码:
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo2 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备获取锁!");<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "获取锁成功!");<br /> condition.await();<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "释放锁成功!");<br /> }<br /> }
**public** **static** **class** **T2** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备获取锁!");<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "获取锁成功!");<br /> condition.signal();<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + " signal!");<br /> **try** {<br /> TimeUnit.SECONDS.sleep(5);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "准备释放锁!");<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + "释放锁成功!");<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> TimeUnit.SECONDS.sleep(5);<br /> T2 t2 = **new** T2();<br /> t2.setName("t2");<br /> t2.start();<br /> }<br />}
输出:
1563532185827,t1准备获取锁!
1563532185827,t1获取锁成功!
1563532190829,t2准备获取锁!
1563532190829,t2获取锁成功!
1563532190829,t2 signal!
1563532195829,t2准备释放锁!
1563532195829,t2释放锁成功!
1563532195829,t1释放锁成功!
输出的结果和使用 synchronized 关键字的实例类似。
Condition.await()方法和 Object.wait()方法类似,当使用 Condition.await()方法时,需要先获取 Condition 对象关联的 ReentrantLock 的锁,在 Condition.await()方法被调用时,当前线程会释放这个锁,并且当前线程会进行等待(处于阻塞状态)。在 signal()方法被调用后,系统会从 Condition 对象的等待队列中唤醒一个线程,一旦线程被唤醒,被唤醒的线程会尝试重新获取锁,一旦获取成功,就可以继续执行了。因此,在 signal 被调用后,一般需要释放相关的锁,让给其他被唤醒的线程,让他可以继续执行。
Condition 常用方法
Condition 接口提供的常用方法有:
和 Object 中 wait 类似的方法
- void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用 condition 的 signal 或者 signalAll 方法并且当前线程获取 Lock 从 await 方法返回,如果在等待状态中被中断会抛出被中断异常;
- long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;
- boolean await(long time, TimeUnit unit) throws InterruptedException:同第二种,支持自定义时间单位,false:表示方法超时之后自动返回的,true:表示等待还未超时时,await 方法就返回了(超时之前,被其他线程唤醒了)
- boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间
- void awaitUninterruptibly();:当前线程进入等待状态,不会响应线程中断操作,只能通过唤醒的方式让线程继续
和 Object 的 notify/notifyAll 类似的方法
- void signal():唤醒一个等待在 condition 上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到 Lock 则可以从等待方法中返回。
- void signalAll():与 1 的区别在于能够唤醒所有等待在 condition 上的线程
Condition.await()过程中被打断
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo4 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> lock.lock();<br /> **try** {<br /> condition.await();<br /> } **catch** (InterruptedException e) {<br /> System.out.println("中断标志:" + **this**.isInterrupted());<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> TimeUnit.SECONDS.sleep(2);<br /> //给t1线程发送中断信号<br /> System.out.println("1、t1中断标志:" + t1.isInterrupted());<br /> t1.interrupt();<br /> System.out.println("2、t1中断标志:" + t1.isInterrupted());<br /> }<br />}
输出:
1、t1中断标志:false
2、t1中断标志:true
中断标志:false
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at com.itsoku.chat09.Demo4$T1.run(Demo4.java:19)
调用 condition.await()之后,线程进入阻塞中,调用 t1.interrupt(),给 t1 线程发送中断信号,await()方法内部会检测到线程中断信号,然后触发InterruptedException异常,线程中断标志被清除。从输出结果中可以看出,线程 t1 中断标志的变换过程:false->true->false
await(long time, TimeUnit unit)超时之后自动返回
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo5 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",start");<br /> **boolean** r = condition.await(2, TimeUnit.SECONDS);<br /> System.out.println(r);<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",end");<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> }<br />}
输出:
1563541624082,t1,start
false
1563541626085,t1,end
t1 线程等待 2 秒之后,自动返回继续执行,最后 await 方法返回 false,await 返回 false 表示超时之后自动返回
await(long time, TimeUnit unit)超时之前被唤醒
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo6 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",start");<br /> **boolean** r = condition.await(5, TimeUnit.SECONDS);<br /> System.out.println(r);<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",end");<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> //休眠1秒之后,唤醒t1线程<br /> TimeUnit.SECONDS.sleep(1);<br /> lock.lock();<br /> **try** {<br /> condition.signal();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br />}
输出:
1563542046046,t1,start
true
1563542047048,t1,end
t1 线程中调用condition.await(5, TimeUnit.SECONDS);方法会释放锁,等待 5 秒,主线程休眠 1 秒,然后获取锁,之后调用 signal()方法唤醒 t1,输出结果中发现 await 后过了 1 秒(1、3 行输出结果的时间差),await 方法就返回了,并且返回值是 true。true 表示 await 方法超时之前被其他线程唤醒了。
long awaitNanos(long nanosTimeout)超时返回
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo7 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",start");<br /> **long** r = condition.awaitNanos(TimeUnit.SECONDS.toNanos(5));<br /> System.out.println(r);<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",end");<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> }<br />}
输出:
1563542547302,t1,start
-258200
1563542552304,t1,end
awaitNanos 参数为纳秒,可以调用 TimeUnit 中的一些方法将时间转换为纳秒。
t1 调用 await 方法等待 5 秒超时返回,返回结果为负数,表示超时之后返回的。
waitNanos(long nanosTimeout)超时之前被唤醒
package com.itsoku.chat09;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class Demo8 {
static ReentrantLock lock = new ReentrantLock();
static** Condition condition = lock.newCondition();
**public** **static** **class** **T1** **extends** **Thread** {<br /> @Override<br /> **public** **void** **run**() {<br /> lock.lock();<br /> **try** {<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",start");<br /> **long** r = condition.awaitNanos(TimeUnit.SECONDS.toNanos(5));<br /> System.out.println(r);<br /> System.out.println(System.currentTimeMillis() + "," + **this**.getName() + ",end");<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> T1 t1 = **new** T1();<br /> t1.setName("t1");<br /> t1.start();<br /> //休眠1秒之后,唤醒t1线程<br /> TimeUnit.SECONDS.sleep(1);<br /> lock.lock();<br /> **try** {<br /> condition.signal();<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }<br />}
输出:
1563542915991,t1,start
3999988500
1563542916992,t1,end
t1 中调用 await 休眠 5 秒,主线程休眠 1 秒之后,调用 signal()唤醒线程 t1,await 方法返回正数,表示返回时距离超时时间还有多久,将近 4 秒,返回正数表示,线程在超时之前被唤醒了。
其他几个有参的 await 方法和无参的 await 方法一样,线程调用 interrupt()方法时,这些方法都会触发 InterruptedException 异常,并且线程的中断标志会被清除。
同一个锁支持创建多个 Condition
使用两个 Condition 来实现一个阻塞队列的例子:
package com.itsoku.chat09;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/
微信公众号:程序员路人,专注于java技术分享(带你玩转 爬虫、分布式事务、异步消息服务、任务调度、分库分表、大数据等),喜欢请关注!
/
public class BlockingQueueDemo<E> {
int** size;//阻塞队列最大容量
ReentrantLock lock = **new** ReentrantLock();
LinkedList<E> list = **new** LinkedList<>();//队列底层实现
Condition notFull = lock.newCondition();//队列满时的等待条件<br /> Condition notEmpty = lock.newCondition();//队列空时的等待条件
**public** **BlockingQueueDemo**(**int** size) {<br /> **this**.size = size;<br /> }
**public** **void** **enqueue**(E e) **throws** InterruptedException {<br /> lock.lock();<br /> **try** {<br /> **while** (list.size() == size)//队列已满,在notFull条件上等待<br /> notFull.await();<br /> list.add(e);//入队:加入链表末尾<br /> System.out.println("入队:" + e);<br /> notEmpty.signal(); //通知在notEmpty条件上等待的线程<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }
**public** E **dequeue**() **throws** InterruptedException {<br /> E e;<br /> lock.lock();<br /> **try** {<br /> **while** (list.size() == 0)//队列为空,在notEmpty条件上等待<br /> notEmpty.await();<br /> e = list.removeFirst();//出队:移除链表首元素<br /> System.out.println("出队:" + e);<br /> notFull.signal();//通知在notFull条件上等待的线程<br /> **return** e;<br /> } **finally** {<br /> lock.unlock();<br /> }<br /> }
**public** **static** **void** **main**(String[] args) {<br /> BlockingQueueDemo<Integer> queue = **new** BlockingQueueDemo<>(2);<br /> **for** (**int** i = 0; i < 10; i++) {<br /> **int** data = i;<br /> **new** Thread(**new** Runnable() {<br /> @Override<br /> **public** **void** **run**() {<br /> **try** {<br /> queue.enqueue(data);<br /> } **catch** (InterruptedException e) {
}<br /> }<br /> }).start();<br /> }<br /> **for** (**int** i = 0; i < 10; i++) {<br /> **new** Thread(**new** Runnable() {<br /> @Override<br /> **public** **void** **run**() {<br /> **try** {<br /> Integer data = queue.dequeue();<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }<br /> }<br />}
代码非常容易理解,创建了一个阻塞队列,大小为 3,队列满的时候,会被阻塞,等待其他线程去消费,队列中的元素被消费之后,会唤醒生产者,生产数据进入队列。上面代码将队列大小置为 1,可以实现同步阻塞队列,生产 1 个元素之后,生产者会被阻塞,待消费者消费队列中的元素之后,生产者才能继续工作。
Object 的监视器方法与 Condition 接口的对比
对比项 | Object 监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用 Lock.lock 获取锁,调用 Lock.newCondition()获取 Condition 对象 |
调用方式 | 直接调用,如:object.wait() | 直接调用,如:condition.await() |
等待队列个数 | 一个 | 多个,使用多个 condition 实现 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
总结
- 使用 condition 的步骤:创建 condition 对象,获取锁,然后调用 condition 的方法
- 一个 ReentrantLock 支持多个 condition 对象
- void await() throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持线程中断
- void awaitUninterruptibly();方法会释放锁,让当前线程等待,支持唤醒,不支持线程中断
- long awaitNanos(long nanosTimeout) throws InterruptedException;参数为纳秒,此方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为负数;超时之前返回的,结果为正数(表示返回时距离超时时间相差的纳秒数)
- boolean await(long time, TimeUnit unit) throws InterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为 false;超时之前返回的,结果为 true
- boolean awaitUntil(Date deadline) throws InterruptedException;参数表示超时的截止时间点,方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为 false;超时之前返回的,结果为 true
- void signal();会唤醒一个等待中的线程,然后被唤醒的线程会被加入同步队列,去尝试获取锁
- void signalAll();会唤醒所有等待中的线程,将所有等待中的线程加入同步队列,然后去尝试获取锁