我们使用的是多线程并发
一个进程可以由很多线程
进程有自己的地址空间,不会影响其他进程。但是使用进程作为并发方式有数量和开销的限制。
Java的线程机制是抢占式的,这表示调度机制会周期性地中断线程,将上下文切换到另一个线程,从而为每个线程都提供时间片,使得每个线程都会分配到数量合理的时间去驱动它的任务。
互斥同步
互斥和同步是一种最常见、最主要的实现并发正确性(相对线程安全)的保障手段。
同步:指的是多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用。
互斥:指的是实现同步的一种手段,临界区互斥量和信号量都是主要的互斥实现方式。
在java中,最基本的互斥同步手段就是synchronized关键字。synchronized可以保证线程竞争共享资源的正确性(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。
基本的线程机制
一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有自己的CPU一样。底层机制是切分CPU时间片,但我们通常不回去考虑它。
定义任务
线程可以驱动任务,因此你需要一种描述任务的方式。
描述任务 只需要 实现Runnable接口并编写它的run()方法,可以让该任务执行你的命令
public class LiftOff implements Runnable {
protected int countDown =10;
private static int taskDown =0;
private final int id = taskDown++;
public LiftOff(){}
public LiftOff(int countDown){
this.countDown=countDown;
}
public String str() {
return "#"+id+" ("+
(countDown > 0? countDown :"发射!")+")";
}
@Override
public void run() {
while (countDown-->0){
System.out.println(str());
//对于Thread.yield()的调用是对线程调度器的一种建议。告诉CPU可以将一个线程转移给另一个线程
Thread.yield();
}
}
}
从Runnable导出一个类,必须实现run()方法,但是要想实现线程的行为,你必须显示的把一个任务附着到线程上
Thread类
将Runnable对象转变为工作任务的常规方式是把它提交给一个Thread构造器
Thread构造器可以只接受一个Runnable对象.
线程的调度机制是非确定性的,因此含有线程的程序每次运行的结果可能不同.
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new LiftOff);
t.start();
}
}
Thread.start()方法:
使线程开始执行;Java虚拟机调用该线程的run方法。
结果是两个线程并发运行:当前线程(从调用start方法返回)和另一个线程(执行其run方法)。
t Main
使用Executor
Java SE5中的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象从而简化了并发编程.
Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务,Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期.
Executor在Java SE5/6中是启动任务的优选方法.
CachedThreadPool
线程数根据任务动态调整的线程池;
final LiftOff target = new LiftOff();
for (int i = 0; i < 3; i++) {
// TimeUnit.SECONDS.sleep(1);
new Thread(target).start(); //三个线程抢占一个target,会导致问题
}
// Executors.newCachedThreadPool().execute(new LiftOff());
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new LiftOff());
service.shutdown();
等待所有线程任务执行完成后,再关闭程序。但是并不阻塞main方法的执行。
shutdown()方法可以防止新任务提交给这个Executor,当前线程(上例中的Main线程) 将继续执行shutdown()被调用前提交的所有任务。 这个程序将在Executor中所有任务完成之后尽快的退出。
线程池在程序结束的时候要关闭。
shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。
shutdownNow()会立刻停止正在执行的任务,
awaitTermination()则会等待指定的时间让线程池关闭。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因此,想创建指定动态范围的线程池,可以这么写:
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
FixedThreadPool
- 线程数固定的线程池;
FixedThreadPool使用了有限的线程集来执行所提交的任务.
//Fixed 固定的
//你的线程池中的线程是固定的数量 现有的线程会自动被复用
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
executorService.execute(new LiftOff());
}
executorService.shutdown();
有了FixedThreadPool,你就可以一次性预先执行代价高昂的线程分配,因而也就可以限制线程的数量了.
通过使用FixedThreadPool使用的Thread对象的数量是有界的.
在任何线程池中,现有线程在可能的情况下,都会被自动复用
CachedThreadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收线程时停止创建新线程.因此它是合理的Executor的首选.
当第一个任务执行完毕时,此时第n个以后任务还没有开创线程,将复用其他的线程.
SingleThreadExecutor
:仅单线程执行的线程池。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool.
如果向SingleThreadExecutor提交多个任务,那么这些任务将排队,每个任务都会在下一个任务开始之前运行结束.所有的任务都将使用相同的线程.
SingleThreadExecutor会序列化所有提交给它的任务,并会维护它自己(隐藏)的悬挂任务队列.
SingleThreadExecutor运行线程,可以确保在任意时刻现场中都只有唯一的任务在运行.
SingleThreadExecutor可以让你省去只是为了维持某些事物的原型而进行的各种协调努力.通过序列化任务,你可以消除对序列化对象的需求.
ScheduledThreadPool
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。
Callable接口
runnable是执行工作的独立接口,它不返回任何值。如果希望任务完成时可以返一个值,可以使用Callable接口
在JavaSE5 中引入的callable接口是一种带有类型参数的泛型,他的类型参数表示的是从call()方法中返回的值而不是run()方法,并且必须用ExecutorService.submit()调用
class TaskWithResult implements Callable {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() throws InterruptedException {
/*for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("is run");
}*/
return "结果是:" + id;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService exe = Executors.newCachedThreadPool();
final Future submit = exe.submit(new TaskWithResult(1));
TimeUnit.MILLISECONDS.sleep(1); //这里加休眠是为了给编译器时间让他知道submbit.isDone
if (submit.isDone()) {
System.out.println(submit.get());
}
exe.shutdown();
}
结果是 1
}
submit()方法会产生Future对象,它用callable返回结果的特定类型进行了参数化。
你可以使用 isDone()方法来查询Future是否已经完成。
当任务完成时它具有一个结果,你可以调用get()方法来获取结果
submit()方法相当于start()方法,前者调用了call(),后者调用了run()
get()方法是为了拿到call()方法中的return
休眠
给定任务中止的时间
对sleep()的调用可以抛出InterruptedException异常,
因为异常不能跨线程传播回main(),所以你必须处理所有在任务内部产生的异常
旧方法
thread.sleep();
新方法
TimeUnit.MILLISECONDS.sleep();
让步
调用yield()方法,就是建议让具有相同优先级的其他线程可以运行。但是没有任何机制会保证它会被采纳
后台线程
所谓后台线程(daemon),是指程序运行的时候在后台提供的一种通用的服务的线程,并且这种线程并不属于程序中不可或缺的部分
当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中所有的后台线程
public class SimpleDaemons implements Runnable{
@Override
public void run() {
/*while (true){
System.out.println(Thread.currentThread() + " " + this);
}*/
try {
//下面已经设置为后台线程,这里延迟大于主线程的175延迟的话,就什么也不会输出
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 4; i++) {
final Thread thread = new Thread(new SimpleDaemons());
thread.setDaemon(true);
thread.start();
}
System.out.println("所有线程开始");
TimeUnit.MILLISECONDS.sleep(175);
}
}
必须在线程启动之前调用setDaemon()方法,才能把它设置为后台线程
后台线程工厂
public class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
它和普通的ThreadFactory区别就是将后台线程状态全部设置为了true。
现在可以用一个DaemonThreadFactory作为参数传递给线程池
public class DaemonFromFactory implements Runnable{
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread()+" "+this);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
for (int i = 0; i < 5; i++) {
exec.execute(new DaemonFromFactory());
}
System.out.println("所有后台线程开始");
TimeUnit.MILLISECONDS.sleep(500);
}
}
后台进程不执行finally语句情况下就会终止其run()方法
class ADaemon implements Runnable {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("开始后台线程中。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Is this always should running?");
}
}
}
public class DaemonsDontRunFinally {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ADaemon());
thread.setDaemon(true);
thread.start();
TimeUnit.MILLISECONDS.sleep(120);
}
}
编码的变体
1.继承Thread 在构造器中start 即可创建的实例具有线程行为
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
//传入线程名字
super(Integer.toString(++threadCount));
//当调用构造器时,即启动了线程 不用再放到thread中了
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while (true) {
System.out.print(this);
if (--countDown == 0)
return;
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++)
new SimpleThread();
}
}
2.采用实现Runnable的方式 保留继承的能力
public class SelfManaged implements Runnable {
//实现Runnable接口 内部持有一个线程对象 将自身引用传入 保留继承能力
private int countDown = 5;
private Thread t = new Thread(this);
public SelfManaged() {
t.start();
}
public String toString() {
return Thread.currentThread().getName() + "(" + countDown + "), ";
}
public void run() {
while (true) {
System.out.print(this);
if (--countDown == 0)
return;
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++)
new SelfManaged();
}
}
隐患: 在构造器中启动线程,如果另一个任务可能在构造器结束之前开始执行,这意味着该任务能够访问处于不稳定状态的对象.(即访问未完成初始化操作)
而选择使用Executor不会产生这个隐患.
3.内部类的形式
class ThreadMethod {
private int countDown = 5;
private Thread t;
private String name;
public ThreadMethod(String name) {
this.name = name;
}
public void runTask() {
if (t == null) {
t = new Thread(name) {
public void run() {
try {
while (true) {
System.out.println(this);
if (--countDown == 0) return;
sleep(10);
}
} catch (InterruptedException e) {
System.out.println("sleep() 被打断了");
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}
public class InnerDemo {
public static void main(String[] args) {
ThreadMethod method = new ThreadMethod("a");
method.runTask();
}
}
加入一个线程
一个线程可以在另一个线程上调用join()方法
如果一个线程(A)在另一个线程上(B)调用B.join方法,该线程会被挂起(A),等到B线程结束才恢复
也可以在调用join()时加入一个超时参数,这样如果目标线程在这段时间还没结束,join()方法总能返回
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
System.out.println(getName()+"被打断了吗?-->"+interrupted());
}
System.out.println(getName()+"现在唤醒了");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch (InterruptedException e) {
print("Interrupted");
}
print(getName() + " 加入已经完成");
}
}
public class Joining {
public static void main(String[] args) {
//Sleeper sleepy = new Sleeper("Sleepy", 1500);
//Joiner dopey = new Joiner("Dopey", sleepy);
Sleeper grumpy = new Sleeper("Grumpy", 1500);
Joiner doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
捕获异常
由于线程的本质特性,使得你不能捕获从线程中逃逸的异常.
class ExceptionThread implements Runnable {
public void run() {
//此处没有用try-catch包裹 异常将逃出run()方法 其他地方无法捕获
throw new RuntimeException();
}
}
public class NaiveExceptionHandling {
public static void main(String[] args) {
try {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
} catch (RuntimeException ue) {
//捕获语句不会执行 因为异常直接传播到控制台
System.out.println("Exception has been handled!");
}
}
}
Thread.UncaughtExceptionHandler是JavaSE 5 中的新接口,它允许你在每个Thread对象上都附着一个异常处理器.
Thread.UncaughtExceptionHandle.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用.
为了调用uncaughtException(),我们创建了一个新的ThreadFactory,为每个新创建的Thread对象上附着一个
Thread.UncaughtExceptionHandler
class ExceptionThread implements Runnable{
@Override
public void run() {
final Thread t = Thread.currentThread();
System.out.println("当前线程是"+t+" "+"en = "+t.getUncaughtExceptionHandler());
System.out.println("---------下面抛出异常---------");
throw new RuntimeException();
}
}
class MyUncaughtException implements Thread.UncaughtExceptionHandler{
// run()方法未捕获的异常在此处捕获
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("捕获的异常为"+e);
}
}
class HandlerThreadFactory implements ThreadFactory{
//这个类的作用就是调用 Thread.UncaughtExceptionHandler.uncaughtException方法
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//System.out.println("创建了线程:" + t);
//设置当此线程因未捕获的异常而突然终止时调用的处理程序
//参数:这个对象用作这个线程的未捕获异常处理程序。如果为空,则此线程没有显式处理程序。
t.setUncaughtExceptionHandler(new MyUncaughtException());
System.out.println("en--> "+t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaught {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread());
}
}
/*
想要捕获线程中的异常,需要实现Thread.UncaughtExceptionHandler接口,
接口中的uncaughtException可以捕获异常
为了调用uncaughtException,我们要在ThreadFactory接口中 设置捕获异常的处理程序,
同时还可以get到该异常处理程序
为什么要用ThreadFactory接口? 因为创建线程池的构造器中可以传入该接口的实现类
* */
class MyException implements Thread.UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("被捕获的异常: "+e);
}
}
class Factory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new MyException());
//问题 为什么会执行两遍?
System.out.println(thread.getUncaughtExceptionHandler());
return thread;
}
}
public class CaughtException {
public static void main(String[] args) {
class Exception implements Runnable{
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("当前线程是"+t);
throw new RuntimeException();
}
}
ExecutorService exec = Executors.newCachedThreadPool(new Factory());
exec.execute(new Exception());
/* //下面的代码没有设置异常捕获器
Thread thread = new Thread(new Exception());
thread.start();*/
}
}
共享受限的资源
可重入锁
可重入锁就是一个线程中可以多次获得同一把锁
即: 一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,
则该线程可以直接执行调用的方法,而无需重新获得锁;
在java中 ReentrantLock 和 synchronized都是可重入锁
public class ReentrantLockDemo {
//创建一个可重入锁
private Lock lock = new ReentrantLock();
public synchronized void method1(){
System.out.println(Thread.currentThread().getName()+"执行了method1");
method2();
}
public synchronized void method2(){
System.out.println(Thread.currentThread().getName()+"执行了method2");
}
public void method3() {
try {
lock.tryLock(3,TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println(e);;
}
try {
System.out.println(Thread.currentThread().getName()+"执行了method3");
//因为在一个线程中 方法4 和方法3 使用的锁是一样的
method4();
}finally {
lock.unlock();
}
}
public void method4(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"执行了method4");
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockDemo lockDemo = new ReentrantLockDemo();
new Thread(()->{
lockDemo.method1();
},"t1").start();
//方法构造器没有参数可以这样写
new Thread(lockDemo::method3,"t2").start();
/*和上面写法效果一样
new Thread(new Runnable() {
@Override
public void run() {
lockDemo.method1();
}
},"t1").start();*/
}
}
t1执行了method1
t1执行了method2
t2执行了method3
t2执行了method4
使用显式的Lock对象
1.Lock对象相当于一个对象锁,但可以控制获得锁和释放锁的位置
private Lock lock = new ReentrantLock();
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield(); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
必须使用try-finally语句确保锁的释放
2.ReentrantLock 允许尝试获取锁但最终未获取锁
这样如果其他人已经获取了这个锁,那么你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放.
private ReentrantLock lock = new ReentrantLock();
public void untimed() {
//tryLock()进在锁没有被其他线程持有时获取锁
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if (captured)
lock.unlock();
}
}
public void timed() {
boolean captured = false;
try {
//尝试获取锁的时间 超过抛错
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("tryLock(2, TimeUnit.SECONDS): " +captured);
} finally {
if (captured)
lock.unlock();
}
}
显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说,还赋予了更细粒度的控制力.
小结
ReentrantLock可以替代synchronized进行同步;
ReentrantLock获取锁更安全;线程在tryLock()失败的时候不会导致死锁。
必须先获取到锁,再进入try {…}代码块,最后使用finally保证释放锁;
可以使用tryLock()尝试获取锁。
原子类
对于常规编程来说,他们很少会用,但是在涉及性能调优的时候,他们会有大用场
原子操作 不存在中间状态,要么 发生 要么没有发生.
原子性可以应用于除long和double之外的所有基本类型之上的”简单操作”.
JVM可以将64位(long和double变量)的读取和写入当作两个分离的32位操作来执行.这就产生了在一个读取和写入操作中间发生上下文切换,从而导致不同的任务可以看到不正确结果的可能性.(字撕裂)
如果使用volatile关键字,就会获得(简单的赋值与返回操作的)原子性.
volatile关键字还确保了应用中的可视性.如果你将一个域声明为volatile的,那么只要对这个域产生了写操作,那么所有的读操作都可以看到这个修改.即便使用了本地缓存,volatile域会被立即被写入到主存中,而读取操作就发生在主存中.
javaSE5 引入了像 AtomicInteger、 AtomicLong 、AtomicReference 等特殊原子类的变量类
他们提供下面形式的原子性条件更新操作
如果当前值==预期值,则自动将该值设置为给定的更新值。
参数: 期望-期望值
Update -新值
返回:
如果成功。False return表示实际值与期望值不相等
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性
例如:用AtomicInteger 来重写 AtomicityTest
public class AtomcityTest1 implements Runnable {
private int i =0;
public int getValue(){return i;}
private synchronized void evenIncrement(){++i;++i;}
@Override
public void run() {
while (true) {
evenIncrement();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomcityTest1 at = new AtomcityTest1();
exec.execute(at);
while (true){
int value = at.getValue();
if (value%2 != 0){
System.out.println(value);
System.exit(0);
}
}
}
}
会输出一个 奇数
//利用原子性操作改进代码
public class AtomcityTest1 implements Runnable{
private AtomicInteger i = new AtomicInteger(0);
public int getValue() {
return i.get();
}
private void evenIncrement(){
i.addAndGet(2);
}
@Override
public void run() {
while (true){
evenIncrement();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomcityTest1 at = new AtomcityTest1();
exec.execute(at);
while (true){
int value = at.getValue();
if (value%2 !=0){
System.out.println(value+"不是偶数");
System.exit(0);
}
}
}
}
不会输出任何奇数,因为是原子性的
volatile
在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,
主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,
其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。
因此,volatile关键字的目的是告诉虚拟机:
- 每次访问变量时,总是获取主内存的最新值;
- 每次修改变量后,立刻回写到主内存。
volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。
synchronized
在java中,最基本的互斥同步手段就是synchronized关键字。
synchronized可以保证线程竞争共享资源的正确性
(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。
synchronized特性
原子性:确保线程互斥的访问同步代码。synchronized保证只有一个线程拿到锁,进入同步代码块操作共享资源,因此具有原子性。
可见性:保证共享变量的修改能够及时课件。执行synchronized时,会对应执行lock,unlock原子操作。lock操作,就会清空工作空间该变量的值;执行unlock操作之前,必须先把变量同步回主内存中。
有序性:synchronized内的代码和外部的代码禁止排序,至于内部的代码,则不会禁止排序,但是由于只有一个线程进入同步代码块,因此在同步代码块中相当于是单线程的,根据as-if-serial语义,即使代码块内部发生了重排序,也不会影响程序执行的结果。
悲观锁:synchronized是悲观锁,每次使用共享资源时都认为会和其他线程产生竞争,所以每次使用共享资源都会上锁。
独占锁:synchronized是独占锁,该锁一次只能被一个线程所持有,其他线程被阻塞。
非公平锁:synchronized是非公平锁,线程获取锁的顺序可以不按照线程的阻塞顺序,允许线程发出请求后立即尝试获取锁。
可重入锁:synchronized是可重入锁。持锁线程可以再次获取自己的内部的锁。
悲观锁 or 乐观锁:是否一定要锁。
共享锁 or 独占锁(排他锁):是否可以有多个线程同时拿锁。
公平锁 or 非公平锁:是否按阻塞顺序拿锁。
可重入锁 or 不可重入锁: 拿锁线程是否可以多次拿锁。
实例锁
作用于给当前实例加锁,进入同步代码前要获得当前实例的锁。
(注意:是当前实例,一个类有N个实例,一个实例有一把锁)。
Java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持.当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁.
所有的对象都自动含有单一的锁(也称为监视器).当在对象上调用其任意synchronized方法的时候,此对象都被加锁.这时该对象的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用.
所以使用对象锁的情况,只有使用同一实例的线程才会受锁的影响,
多个实例调用同一方法也不会受影响。
在其他对象上的同步:
两个任务可以同时进入一个对象,只要这个对象上的方法是在不同的锁上同步的即可。
(即多个实例调用同一方法也不会受影响。)
(下面例子中的 f() 方法 和 m() 方法 )
class InsLock {
private Object insLock = new Object(); //成员变量,方法中的每个实例都拥有
//实例锁
/*synchronized*/ private void f() {
synchronized (this) { //和synchronized private void f()等价
try {
System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
System.currentTimeMillis() + "执行f()方法");
TimeUnit.SECONDS.sleep(2);
System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
System.currentTimeMillis() + "离开f()方法");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//对象锁
//同一个实例 中m()方法 和 f()方法可以同时执行,不会出现争抢资源
//因为m方法 被对象锁 锁住 , f方法被实例锁 锁住,它们不是同一把锁
private void m() {
synchronized (insLock) { //和synchronized (this.insLock)等价
try {
System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
System.currentTimeMillis() + "执行m()方法");
TimeUnit.SECONDS.sleep(2);
System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
System.currentTimeMillis() + "离开m()方法");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
synchronized private void g() {
try {
System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+
System.currentTimeMillis() + "执行g()方法");
TimeUnit.SECONDS.sleep(2);
System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+
System.currentTimeMillis() + "离开g()方法");
} catch (InterruptedException e) {
e.printStackTrace();
}
}public static void main(String[] args) {
InsLock str = new InsLock();
InsLock str2 = new InsLock();
new Thread(str::f).start();
new Thread(str2::g).start();
//换成类锁
/*new Thread(()->{
f();
},"t1").start();
new Thread(()->{
g();
},"t2").start();*/
}
}
直接加在方法上
class InLock{
synchronized private void f(){}
}
作用于代码块
private void f(){
synchronized (this) {
//方法
}
}
3.作用于静态方法
表示找类锁,类锁永远只有一把,就算创建了100个对象,那类锁也只有一把。
对象锁:一个对象一把锁,
类锁:100个对象,也是一把锁。
class InLock{
synchronized static private void f(){}
}
类锁的实现
1.通过对Class对象上锁
class InLock{
private void f(){
synchronized (InsLock.class) {
//方法
}
}
}
2.通过static对象锁
同 实例锁 作用于static方法上
死锁
一个线程可以获取一个锁后,再继续获取另一个锁。例如:
public void add(int m) {
synchronized(lockA) { // 获得lockA的锁
this.value += m;
synchronized(lockB) { // 获得lockB的锁
this.another += m;
} // 释放lockB的锁
} // 释放lockA的锁
}
public void dec(int m) {
synchronized(lockB) { // 获得lockB的锁
this.another -= m;
synchronized(lockA) { // 获得lockA的锁
this.value -= m;
} // 释放lockA的锁
} // 释放lockB的锁
}
在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。
对于上述代码,线程1和线程2如果分别执行add()和dec()方法时:
- 线程1:进入add(),获得lockA;
- 线程2:进入dec(),获得lockB。
随后:
- 线程1:准备获得lockB,失败,等待中;
- 线程2:准备获得lockA,失败,等待中。
此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。
死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。
那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:
public void dec(int m) {
synchronized(lockA) { // 获得lockA的锁
this.value -= m;
synchronized(lockB) { // 获得lockB的锁
this.another -= m;
} // 释放lockB的锁
} // 释放lockA的锁
}
线程之间的协作
使用wait和notify
synchronized并没有解决多线程协调的问题。例如
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。
但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。
因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。
我们想要的执行效果是:
- 线程1可以调用addTask()不断往队列中添加任务;
- 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。
多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;
当条件满足时,线程被唤醒,继续执行任务。
wait()方法的执行机制非常复杂。
首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVM的C代码实现的。
其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。
在wait()期间对象的锁是释放的
可以通过notify() 、notifyAll()、或者令时间到期,从wait()中恢复执行
改进上面的代码
public synchronized String getTask() {
while (queue.isEmpty()) {
// 释放this锁:
this.wait();
// 重新获取this锁
}
return queue.remove();
}
public synchronized void addTask(String s) {
this.queue.add(s);
this.notify(); // 唤醒在this锁等待的线程
}
注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,
这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),
从而使得等待线程从this.wait()方法返回。
notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。
这是因为可能有多个线程正在getTask()方法内部的wait()中等待,
使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。
有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。
注意到wait()方法返回时需要重新获得this锁。
假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,
随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待。
上面的例子完整版
class TaskQueue2 implements Runnable {
@Override
public void run() {
}
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
//生产者生产完毕 提醒消费者消费
this.notify(); //唤醒 使用相同锁 但是在等待的线程
}
public synchronized String getTask() throws InterruptedException {
//空的时候 消费者等待
while (queue.isEmpty()) {
this.wait(); //调用该方法会释放当前锁,等被唤醒后又重新尝试获取锁
System.out.println("==");
}
//消费者消费
return queue.remove();
}
public static void main(String[] args) {
TaskQueue2 task = new TaskQueue2();
//生产者
new Thread(() -> {
//task2.addTask("sss");
task.addTask("abc");
System.out.println(task.queue);
}).start();
//消费者
new Thread(() -> {
try {
task.getTask();
System.out.println(task.queue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
使用Condition
Condition可以替代wait和notify;
Condition对象必须从Lock对象获取。
使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。
但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?
答案是使用Condition对象来实现wait和notify的功能。
我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:
- await()会释放当前锁,进入等待状态;
- signal()会唤醒某个等待线程;
- signalAll()会唤醒所有等待线程;
- 唤醒线程从await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}
新类库中的构件
Semaphore(信号量)
用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。
Semaphore通过使用计数器来控制对共享资源的访问。
允许n个任务同时访问这个资源
如果计数器大于0,则允许访问。 如果为0,则拒绝访问。
计数器所计数的是允许访问共享资源的许可。 因此,要访问资源,必须从信号量中授予线程许可。
API
void acquire():从信号量获取一个许可,如果无可用许可前将一直阻塞等待,
void acquire(int permits) :获取指定数目的许可,如果无可用许可前也将会一直阻塞等待
boolean tryAcquire():从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞
boolean tryAcquire(int permits): 尝试获取指定数目的许可,如果无可用许可直接返回false
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false
void release():释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1,调用了两次release,最大许可会改变为2
int availablePermits() 获取当前信号量可用的许可
构造器
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人0占用一个机器在生产...
工人3占用一个机器在生产...
工人1占用一个机器在生产...
工人2释放出机器
工人5占用一个机器在生产...
工人1释放出机器
工人4释放出机器
工人3释放出机器
工人0释放出机器
工人7占用一个机器在生产...
工人6占用一个机器在生产...
工人5释放出机器
工人6释放出机器
工人7释放出机器
CountDownLatch
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch 被设计为只触发一次,计数值不能被重新赋值
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。这里所说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。CountDownLatch构造函数
如下
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
计数器参数count必须大于等于0,等于0的时候,调用await方法时不会 阻塞当前线程。
当我们调用CountDownLatch的countDown()方法时,N就会减1,
CountDownLatch的await()方法 会阻塞当前线程,直到N变成零
//有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,
// 他就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。
// 所以在这里用Java代码设计两个类,Worker代表工人,Boss代表老板
public class Worker implements Runnable{
private CountDownLatch countDownLatch;
private final String name;
public Worker(CountDownLatch countDownLatch, String name){
this.countDownLatch =countDownLatch;
this.name =name;
}
@Override
public void run() {
this.doWork();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println(this.name+"工作完成了");
this.countDownLatch.countDown();
}
void doWork(){
System.out.println(this.name+" 正在工作!");
}
}
public class Boss implements Runnable{
private CountDownLatch countDownLatch;
public Boss(CountDownLatch countDownLatch){
this.countDownLatch =countDownLatch;
}
@Override
public void run() {
System.out.println("老板正在等待所有工人干完活。。。。");
try {
countDownLatch.await();
} catch (InterruptedException e) {
System.out.println("老板在等待的时候发生了异常");
}
System.out.println("所有工人的活儿干完了,老板开始检查工作。。");
}
}
public class WorkTest {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
ExecutorService exec = Executors.newCachedThreadPool();
Worker w1 = new Worker(countDownLatch, "张三");
Worker w2 = new Worker(countDownLatch, "李四");
Worker w3 = new Worker(countDownLatch, "王五");
Boss boss = new Boss(countDownLatch);
exec.execute(w1);
exec.execute(w2);
exec.execute(w3);
exec.execute(boss);
exec.shutdown();
}
}
张三 正在工作!
老板正在等待所有工人干完活。。。。
王五 正在工作!
李四 正在工作!
张三工作完成了
王五工作完成了
李四工作完成了
所有工人的活儿干完了,老板开始检查工作。。
Process finished with exit code 0
CyclicBarrier
CountDownLatch 被设计为只触发一次,但是CyclicBarrier可以
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
我们暂且把这个状态就叫做barrier,当调用await()字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:方法之后,线程就处于barrier了。
public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
参数parties指让多少个线程或者任务等待至barrier状态;
参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
class Add implements Runnable{
private AtomicInteger atomicInteger;
private CyclicBarrier barrier;
public Add(CyclicBarrier barrier,AtomicInteger atomicInteger){
this.barrier=barrier;
this.atomicInteger=atomicInteger;
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()){
System.out.println("===");
atomicInteger.incrementAndGet();
System.out.println(Thread.currentThread().getName()+" ->"+atomicInteger);
barrier.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class Sum implements Runnable{
private AtomicInteger atomicInteger;
private ExecutorService exec;
public Sum(AtomicInteger atomicInteger,ExecutorService exec){
this.atomicInteger=atomicInteger;
this.exec =exec;
}
@Override
public void run() {
if (atomicInteger.get() ==20){
exec.shutdownNow();
}
}
}
public class CyclicBarrierDemo3 {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
ExecutorService exec = Executors.newCachedThreadPool();
CyclicBarrier barrier = new CyclicBarrier(2, new Sum(atomicInteger,exec));
for (int i = 0; i < 2; i++) {
exec.execute(new Add(barrier,atomicInteger));
}
TimeUnit.SECONDS.sleep(2);
System.out.println(atomicInteger.get());
}
}
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) {
barrier = b;
}
public /*synchronized*/ int getStrides() {
return strides;
}
public void run() {
try {
while (!Thread.interrupted()) {
// synchronized (this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
// }
//等待直到在此barrie上的所有线程都进入await()
//查看源码可以得到 最后一个调用await()会调用barrier的run()方法
//直到所有的马都完成前进后 所有的马被唤醒进入下一次循环
barrier.await(); //等所有线程结束
}
} catch (InterruptedException e) {
// A legitimate way to exit
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec =
Executors.newCachedThreadPool();
private CyclicBarrier barrier;
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
public HorseRace(int nHorses, final int pause) {
//所有的任务持有一个公共的CyclicBarrier,Horses中的任务跑完后,
再去执行CyclicBarrier中的任务
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++)
s.append("="); // The fence on the racetrack
print(s);
for (Horse horse : horses)
print(horse.tracks());
for (Horse horse : horses)
if (horse.getStrides() >= FINISH_LINE) {
print(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
print("barrier-action sleep interrupted");
}
}
});
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses, pause);
}
}
DelayQueue
DelayQueue 是一个无界阻塞队列,要添加进去的元素必须实现Delayed接口,
只有在延迟期满时才能从中提取元素。
当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,
也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。
排在队列头部的元素是最早到期的,越往后到期时间赿晚。
该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,表示该元素到期了。
无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
例如,size 方法同时返回到期和未到期元素的计数。
此队列不允许使用 null 元素。
public class DelayQueueTest {
private static DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
delayQueue.offer(new MyDelayedTask("task1",10000));
delayQueue.offer(new MyDelayedTask("task2",3900));
delayQueue.offer(new MyDelayedTask("task3",1900));
delayQueue.offer(new MyDelayedTask("task4",5900));
delayQueue.offer(new MyDelayedTask("task5",6900));
delayQueue.offer(new MyDelayedTask("task6",7900));
delayQueue.offer(new MyDelayedTask("task7",4900));
}
}).start();
while (true) {
//take(),检索并删除该队列的头部,如果需要,等待直到该队列上有一个过期延迟的元素可用。
Delayed take = delayQueue.take();
System.out.println(take);
}
}
}
/**
* compareTo 方法必须提供与 getDelay 方法一致的排序
*/
class MyDelayedTask implements Delayed{
private String name ;
private long start = System.currentTimeMillis();
private long time ;
public MyDelayedTask(String name,long time) {
this.name = name;
this.time = time;
}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
MyDelayedTask o1 = (MyDelayedTask) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "MyDelayedTask{" +
"name='" + name + '\'' +
", time=" + time +
'}';
}
}
输出:
MyDelayedTask{name='task3', time=1900}
MyDelayedTask{name='task2', time=3900}
MyDelayedTask{name='task7', time=4900}
MyDelayedTask{name='task4', time=5900}
MyDelayedTask{name='task5', time=6900}
MyDelayedTask{name='task6', time=7900}
MyDelayedTask{name='task1', time=10000}
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
//存放延时任务
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
//设置触发时间
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
//获得多久后触发
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
//比较等待时间
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) return -1;
if (trigger > that.trigger) return 1;
return 0;
}
public void run() {
print(this + " ");
}
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
//末尾哨兵
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
//因为是哨兵要确保是最后一个执行的 将打印前面所有任务的信息
public void run() {
for (DelayedTask pt : sequence) {
print(pt.summary() + " ");
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
//此处必须执行完任务才会去取下一个
q.take().run(); // Run task with the current thread
} catch (InterruptedException e) {
// Acceptable way to exit
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
//放20个随机触发 5秒延时时间
//DelayQueue底层是数组 按照二分查找进行排序
for (int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
try {
SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(2278);
//queue.add(new DelayedTask(10));
}
}
此代码需留意哨兵机制,如何安全的确保所有任务执行完毕后关闭线程池
性能调优
优先使用synchronized关键字入手,只有在性能调优时才替换为Lock对象这种做法.