我们使用的是多线程并发
一个进程可以由很多线程
进程有自己的地址空间,不会影响其他进程。但是使用进程作为并发方式有数量和开销的限制。
Java的线程机制是抢占式的,这表示调度机制会周期性地中断线程,将上下文切换到另一个线程,从而为每个线程都提供时间片,使得每个线程都会分配到数量合理的时间去驱动它的任务。
互斥同步
互斥和同步是一种最常见、最主要的实现并发正确性(相对线程安全)的保障手段。
同步:指的是多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用。
互斥:指的是实现同步的一种手段,临界区互斥量和信号量都是主要的互斥实现方式。
在java中,最基本的互斥同步手段就是synchronized关键字。synchronized可以保证线程竞争共享资源的正确性(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。
基本的线程机制
一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有自己的CPU一样。底层机制是切分CPU时间片,但我们通常不回去考虑它。
定义任务
线程可以驱动任务,因此你需要一种描述任务的方式。
描述任务 只需要 实现Runnable接口并编写它的run()方法,可以让该任务执行你的命令
public class LiftOff implements Runnable {protected int countDown =10;private static int taskDown =0;private final int id = taskDown++;public LiftOff(){}public LiftOff(int countDown){this.countDown=countDown;}public String str() {return "#"+id+" ("+(countDown > 0? countDown :"发射!")+")";}@Overridepublic void run() {while (countDown-->0){System.out.println(str());//对于Thread.yield()的调用是对线程调度器的一种建议。告诉CPU可以将一个线程转移给另一个线程Thread.yield();}}}
从Runnable导出一个类,必须实现run()方法,但是要想实现线程的行为,你必须显示的把一个任务附着到线程上
Thread类
将Runnable对象转变为工作任务的常规方式是把它提交给一个Thread构造器
Thread构造器可以只接受一个Runnable对象.
线程的调度机制是非确定性的,因此含有线程的程序每次运行的结果可能不同.
public class Main {public static void main(String[] args) throws InterruptedException {Thread t = new Thread(new LiftOff);t.start();}}
Thread.start()方法:
使线程开始执行;Java虚拟机调用该线程的run方法。
结果是两个线程并发运行:当前线程(从调用start方法返回)和另一个线程(执行其run方法)。
t Main
使用Executor
Java SE5中的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象从而简化了并发编程.
Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务,Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期.
Executor在Java SE5/6中是启动任务的优选方法.
CachedThreadPool
线程数根据任务动态调整的线程池;
final LiftOff target = new LiftOff();for (int i = 0; i < 3; i++) {// TimeUnit.SECONDS.sleep(1);new Thread(target).start(); //三个线程抢占一个target,会导致问题}// Executors.newCachedThreadPool().execute(new LiftOff());ExecutorService service = Executors.newCachedThreadPool();service.execute(new LiftOff());service.shutdown();
等待所有线程任务执行完成后,再关闭程序。但是并不阻塞main方法的执行。
shutdown()方法可以防止新任务提交给这个Executor,当前线程(上例中的Main线程) 将继续执行shutdown()被调用前提交的所有任务。 这个程序将在Executor中所有任务完成之后尽快的退出。
线程池在程序结束的时候要关闭。
shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。
shutdownNow()会立刻停止正在执行的任务,
awaitTermination()则会等待指定的时间让线程池关闭。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
因此,想创建指定动态范围的线程池,可以这么写:
int min = 4;int max = 10;ExecutorService es = new ThreadPoolExecutor(min, max,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
FixedThreadPool
- 线程数固定的线程池;
FixedThreadPool使用了有限的线程集来执行所提交的任务.
//Fixed 固定的//你的线程池中的线程是固定的数量 现有的线程会自动被复用ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 20; i++) {executorService.execute(new LiftOff());}executorService.shutdown();
有了FixedThreadPool,你就可以一次性预先执行代价高昂的线程分配,因而也就可以限制线程的数量了.
通过使用FixedThreadPool使用的Thread对象的数量是有界的.
在任何线程池中,现有线程在可能的情况下,都会被自动复用
CachedThreadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收线程时停止创建新线程.因此它是合理的Executor的首选.
当第一个任务执行完毕时,此时第n个以后任务还没有开创线程,将复用其他的线程.
SingleThreadExecutor
:仅单线程执行的线程池。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool.
如果向SingleThreadExecutor提交多个任务,那么这些任务将排队,每个任务都会在下一个任务开始之前运行结束.所有的任务都将使用相同的线程.
SingleThreadExecutor会序列化所有提交给它的任务,并会维护它自己(隐藏)的悬挂任务队列.
SingleThreadExecutor运行线程,可以确保在任意时刻现场中都只有唯一的任务在运行.
SingleThreadExecutor可以让你省去只是为了维持某些事物的原型而进行的各种协调努力.通过序列化任务,你可以消除对序列化对象的需求.
ScheduledThreadPool
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务:ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2秒后开始执行定时任务,每3秒执行:ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 2秒后开始执行定时任务,以3秒为间隔执行:ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。
Callable接口
runnable是执行工作的独立接口,它不返回任何值。如果希望任务完成时可以返一个值,可以使用Callable接口
在JavaSE5 中引入的callable接口是一种带有类型参数的泛型,他的类型参数表示的是从call()方法中返回的值而不是run()方法,并且必须用ExecutorService.submit()调用
class TaskWithResult implements Callable {private int id;public TaskWithResult(int id) {this.id = id;}public String call() throws InterruptedException {/*for (int i = 0; i < 5; i++) {TimeUnit.SECONDS.sleep(1);System.out.println("is run");}*/return "结果是:" + id;}}public class CallableDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService exe = Executors.newCachedThreadPool();final Future submit = exe.submit(new TaskWithResult(1));TimeUnit.MILLISECONDS.sleep(1); //这里加休眠是为了给编译器时间让他知道submbit.isDoneif (submit.isDone()) {System.out.println(submit.get());}exe.shutdown();}结果是 1}
submit()方法会产生Future对象,它用callable返回结果的特定类型进行了参数化。
你可以使用 isDone()方法来查询Future是否已经完成。
当任务完成时它具有一个结果,你可以调用get()方法来获取结果
submit()方法相当于start()方法,前者调用了call(),后者调用了run()
get()方法是为了拿到call()方法中的return
休眠
给定任务中止的时间
对sleep()的调用可以抛出InterruptedException异常,
因为异常不能跨线程传播回main(),所以你必须处理所有在任务内部产生的异常
旧方法thread.sleep();新方法TimeUnit.MILLISECONDS.sleep();
让步
调用yield()方法,就是建议让具有相同优先级的其他线程可以运行。但是没有任何机制会保证它会被采纳
后台线程
所谓后台线程(daemon),是指程序运行的时候在后台提供的一种通用的服务的线程,并且这种线程并不属于程序中不可或缺的部分
当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中所有的后台线程
public class SimpleDaemons implements Runnable{@Overridepublic void run() {/*while (true){System.out.println(Thread.currentThread() + " " + this);}*/try {//下面已经设置为后台线程,这里延迟大于主线程的175延迟的话,就什么也不会输出TimeUnit.MILLISECONDS.sleep(100);System.out.println(Thread.currentThread() + " " + this);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 4; i++) {final Thread thread = new Thread(new SimpleDaemons());thread.setDaemon(true);thread.start();}System.out.println("所有线程开始");TimeUnit.MILLISECONDS.sleep(175);}}
必须在线程启动之前调用setDaemon()方法,才能把它设置为后台线程
后台线程工厂
public class DaemonThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}}
它和普通的ThreadFactory区别就是将后台线程状态全部设置为了true。
现在可以用一个DaemonThreadFactory作为参数传递给线程池
public class DaemonFromFactory implements Runnable{@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(100);System.out.println(Thread.currentThread()+" "+this);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());for (int i = 0; i < 5; i++) {exec.execute(new DaemonFromFactory());}System.out.println("所有后台线程开始");TimeUnit.MILLISECONDS.sleep(500);}}
后台进程不执行finally语句情况下就会终止其run()方法
class ADaemon implements Runnable {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(100);System.out.println("开始后台线程中。。。");} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("Is this always should running?");}}}public class DaemonsDontRunFinally {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(new ADaemon());thread.setDaemon(true);thread.start();TimeUnit.MILLISECONDS.sleep(120);}}
编码的变体
1.继承Thread 在构造器中start 即可创建的实例具有线程行为
public class SimpleThread extends Thread {private int countDown = 5;private static int threadCount = 0;public SimpleThread() {//传入线程名字super(Integer.toString(++threadCount));//当调用构造器时,即启动了线程 不用再放到thread中了start();}public String toString() {return "#" + getName() + "(" + countDown + "), ";}public void run() {while (true) {System.out.print(this);if (--countDown == 0)return;}}public static void main(String[] args) {for (int i = 0; i < 5; i++)new SimpleThread();}}
2.采用实现Runnable的方式 保留继承的能力
public class SelfManaged implements Runnable {//实现Runnable接口 内部持有一个线程对象 将自身引用传入 保留继承能力private int countDown = 5;private Thread t = new Thread(this);public SelfManaged() {t.start();}public String toString() {return Thread.currentThread().getName() + "(" + countDown + "), ";}public void run() {while (true) {System.out.print(this);if (--countDown == 0)return;}}public static void main(String[] args) {for (int i = 0; i < 5; i++)new SelfManaged();}}
隐患: 在构造器中启动线程,如果另一个任务可能在构造器结束之前开始执行,这意味着该任务能够访问处于不稳定状态的对象.(即访问未完成初始化操作)
而选择使用Executor不会产生这个隐患.
3.内部类的形式
class ThreadMethod {private int countDown = 5;private Thread t;private String name;public ThreadMethod(String name) {this.name = name;}public void runTask() {if (t == null) {t = new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown == 0) return;sleep(10);}} catch (InterruptedException e) {System.out.println("sleep() 被打断了");}}public String toString() {return getName() + ": " + countDown;}};t.start();}}}public class InnerDemo {public static void main(String[] args) {ThreadMethod method = new ThreadMethod("a");method.runTask();}}
加入一个线程
一个线程可以在另一个线程上调用join()方法
如果一个线程(A)在另一个线程上(B)调用B.join方法,该线程会被挂起(A),等到B线程结束才恢复
也可以在调用join()时加入一个超时参数,这样如果目标线程在这段时间还没结束,join()方法总能返回
class Sleeper extends Thread {private int duration;public Sleeper(String name, int sleepTime) {super(name);duration = sleepTime;start();}public void run() {try {sleep(duration);} catch (InterruptedException e) {System.out.println(getName()+"被打断了吗?-->"+interrupted());}System.out.println(getName()+"现在唤醒了");}}class Joiner extends Thread {private Sleeper sleeper;public Joiner(String name, Sleeper sleeper) {super(name);this.sleeper = sleeper;start();}public void run() {try {sleeper.join();} catch (InterruptedException e) {print("Interrupted");}print(getName() + " 加入已经完成");}}public class Joining {public static void main(String[] args) {//Sleeper sleepy = new Sleeper("Sleepy", 1500);//Joiner dopey = new Joiner("Dopey", sleepy);Sleeper grumpy = new Sleeper("Grumpy", 1500);Joiner doc = new Joiner("Doc", grumpy);grumpy.interrupt();}}
捕获异常
由于线程的本质特性,使得你不能捕获从线程中逃逸的异常.
class ExceptionThread implements Runnable {public void run() {//此处没有用try-catch包裹 异常将逃出run()方法 其他地方无法捕获throw new RuntimeException();}}public class NaiveExceptionHandling {public static void main(String[] args) {try {ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new ExceptionThread());} catch (RuntimeException ue) {//捕获语句不会执行 因为异常直接传播到控制台System.out.println("Exception has been handled!");}}}
Thread.UncaughtExceptionHandler是JavaSE 5 中的新接口,它允许你在每个Thread对象上都附着一个异常处理器.
Thread.UncaughtExceptionHandle.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用.
为了调用uncaughtException(),我们创建了一个新的ThreadFactory,为每个新创建的Thread对象上附着一个
Thread.UncaughtExceptionHandler
class ExceptionThread implements Runnable{@Overridepublic void run() {final Thread t = Thread.currentThread();System.out.println("当前线程是"+t+" "+"en = "+t.getUncaughtExceptionHandler());System.out.println("---------下面抛出异常---------");throw new RuntimeException();}}class MyUncaughtException implements Thread.UncaughtExceptionHandler{// run()方法未捕获的异常在此处捕获@Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println("捕获的异常为"+e);}}class HandlerThreadFactory implements ThreadFactory{//这个类的作用就是调用 Thread.UncaughtExceptionHandler.uncaughtException方法@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);//System.out.println("创建了线程:" + t);//设置当此线程因未捕获的异常而突然终止时调用的处理程序//参数:这个对象用作这个线程的未捕获异常处理程序。如果为空,则此线程没有显式处理程序。t.setUncaughtExceptionHandler(new MyUncaughtException());System.out.println("en--> "+t.getUncaughtExceptionHandler());return t;}}public class CaptureUncaught {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());exec.execute(new ExceptionThread());}}
/*想要捕获线程中的异常,需要实现Thread.UncaughtExceptionHandler接口,接口中的uncaughtException可以捕获异常为了调用uncaughtException,我们要在ThreadFactory接口中 设置捕获异常的处理程序,同时还可以get到该异常处理程序为什么要用ThreadFactory接口? 因为创建线程池的构造器中可以传入该接口的实现类* */class MyException implements Thread.UncaughtExceptionHandler{@Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println("被捕获的异常: "+e);}}class Factory implements ThreadFactory{@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setUncaughtExceptionHandler(new MyException());//问题 为什么会执行两遍?System.out.println(thread.getUncaughtExceptionHandler());return thread;}}public class CaughtException {public static void main(String[] args) {class Exception implements Runnable{@Overridepublic void run() {Thread t = Thread.currentThread();System.out.println("当前线程是"+t);throw new RuntimeException();}}ExecutorService exec = Executors.newCachedThreadPool(new Factory());exec.execute(new Exception());/* //下面的代码没有设置异常捕获器Thread thread = new Thread(new Exception());thread.start();*/}}
共享受限的资源
可重入锁
可重入锁就是一个线程中可以多次获得同一把锁
即: 一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,
则该线程可以直接执行调用的方法,而无需重新获得锁;
在java中 ReentrantLock 和 synchronized都是可重入锁
public class ReentrantLockDemo {//创建一个可重入锁private Lock lock = new ReentrantLock();public synchronized void method1(){System.out.println(Thread.currentThread().getName()+"执行了method1");method2();}public synchronized void method2(){System.out.println(Thread.currentThread().getName()+"执行了method2");}public void method3() {try {lock.tryLock(3,TimeUnit.SECONDS);} catch (InterruptedException e) {System.out.println(e);;}try {System.out.println(Thread.currentThread().getName()+"执行了method3");//因为在一个线程中 方法4 和方法3 使用的锁是一样的method4();}finally {lock.unlock();}}public void method4(){lock.lock();try {System.out.println(Thread.currentThread().getName()+"执行了method4");}finally {lock.unlock();}}public static void main(String[] args) {ReentrantLockDemo lockDemo = new ReentrantLockDemo();new Thread(()->{lockDemo.method1();},"t1").start();//方法构造器没有参数可以这样写new Thread(lockDemo::method3,"t2").start();/*和上面写法效果一样new Thread(new Runnable() {@Overridepublic void run() {lockDemo.method1();}},"t1").start();*/}}t1执行了method1t1执行了method2t2执行了method3t2执行了method4
使用显式的Lock对象
1.Lock对象相当于一个对象锁,但可以控制获得锁和释放锁的位置
private Lock lock = new ReentrantLock();public int next() {lock.lock();try {++currentEvenValue;Thread.yield(); // Cause failure faster++currentEvenValue;return currentEvenValue;} finally {lock.unlock();}}
必须使用try-finally语句确保锁的释放
2.ReentrantLock 允许尝试获取锁但最终未获取锁
这样如果其他人已经获取了这个锁,那么你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放.
private ReentrantLock lock = new ReentrantLock();public void untimed() {//tryLock()进在锁没有被其他线程持有时获取锁boolean captured = lock.tryLock();try {System.out.println("tryLock(): " + captured);} finally {if (captured)lock.unlock();}}public void timed() {boolean captured = false;try {//尝试获取锁的时间 超过抛错captured = lock.tryLock(2, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}try {System.out.println("tryLock(2, TimeUnit.SECONDS): " +captured);} finally {if (captured)lock.unlock();}}
显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说,还赋予了更细粒度的控制力.
小结
ReentrantLock可以替代synchronized进行同步;
ReentrantLock获取锁更安全;线程在tryLock()失败的时候不会导致死锁。
必须先获取到锁,再进入try {…}代码块,最后使用finally保证释放锁;
可以使用tryLock()尝试获取锁。
原子类
对于常规编程来说,他们很少会用,但是在涉及性能调优的时候,他们会有大用场
原子操作 不存在中间状态,要么 发生 要么没有发生.
原子性可以应用于除long和double之外的所有基本类型之上的”简单操作”.
JVM可以将64位(long和double变量)的读取和写入当作两个分离的32位操作来执行.这就产生了在一个读取和写入操作中间发生上下文切换,从而导致不同的任务可以看到不正确结果的可能性.(字撕裂)
如果使用volatile关键字,就会获得(简单的赋值与返回操作的)原子性.
volatile关键字还确保了应用中的可视性.如果你将一个域声明为volatile的,那么只要对这个域产生了写操作,那么所有的读操作都可以看到这个修改.即便使用了本地缓存,volatile域会被立即被写入到主存中,而读取操作就发生在主存中.
javaSE5 引入了像 AtomicInteger、 AtomicLong 、AtomicReference 等特殊原子类的变量类
他们提供下面形式的原子性条件更新操作
如果当前值==预期值,则自动将该值设置为给定的更新值。参数: 期望-期望值Update -新值返回:如果成功。False return表示实际值与期望值不相等public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性
例如:用AtomicInteger 来重写 AtomicityTest
public class AtomcityTest1 implements Runnable {private int i =0;public int getValue(){return i;}private synchronized void evenIncrement(){++i;++i;}@Overridepublic void run() {while (true) {evenIncrement();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();AtomcityTest1 at = new AtomcityTest1();exec.execute(at);while (true){int value = at.getValue();if (value%2 != 0){System.out.println(value);System.exit(0);}}}}会输出一个 奇数
//利用原子性操作改进代码public class AtomcityTest1 implements Runnable{private AtomicInteger i = new AtomicInteger(0);public int getValue() {return i.get();}private void evenIncrement(){i.addAndGet(2);}@Overridepublic void run() {while (true){evenIncrement();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();AtomcityTest1 at = new AtomcityTest1();exec.execute(at);while (true){int value = at.getValue();if (value%2 !=0){System.out.println(value+"不是偶数");System.exit(0);}}}}不会输出任何奇数,因为是原子性的
volatile
在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,
主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,
其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。
因此,volatile关键字的目的是告诉虚拟机:
- 每次访问变量时,总是获取主内存的最新值;
- 每次修改变量后,立刻回写到主内存。
volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。
synchronized
在java中,最基本的互斥同步手段就是synchronized关键字。
synchronized可以保证线程竞争共享资源的正确性
(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。
synchronized特性
原子性:确保线程互斥的访问同步代码。synchronized保证只有一个线程拿到锁,进入同步代码块操作共享资源,因此具有原子性。
可见性:保证共享变量的修改能够及时课件。执行synchronized时,会对应执行lock,unlock原子操作。lock操作,就会清空工作空间该变量的值;执行unlock操作之前,必须先把变量同步回主内存中。
有序性:synchronized内的代码和外部的代码禁止排序,至于内部的代码,则不会禁止排序,但是由于只有一个线程进入同步代码块,因此在同步代码块中相当于是单线程的,根据as-if-serial语义,即使代码块内部发生了重排序,也不会影响程序执行的结果。
悲观锁:synchronized是悲观锁,每次使用共享资源时都认为会和其他线程产生竞争,所以每次使用共享资源都会上锁。
独占锁:synchronized是独占锁,该锁一次只能被一个线程所持有,其他线程被阻塞。
非公平锁:synchronized是非公平锁,线程获取锁的顺序可以不按照线程的阻塞顺序,允许线程发出请求后立即尝试获取锁。
可重入锁:synchronized是可重入锁。持锁线程可以再次获取自己的内部的锁。
悲观锁 or 乐观锁:是否一定要锁。
共享锁 or 独占锁(排他锁):是否可以有多个线程同时拿锁。
公平锁 or 非公平锁:是否按阻塞顺序拿锁。
可重入锁 or 不可重入锁: 拿锁线程是否可以多次拿锁。
实例锁
作用于给当前实例加锁,进入同步代码前要获得当前实例的锁。
(注意:是当前实例,一个类有N个实例,一个实例有一把锁)。
Java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持.当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁.
所有的对象都自动含有单一的锁(也称为监视器).当在对象上调用其任意synchronized方法的时候,此对象都被加锁.这时该对象的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用.
所以使用对象锁的情况,只有使用同一实例的线程才会受锁的影响,
多个实例调用同一方法也不会受影响。
在其他对象上的同步:
两个任务可以同时进入一个对象,只要这个对象上的方法是在不同的锁上同步的即可。
(即多个实例调用同一方法也不会受影响。)
(下面例子中的 f() 方法 和 m() 方法 )
class InsLock {private Object insLock = new Object(); //成员变量,方法中的每个实例都拥有//实例锁/*synchronized*/ private void f() {synchronized (this) { //和synchronized private void f()等价try {System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +System.currentTimeMillis() + "执行f()方法");TimeUnit.SECONDS.sleep(2);System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +System.currentTimeMillis() + "离开f()方法");} catch (InterruptedException e) {e.printStackTrace();}}}//对象锁//同一个实例 中m()方法 和 f()方法可以同时执行,不会出现争抢资源//因为m方法 被对象锁 锁住 , f方法被实例锁 锁住,它们不是同一把锁private void m() {synchronized (insLock) { //和synchronized (this.insLock)等价try {System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +System.currentTimeMillis() + "执行m()方法");TimeUnit.SECONDS.sleep(2);System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +System.currentTimeMillis() + "离开m()方法");} catch (InterruptedException e) {e.printStackTrace();}}}synchronized private void g() {try {System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+System.currentTimeMillis() + "执行g()方法");TimeUnit.SECONDS.sleep(2);System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+System.currentTimeMillis() + "离开g()方法");} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {InsLock str = new InsLock();InsLock str2 = new InsLock();new Thread(str::f).start();new Thread(str2::g).start();//换成类锁/*new Thread(()->{f();},"t1").start();new Thread(()->{g();},"t2").start();*/}}
直接加在方法上
class InLock{synchronized private void f(){}}
作用于代码块
private void f(){synchronized (this) {//方法}}
3.作用于静态方法
表示找类锁,类锁永远只有一把,就算创建了100个对象,那类锁也只有一把。
对象锁:一个对象一把锁,
类锁:100个对象,也是一把锁。
class InLock{synchronized static private void f(){}}
类锁的实现
1.通过对Class对象上锁
class InLock{private void f(){synchronized (InsLock.class) {//方法}}}
2.通过static对象锁
同 实例锁 作用于static方法上
死锁
一个线程可以获取一个锁后,再继续获取另一个锁。例如:
public void add(int m) {synchronized(lockA) { // 获得lockA的锁this.value += m;synchronized(lockB) { // 获得lockB的锁this.another += m;} // 释放lockB的锁} // 释放lockA的锁}public void dec(int m) {synchronized(lockB) { // 获得lockB的锁this.another -= m;synchronized(lockA) { // 获得lockA的锁this.value -= m;} // 释放lockA的锁} // 释放lockB的锁}
在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。
对于上述代码,线程1和线程2如果分别执行add()和dec()方法时:
- 线程1:进入add(),获得lockA;
- 线程2:进入dec(),获得lockB。
随后:
- 线程1:准备获得lockB,失败,等待中;
- 线程2:准备获得lockA,失败,等待中。
此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。
死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。
那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:
public void dec(int m) {synchronized(lockA) { // 获得lockA的锁this.value -= m;synchronized(lockB) { // 获得lockB的锁this.another -= m;} // 释放lockB的锁} // 释放lockA的锁}
线程之间的协作
使用wait和notify
synchronized并没有解决多线程协调的问题。例如
class TaskQueue {Queue<String> queue = new LinkedList<>();public synchronized void addTask(String s) {this.queue.add(s);}public synchronized String getTask() {while (queue.isEmpty()) {}return queue.remove();}}
上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。
但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。
因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。
我们想要的执行效果是:
- 线程1可以调用addTask()不断往队列中添加任务;
- 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。
多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;
当条件满足时,线程被唤醒,继续执行任务。
wait()方法的执行机制非常复杂。
首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVM的C代码实现的。
其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。
在wait()期间对象的锁是释放的
可以通过notify() 、notifyAll()、或者令时间到期,从wait()中恢复执行
改进上面的代码
public synchronized String getTask() {while (queue.isEmpty()) {// 释放this锁:this.wait();// 重新获取this锁}return queue.remove();}
public synchronized void addTask(String s) {this.queue.add(s);this.notify(); // 唤醒在this锁等待的线程}
注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,
这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),
从而使得等待线程从this.wait()方法返回。
notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。
这是因为可能有多个线程正在getTask()方法内部的wait()中等待,
使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。
有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。
注意到wait()方法返回时需要重新获得this锁。
假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,
随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待。
上面的例子完整版
class TaskQueue2 implements Runnable {@Overridepublic void run() {}Queue<String> queue = new LinkedList<>();public synchronized void addTask(String s) {this.queue.add(s);//生产者生产完毕 提醒消费者消费this.notify(); //唤醒 使用相同锁 但是在等待的线程}public synchronized String getTask() throws InterruptedException {//空的时候 消费者等待while (queue.isEmpty()) {this.wait(); //调用该方法会释放当前锁,等被唤醒后又重新尝试获取锁System.out.println("==");}//消费者消费return queue.remove();}public static void main(String[] args) {TaskQueue2 task = new TaskQueue2();//生产者new Thread(() -> {//task2.addTask("sss");task.addTask("abc");System.out.println(task.queue);}).start();//消费者new Thread(() -> {try {task.getTask();System.out.println(task.queue);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}
使用Condition
Condition可以替代wait和notify;
Condition对象必须从Lock对象获取。
使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。
但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?
答案是使用Condition对象来实现wait和notify的功能。
我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:
class TaskQueue {private final Lock lock = new ReentrantLock();private final Condition condition = lock.newCondition();private Queue<String> queue = new LinkedList<>();public void addTask(String s) {lock.lock();try {queue.add(s);condition.signalAll();} finally {lock.unlock();}}public String getTask() {lock.lock();try {while (queue.isEmpty()) {condition.await();}return queue.remove();} finally {lock.unlock();}}}
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:
- await()会释放当前锁,进入等待状态;
- signal()会唤醒某个等待线程;
- signalAll()会唤醒所有等待线程;
- 唤醒线程从await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) {// 被其他线程唤醒} else {// 指定时间内没有被其他线程唤醒}
新类库中的构件
Semaphore(信号量)
用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。
Semaphore通过使用计数器来控制对共享资源的访问。
允许n个任务同时访问这个资源
如果计数器大于0,则允许访问。 如果为0,则拒绝访问。
计数器所计数的是允许访问共享资源的许可。 因此,要访问资源,必须从信号量中授予线程许可。
API
void acquire():从信号量获取一个许可,如果无可用许可前将一直阻塞等待,
void acquire(int permits) :获取指定数目的许可,如果无可用许可前也将会一直阻塞等待
boolean tryAcquire():从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞
boolean tryAcquire(int permits): 尝试获取指定数目的许可,如果无可用许可直接返回false
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false
void release():释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1,调用了两次release,最大许可会改变为2
int availablePermits() 获取当前信号量可用的许可
构造器
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可sync = (fair)? new FairSync(permits) : new NonfairSync(permits);}
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。
public class Test {public static void main(String[] args) {int N = 8; //工人数Semaphore semaphore = new Semaphore(5); //机器数目for(int i=0;i<N;i++)new Worker(i,semaphore).start();}static class Worker extends Thread{private int num;private Semaphore semaphore;public Worker(int num,Semaphore semaphore){this.num = num;this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire();System.out.println("工人"+this.num+"占用一个机器在生产...");Thread.sleep(2000);System.out.println("工人"+this.num+"释放出机器");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}}}工人2占用一个机器在生产...工人4占用一个机器在生产...工人0占用一个机器在生产...工人3占用一个机器在生产...工人1占用一个机器在生产...工人2释放出机器工人5占用一个机器在生产...工人1释放出机器工人4释放出机器工人3释放出机器工人0释放出机器工人7占用一个机器在生产...工人6占用一个机器在生产...工人5释放出机器工人6释放出机器工人7释放出机器
CountDownLatch
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch 被设计为只触发一次,计数值不能被重新赋值
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。这里所说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。CountDownLatch构造函数
如下
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}
计数器参数count必须大于等于0,等于0的时候,调用await方法时不会 阻塞当前线程。
当我们调用CountDownLatch的countDown()方法时,N就会减1,
CountDownLatch的await()方法 会阻塞当前线程,直到N变成零
//有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,// 他就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。// 所以在这里用Java代码设计两个类,Worker代表工人,Boss代表老板public class Worker implements Runnable{private CountDownLatch countDownLatch;private final String name;public Worker(CountDownLatch countDownLatch, String name){this.countDownLatch =countDownLatch;this.name =name;}@Overridepublic void run() {this.doWork();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}System.out.println(this.name+"工作完成了");this.countDownLatch.countDown();}void doWork(){System.out.println(this.name+" 正在工作!");}}
public class Boss implements Runnable{private CountDownLatch countDownLatch;public Boss(CountDownLatch countDownLatch){this.countDownLatch =countDownLatch;}@Overridepublic void run() {System.out.println("老板正在等待所有工人干完活。。。。");try {countDownLatch.await();} catch (InterruptedException e) {System.out.println("老板在等待的时候发生了异常");}System.out.println("所有工人的活儿干完了,老板开始检查工作。。");}}
public class WorkTest {public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(3);ExecutorService exec = Executors.newCachedThreadPool();Worker w1 = new Worker(countDownLatch, "张三");Worker w2 = new Worker(countDownLatch, "李四");Worker w3 = new Worker(countDownLatch, "王五");Boss boss = new Boss(countDownLatch);exec.execute(w1);exec.execute(w2);exec.execute(w3);exec.execute(boss);exec.shutdown();}}
张三 正在工作!老板正在等待所有工人干完活。。。。王五 正在工作!李四 正在工作!张三工作完成了王五工作完成了李四工作完成了所有工人的活儿干完了,老板开始检查工作。。Process finished with exit code 0
CyclicBarrier
CountDownLatch 被设计为只触发一次,但是CyclicBarrier可以
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
我们暂且把这个状态就叫做barrier,当调用await()字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:方法之后,线程就处于barrier了。
public CyclicBarrier(int parties, Runnable barrierAction) {}public CyclicBarrier(int parties) {}

参数parties指让多少个线程或者任务等待至barrier状态;
参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
class Add implements Runnable{private AtomicInteger atomicInteger;private CyclicBarrier barrier;public Add(CyclicBarrier barrier,AtomicInteger atomicInteger){this.barrier=barrier;this.atomicInteger=atomicInteger;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){System.out.println("===");atomicInteger.incrementAndGet();System.out.println(Thread.currentThread().getName()+" ->"+atomicInteger);barrier.await();}} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}class Sum implements Runnable{private AtomicInteger atomicInteger;private ExecutorService exec;public Sum(AtomicInteger atomicInteger,ExecutorService exec){this.atomicInteger=atomicInteger;this.exec =exec;}@Overridepublic void run() {if (atomicInteger.get() ==20){exec.shutdownNow();}}}public class CyclicBarrierDemo3 {public static void main(String[] args) throws InterruptedException {AtomicInteger atomicInteger = new AtomicInteger();ExecutorService exec = Executors.newCachedThreadPool();CyclicBarrier barrier = new CyclicBarrier(2, new Sum(atomicInteger,exec));for (int i = 0; i < 2; i++) {exec.execute(new Add(barrier,atomicInteger));}TimeUnit.SECONDS.sleep(2);System.out.println(atomicInteger.get());}}
class Horse implements Runnable {private static int counter = 0;private final int id = counter++;private int strides = 0;private static Random rand = new Random(47);private static CyclicBarrier barrier;public Horse(CyclicBarrier b) {barrier = b;}public /*synchronized*/ int getStrides() {return strides;}public void run() {try {while (!Thread.interrupted()) {// synchronized (this) {strides += rand.nextInt(3); // Produces 0, 1 or 2// }//等待直到在此barrie上的所有线程都进入await()//查看源码可以得到 最后一个调用await()会调用barrier的run()方法//直到所有的马都完成前进后 所有的马被唤醒进入下一次循环barrier.await(); //等所有线程结束}} catch (InterruptedException e) {// A legitimate way to exit} catch (BrokenBarrierException e) {// This one we want to know aboutthrow new RuntimeException(e);}}public String toString() {return "Horse " + id + " ";}public String tracks() {StringBuilder s = new StringBuilder();for (int i = 0; i < getStrides(); i++)s.append("*");s.append(id);return s.toString();}}public class HorseRace {static final int FINISH_LINE = 75;private List<Horse> horses = new ArrayList<Horse>();private ExecutorService exec =Executors.newCachedThreadPool();private CyclicBarrier barrier;for (int i = 0; i < nHorses; i++) {Horse horse = new Horse(barrier);horses.add(horse);exec.execute(horse);}public HorseRace(int nHorses, final int pause) {//所有的任务持有一个公共的CyclicBarrier,Horses中的任务跑完后,再去执行CyclicBarrier中的任务barrier = new CyclicBarrier(nHorses, new Runnable() {public void run() {StringBuilder s = new StringBuilder();for (int i = 0; i < FINISH_LINE; i++)s.append("="); // The fence on the racetrackprint(s);for (Horse horse : horses)print(horse.tracks());for (Horse horse : horses)if (horse.getStrides() >= FINISH_LINE) {print(horse + "won!");exec.shutdownNow();return;}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {print("barrier-action sleep interrupted");}}});}public static void main(String[] args) {int nHorses = 7;int pause = 200;new HorseRace(nHorses, pause);}}
DelayQueue
DelayQueue 是一个无界阻塞队列,要添加进去的元素必须实现Delayed接口,
只有在延迟期满时才能从中提取元素。
当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,
也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。
排在队列头部的元素是最早到期的,越往后到期时间赿晚。
该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,表示该元素到期了。
无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
例如,size 方法同时返回到期和未到期元素的计数。
此队列不允许使用 null 元素。
public class DelayQueueTest {private static DelayQueue delayQueue = new DelayQueue();public static void main(String[] args) throws InterruptedException {new Thread(new Runnable() {@Overridepublic void run() {delayQueue.offer(new MyDelayedTask("task1",10000));delayQueue.offer(new MyDelayedTask("task2",3900));delayQueue.offer(new MyDelayedTask("task3",1900));delayQueue.offer(new MyDelayedTask("task4",5900));delayQueue.offer(new MyDelayedTask("task5",6900));delayQueue.offer(new MyDelayedTask("task6",7900));delayQueue.offer(new MyDelayedTask("task7",4900));}}).start();while (true) {//take(),检索并删除该队列的头部,如果需要,等待直到该队列上有一个过期延迟的元素可用。Delayed take = delayQueue.take();System.out.println(take);}}}/*** compareTo 方法必须提供与 getDelay 方法一致的排序*/class MyDelayedTask implements Delayed{private String name ;private long start = System.currentTimeMillis();private long time ;public MyDelayedTask(String name,long time) {this.name = name;this.time = time;}/*** 需要实现的接口,获得延迟时间 用过期时间-当前时间* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间* @param o* @return*/@Overridepublic int compareTo(Delayed o) {MyDelayedTask o1 = (MyDelayedTask) o;return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "MyDelayedTask{" +"name='" + name + '\'' +", time=" + time +'}';}}输出:MyDelayedTask{name='task3', time=1900}MyDelayedTask{name='task2', time=3900}MyDelayedTask{name='task7', time=4900}MyDelayedTask{name='task4', time=5900}MyDelayedTask{name='task5', time=6900}MyDelayedTask{name='task6', time=7900}MyDelayedTask{name='task1', time=10000}
class DelayedTask implements Runnable, Delayed {private static int counter = 0;private final int id = counter++;private final int delta;private final long trigger;//存放延时任务protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();public DelayedTask(int delayInMilliseconds) {delta = delayInMilliseconds;//设置触发时间trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);sequence.add(this);}//获得多久后触发public long getDelay(TimeUnit unit) {return unit.convert(trigger - System.nanoTime(), NANOSECONDS);}//比较等待时间public int compareTo(Delayed arg) {DelayedTask that = (DelayedTask) arg;if (trigger < that.trigger) return -1;if (trigger > that.trigger) return 1;return 0;}public void run() {print(this + " ");}public String toString() {return String.format("[%1$-4d]", delta) + " Task " + id;}public String summary() {return "(" + id + ":" + delta + ")";}//末尾哨兵public static class EndSentinel extends DelayedTask {private ExecutorService exec;public EndSentinel(int delay, ExecutorService e) {super(delay);exec = e;}//因为是哨兵要确保是最后一个执行的 将打印前面所有任务的信息public void run() {for (DelayedTask pt : sequence) {print(pt.summary() + " ");}print();print(this + " Calling shutdownNow()");exec.shutdownNow();}}}class DelayedTaskConsumer implements Runnable {private DelayQueue<DelayedTask> q;public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {this.q = q;}public void run() {try {while (!Thread.interrupted())//此处必须执行完任务才会去取下一个q.take().run(); // Run task with the current thread} catch (InterruptedException e) {// Acceptable way to exit}print("Finished DelayedTaskConsumer");}}public class DelayQueueDemo {public static void main(String[] args) {Random rand = new Random(47);ExecutorService exec = Executors.newCachedThreadPool();DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();// Fill with tasks that have random delays://放20个随机触发 5秒延时时间//DelayQueue底层是数组 按照二分查找进行排序for (int i = 0; i < 20; i++)queue.put(new DelayedTask(rand.nextInt(5000)));// Set the stopping pointqueue.add(new DelayedTask.EndSentinel(5000, exec));exec.execute(new DelayedTaskConsumer(queue));try {SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}//System.out.println(2278);//queue.add(new DelayedTask(10));}}
此代码需留意哨兵机制,如何安全的确保所有任务执行完毕后关闭线程池
性能调优
优先使用synchronized关键字入手,只有在性能调优时才替换为Lock对象这种做法.
