我们使用的是多线程并发
一个进程可以由很多线程
进程有自己的地址空间,不会影响其他进程。但是使用进程作为并发方式有数量和开销的限制。

Java的线程机制是抢占式的,这表示调度机制会周期性地中断线程,将上下文切换到另一个线程,从而为每个线程都提供时间片,使得每个线程都会分配到数量合理的时间去驱动它的任务。

互斥同步

互斥和同步是一种最常见、最主要的实现并发正确性(相对线程安全)的保障手段。

同步:指的是多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用。

互斥:指的是实现同步的一种手段,临界区互斥量和信号量都是主要的互斥实现方式。

在java中,最基本的互斥同步手段就是synchronized关键字。synchronized可以保证线程竞争共享资源的正确性(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。

基本的线程机制

一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有自己的CPU一样。底层机制是切分CPU时间片,但我们通常不回去考虑它。

定义任务

线程可以驱动任务,因此你需要一种描述任务的方式。
描述任务 只需要 实现Runnable接口并编写它的run()方法,可以让该任务执行你的命令

  1. public class LiftOff implements Runnable {
  2. protected int countDown =10;
  3. private static int taskDown =0;
  4. private final int id = taskDown++;
  5. public LiftOff(){}
  6. public LiftOff(int countDown){
  7. this.countDown=countDown;
  8. }
  9. public String str() {
  10. return "#"+id+" ("+
  11. (countDown > 0? countDown :"发射!")+")";
  12. }
  13. @Override
  14. public void run() {
  15. while (countDown-->0){
  16. System.out.println(str());
  17. //对于Thread.yield()的调用是对线程调度器的一种建议。告诉CPU可以将一个线程转移给另一个线程
  18. Thread.yield();
  19. }
  20. }
  21. }

从Runnable导出一个类,必须实现run()方法,但是要想实现线程的行为,你必须显示的把一个任务附着到线程上

Thread类

将Runnable对象转变为工作任务的常规方式是把它提交给一个Thread构造器
Thread构造器可以只接受一个Runnable对象.
线程的调度机制是非确定性的,因此含有线程的程序每次运行的结果可能不同.

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. Thread t = new Thread(new LiftOff);
  4. t.start();
  5. }
  6. }

Thread.start()方法:
使线程开始执行;Java虚拟机调用该线程的run方法。
结果是两个线程并发运行:当前线程(从调用start方法返回)和另一个线程(执行其run方法)。
t Main

使用Executor

Java SE5中的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象从而简化了并发编程.

Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务,Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期.

Executor在Java SE5/6中是启动任务的优选方法.

CachedThreadPool

线程数根据任务动态调整的线程池;

  1. final LiftOff target = new LiftOff();
  2. for (int i = 0; i < 3; i++) {
  3. // TimeUnit.SECONDS.sleep(1);
  4. new Thread(target).start(); //三个线程抢占一个target,会导致问题
  5. }
  6. // Executors.newCachedThreadPool().execute(new LiftOff());
  7. ExecutorService service = Executors.newCachedThreadPool();
  8. service.execute(new LiftOff());
  9. service.shutdown();

等待所有线程任务执行完成后,再关闭程序。但是并不阻塞main方法的执行。
shutdown()方法可以防止新任务提交给这个Executor,当前线程(上例中的Main线程) 将继续执行shutdown()被调用前提交的所有任务。 这个程序将在Executor中所有任务完成之后尽快的退出。

线程池在程序结束的时候要关闭。
shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。
shutdownNow()会立刻停止正在执行的任务,
awaitTermination()则会等待指定的时间让线程池关闭。

如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

因此,想创建指定动态范围的线程池,可以这么写:

  1. int min = 4;
  2. int max = 10;
  3. ExecutorService es = new ThreadPoolExecutor(min, max,
  4. 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

FixedThreadPool

  • 线程数固定的线程池;

FixedThreadPool使用了有限的线程集来执行所提交的任务.

  1. //Fixed 固定的
  2. //你的线程池中的线程是固定的数量 现有的线程会自动被复用
  3. ExecutorService executorService = Executors.newFixedThreadPool(5);
  4. for (int i = 0; i < 20; i++) {
  5. executorService.execute(new LiftOff());
  6. }
  7. executorService.shutdown();

有了FixedThreadPool,你就可以一次性预先执行代价高昂的线程分配,因而也就可以限制线程的数量了.
通过使用FixedThreadPool使用的Thread对象的数量是有界的.

在任何线程池中,现有线程在可能的情况下,都会被自动复用

CachedThreadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收线程时停止创建新线程.因此它是合理的Executor的首选.

当第一个任务执行完毕时,此时第n个以后任务还没有开创线程,将复用其他的线程.

SingleThreadExecutor

:仅单线程执行的线程池。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool.
如果向SingleThreadExecutor提交多个任务,那么这些任务将排队,每个任务都会在下一个任务开始之前运行结束.所有的任务都将使用相同的线程.

SingleThreadExecutor会序列化所有提交给它的任务,并会维护它自己(隐藏)的悬挂任务队列.

SingleThreadExecutor运行线程,可以确保在任意时刻现场中都只有唯一的任务在运行.
SingleThreadExecutor可以让你省去只是为了维持某些事物的原型而进行的各种协调努力.通过序列化任务,你可以消除对序列化对象的需求.

ScheduledThreadPool

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:

  1. ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

  1. // 1秒后执行一次性任务:
  2. ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

  1. // 2秒后开始执行定时任务,每3秒执行:
  2. ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

  1. // 2秒后开始执行定时任务,以3秒为间隔执行:
  2. ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。

Callable接口

runnable是执行工作的独立接口,它不返回任何值。如果希望任务完成时可以返一个值,可以使用Callable接口

在JavaSE5 中引入的callable接口是一种带有类型参数的泛型,他的类型参数表示的是从call()方法中返回的值而不是run()方法,并且必须用ExecutorService.submit()调用

  1. class TaskWithResult implements Callable {
  2. private int id;
  3. public TaskWithResult(int id) {
  4. this.id = id;
  5. }
  6. public String call() throws InterruptedException {
  7. /*for (int i = 0; i < 5; i++) {
  8. TimeUnit.SECONDS.sleep(1);
  9. System.out.println("is run");
  10. }*/
  11. return "结果是:" + id;
  12. }
  13. }
  14. public class CallableDemo {
  15. public static void main(String[] args) throws ExecutionException, InterruptedException {
  16. ExecutorService exe = Executors.newCachedThreadPool();
  17. final Future submit = exe.submit(new TaskWithResult(1));
  18. TimeUnit.MILLISECONDS.sleep(1); //这里加休眠是为了给编译器时间让他知道submbit.isDone
  19. if (submit.isDone()) {
  20. System.out.println(submit.get());
  21. }
  22. exe.shutdown();
  23. }
  24. 结果是 1
  25. }

submit()方法会产生Future对象,它用callable返回结果的特定类型进行了参数化。
你可以使用 isDone()方法来查询Future是否已经完成。
当任务完成时它具有一个结果,你可以调用get()方法来获取结果
submit()方法相当于start()方法,前者调用了call(),后者调用了run()
get()方法是为了拿到call()方法中的return

休眠

给定任务中止的时间
对sleep()的调用可以抛出InterruptedException异常,
因为异常不能跨线程传播回main(),所以你必须处理所有在任务内部产生的异常

  1. 旧方法
  2. thread.sleep();
  3. 新方法
  4. TimeUnit.MILLISECONDS.sleep();

让步

调用yield()方法,就是建议让具有相同优先级的其他线程可以运行。但是没有任何机制会保证它会被采纳

后台线程

所谓后台线程(daemon),是指程序运行的时候在后台提供的一种通用的服务的线程,并且这种线程并不属于程序中不可或缺的部分

当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中所有的后台线程

  1. public class SimpleDaemons implements Runnable{
  2. @Override
  3. public void run() {
  4. /*while (true){
  5. System.out.println(Thread.currentThread() + " " + this);
  6. }*/
  7. try {
  8. //下面已经设置为后台线程,这里延迟大于主线程的175延迟的话,就什么也不会输出
  9. TimeUnit.MILLISECONDS.sleep(100);
  10. System.out.println(Thread.currentThread() + " " + this);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. public static void main(String[] args) throws InterruptedException {
  16. for (int i = 0; i < 4; i++) {
  17. final Thread thread = new Thread(new SimpleDaemons());
  18. thread.setDaemon(true);
  19. thread.start();
  20. }
  21. System.out.println("所有线程开始");
  22. TimeUnit.MILLISECONDS.sleep(175);
  23. }
  24. }

必须在线程启动之前调用setDaemon()方法,才能把它设置为后台线程

后台线程工厂

  1. public class DaemonThreadFactory implements ThreadFactory {
  2. @Override
  3. public Thread newThread(Runnable r) {
  4. Thread t = new Thread(r);
  5. t.setDaemon(true);
  6. return t;
  7. }
  8. }

它和普通的ThreadFactory区别就是将后台线程状态全部设置为了true。
现在可以用一个DaemonThreadFactory作为参数传递给线程池

  1. public class DaemonFromFactory implements Runnable{
  2. @Override
  3. public void run() {
  4. try {
  5. TimeUnit.MILLISECONDS.sleep(100);
  6. System.out.println(Thread.currentThread()+" "+this);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. public static void main(String[] args) throws InterruptedException {
  12. ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
  13. for (int i = 0; i < 5; i++) {
  14. exec.execute(new DaemonFromFactory());
  15. }
  16. System.out.println("所有后台线程开始");
  17. TimeUnit.MILLISECONDS.sleep(500);
  18. }
  19. }

后台进程不执行finally语句情况下就会终止其run()方法

  1. class ADaemon implements Runnable {
  2. @Override
  3. public void run() {
  4. try {
  5. TimeUnit.MILLISECONDS.sleep(100);
  6. System.out.println("开始后台线程中。。。");
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. } finally {
  10. System.out.println("Is this always should running?");
  11. }
  12. }
  13. }
  14. public class DaemonsDontRunFinally {
  15. public static void main(String[] args) throws InterruptedException {
  16. Thread thread = new Thread(new ADaemon());
  17. thread.setDaemon(true);
  18. thread.start();
  19. TimeUnit.MILLISECONDS.sleep(120);
  20. }
  21. }

编码的变体

1.继承Thread 在构造器中start 即可创建的实例具有线程行为

  1. public class SimpleThread extends Thread {
  2. private int countDown = 5;
  3. private static int threadCount = 0;
  4. public SimpleThread() {
  5. //传入线程名字
  6. super(Integer.toString(++threadCount));
  7. //当调用构造器时,即启动了线程 不用再放到thread中了
  8. start();
  9. }
  10. public String toString() {
  11. return "#" + getName() + "(" + countDown + "), ";
  12. }
  13. public void run() {
  14. while (true) {
  15. System.out.print(this);
  16. if (--countDown == 0)
  17. return;
  18. }
  19. }
  20. public static void main(String[] args) {
  21. for (int i = 0; i < 5; i++)
  22. new SimpleThread();
  23. }
  24. }

2.采用实现Runnable的方式 保留继承的能力

  1. public class SelfManaged implements Runnable {
  2. //实现Runnable接口 内部持有一个线程对象 将自身引用传入 保留继承能力
  3. private int countDown = 5;
  4. private Thread t = new Thread(this);
  5. public SelfManaged() {
  6. t.start();
  7. }
  8. public String toString() {
  9. return Thread.currentThread().getName() + "(" + countDown + "), ";
  10. }
  11. public void run() {
  12. while (true) {
  13. System.out.print(this);
  14. if (--countDown == 0)
  15. return;
  16. }
  17. }
  18. public static void main(String[] args) {
  19. for (int i = 0; i < 5; i++)
  20. new SelfManaged();
  21. }
  22. }

隐患: 在构造器中启动线程,如果另一个任务可能在构造器结束之前开始执行,这意味着该任务能够访问处于不稳定状态的对象.(即访问未完成初始化操作)

而选择使用Executor不会产生这个隐患.

3.内部类的形式

  1. class ThreadMethod {
  2. private int countDown = 5;
  3. private Thread t;
  4. private String name;
  5. public ThreadMethod(String name) {
  6. this.name = name;
  7. }
  8. public void runTask() {
  9. if (t == null) {
  10. t = new Thread(name) {
  11. public void run() {
  12. try {
  13. while (true) {
  14. System.out.println(this);
  15. if (--countDown == 0) return;
  16. sleep(10);
  17. }
  18. } catch (InterruptedException e) {
  19. System.out.println("sleep() 被打断了");
  20. }
  21. }
  22. public String toString() {
  23. return getName() + ": " + countDown;
  24. }
  25. };
  26. t.start();
  27. }
  28. }
  29. }
  30. public class InnerDemo {
  31. public static void main(String[] args) {
  32. ThreadMethod method = new ThreadMethod("a");
  33. method.runTask();
  34. }
  35. }

加入一个线程

一个线程可以在另一个线程上调用join()方法
如果一个线程(A)在另一个线程上(B)调用B.join方法,该线程会被挂起(A),等到B线程结束才恢复

也可以在调用join()时加入一个超时参数,这样如果目标线程在这段时间还没结束,join()方法总能返回

  1. class Sleeper extends Thread {
  2. private int duration;
  3. public Sleeper(String name, int sleepTime) {
  4. super(name);
  5. duration = sleepTime;
  6. start();
  7. }
  8. public void run() {
  9. try {
  10. sleep(duration);
  11. } catch (InterruptedException e) {
  12. System.out.println(getName()+"被打断了吗?-->"+interrupted());
  13. }
  14. System.out.println(getName()+"现在唤醒了");
  15. }
  16. }
  17. class Joiner extends Thread {
  18. private Sleeper sleeper;
  19. public Joiner(String name, Sleeper sleeper) {
  20. super(name);
  21. this.sleeper = sleeper;
  22. start();
  23. }
  24. public void run() {
  25. try {
  26. sleeper.join();
  27. } catch (InterruptedException e) {
  28. print("Interrupted");
  29. }
  30. print(getName() + " 加入已经完成");
  31. }
  32. }
  33. public class Joining {
  34. public static void main(String[] args) {
  35. //Sleeper sleepy = new Sleeper("Sleepy", 1500);
  36. //Joiner dopey = new Joiner("Dopey", sleepy);
  37. Sleeper grumpy = new Sleeper("Grumpy", 1500);
  38. Joiner doc = new Joiner("Doc", grumpy);
  39. grumpy.interrupt();
  40. }
  41. }

捕获异常

由于线程的本质特性,使得你不能捕获从线程中逃逸的异常.

  1. class ExceptionThread implements Runnable {
  2. public void run() {
  3. //此处没有用try-catch包裹 异常将逃出run()方法 其他地方无法捕获
  4. throw new RuntimeException();
  5. }
  6. }
  7. public class NaiveExceptionHandling {
  8. public static void main(String[] args) {
  9. try {
  10. ExecutorService exec = Executors.newCachedThreadPool();
  11. exec.execute(new ExceptionThread());
  12. } catch (RuntimeException ue) {
  13. //捕获语句不会执行 因为异常直接传播到控制台
  14. System.out.println("Exception has been handled!");
  15. }
  16. }
  17. }

Thread.UncaughtExceptionHandler是JavaSE 5 中的新接口,它允许你在每个Thread对象上都附着一个异常处理器.

Thread.UncaughtExceptionHandle.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用.

为了调用uncaughtException(),我们创建了一个新的ThreadFactory,为每个新创建的Thread对象上附着一个
Thread.UncaughtExceptionHandler

  1. class ExceptionThread implements Runnable{
  2. @Override
  3. public void run() {
  4. final Thread t = Thread.currentThread();
  5. System.out.println("当前线程是"+t+" "+"en = "+t.getUncaughtExceptionHandler());
  6. System.out.println("---------下面抛出异常---------");
  7. throw new RuntimeException();
  8. }
  9. }
  10. class MyUncaughtException implements Thread.UncaughtExceptionHandler{
  11. // run()方法未捕获的异常在此处捕获
  12. @Override
  13. public void uncaughtException(Thread t, Throwable e) {
  14. System.out.println("捕获的异常为"+e);
  15. }
  16. }
  17. class HandlerThreadFactory implements ThreadFactory{
  18. //这个类的作用就是调用 Thread.UncaughtExceptionHandler.uncaughtException方法
  19. @Override
  20. public Thread newThread(Runnable r) {
  21. Thread t = new Thread(r);
  22. //System.out.println("创建了线程:" + t);
  23. //设置当此线程因未捕获的异常而突然终止时调用的处理程序
  24. //参数:这个对象用作这个线程的未捕获异常处理程序。如果为空,则此线程没有显式处理程序。
  25. t.setUncaughtExceptionHandler(new MyUncaughtException());
  26. System.out.println("en--> "+t.getUncaughtExceptionHandler());
  27. return t;
  28. }
  29. }
  30. public class CaptureUncaught {
  31. public static void main(String[] args) {
  32. ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
  33. exec.execute(new ExceptionThread());
  34. }
  35. }
  1. /*
  2. 想要捕获线程中的异常,需要实现Thread.UncaughtExceptionHandler接口,
  3. 接口中的uncaughtException可以捕获异常
  4. 为了调用uncaughtException,我们要在ThreadFactory接口中 设置捕获异常的处理程序,
  5. 同时还可以get到该异常处理程序
  6. 为什么要用ThreadFactory接口? 因为创建线程池的构造器中可以传入该接口的实现类
  7. * */
  8. class MyException implements Thread.UncaughtExceptionHandler{
  9. @Override
  10. public void uncaughtException(Thread t, Throwable e) {
  11. System.out.println("被捕获的异常: "+e);
  12. }
  13. }
  14. class Factory implements ThreadFactory{
  15. @Override
  16. public Thread newThread(Runnable r) {
  17. Thread thread = new Thread(r);
  18. thread.setUncaughtExceptionHandler(new MyException());
  19. //问题 为什么会执行两遍?
  20. System.out.println(thread.getUncaughtExceptionHandler());
  21. return thread;
  22. }
  23. }
  24. public class CaughtException {
  25. public static void main(String[] args) {
  26. class Exception implements Runnable{
  27. @Override
  28. public void run() {
  29. Thread t = Thread.currentThread();
  30. System.out.println("当前线程是"+t);
  31. throw new RuntimeException();
  32. }
  33. }
  34. ExecutorService exec = Executors.newCachedThreadPool(new Factory());
  35. exec.execute(new Exception());
  36. /* //下面的代码没有设置异常捕获器
  37. Thread thread = new Thread(new Exception());
  38. thread.start();*/
  39. }
  40. }

共享受限的资源

可重入锁

可重入锁就是一个线程中可以多次获得同一把锁

即: 一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,
则该线程可以直接执行调用的方法,而无需重新获得锁;

在java中 ReentrantLock 和 synchronized都是可重入锁

  1. public class ReentrantLockDemo {
  2. //创建一个可重入锁
  3. private Lock lock = new ReentrantLock();
  4. public synchronized void method1(){
  5. System.out.println(Thread.currentThread().getName()+"执行了method1");
  6. method2();
  7. }
  8. public synchronized void method2(){
  9. System.out.println(Thread.currentThread().getName()+"执行了method2");
  10. }
  11. public void method3() {
  12. try {
  13. lock.tryLock(3,TimeUnit.SECONDS);
  14. } catch (InterruptedException e) {
  15. System.out.println(e);;
  16. }
  17. try {
  18. System.out.println(Thread.currentThread().getName()+"执行了method3");
  19. //因为在一个线程中 方法4 和方法3 使用的锁是一样的
  20. method4();
  21. }finally {
  22. lock.unlock();
  23. }
  24. }
  25. public void method4(){
  26. lock.lock();
  27. try {
  28. System.out.println(Thread.currentThread().getName()+"执行了method4");
  29. }finally {
  30. lock.unlock();
  31. }
  32. }
  33. public static void main(String[] args) {
  34. ReentrantLockDemo lockDemo = new ReentrantLockDemo();
  35. new Thread(()->{
  36. lockDemo.method1();
  37. },"t1").start();
  38. //方法构造器没有参数可以这样写
  39. new Thread(lockDemo::method3,"t2").start();
  40. /*和上面写法效果一样
  41. new Thread(new Runnable() {
  42. @Override
  43. public void run() {
  44. lockDemo.method1();
  45. }
  46. },"t1").start();*/
  47. }
  48. }
  49. t1执行了method1
  50. t1执行了method2
  51. t2执行了method3
  52. t2执行了method4

使用显式的Lock对象

1.Lock对象相当于一个对象锁,但可以控制获得锁和释放锁的位置

  1. private Lock lock = new ReentrantLock();
  2. public int next() {
  3. lock.lock();
  4. try {
  5. ++currentEvenValue;
  6. Thread.yield(); // Cause failure faster
  7. ++currentEvenValue;
  8. return currentEvenValue;
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

必须使用try-finally语句确保锁的释放

2.ReentrantLock 允许尝试获取锁但最终未获取锁

这样如果其他人已经获取了这个锁,那么你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放.

  1. private ReentrantLock lock = new ReentrantLock();
  2. public void untimed() {
  3. //tryLock()进在锁没有被其他线程持有时获取锁
  4. boolean captured = lock.tryLock();
  5. try {
  6. System.out.println("tryLock(): " + captured);
  7. } finally {
  8. if (captured)
  9. lock.unlock();
  10. }
  11. }
  12. public void timed() {
  13. boolean captured = false;
  14. try {
  15. //尝试获取锁的时间 超过抛错
  16. captured = lock.tryLock(2, TimeUnit.SECONDS);
  17. } catch (InterruptedException e) {
  18. throw new RuntimeException(e);
  19. }
  20. try {
  21. System.out.println("tryLock(2, TimeUnit.SECONDS): " +captured);
  22. } finally {
  23. if (captured)
  24. lock.unlock();
  25. }
  26. }

显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说,还赋予了更细粒度的控制力.

小结
ReentrantLock可以替代synchronized进行同步;
ReentrantLock获取锁更安全;线程在tryLock()失败的时候不会导致死锁。
必须先获取到锁,再进入try {…}代码块,最后使用finally保证释放锁;
可以使用tryLock()尝试获取锁。

原子类

对于常规编程来说,他们很少会用,但是在涉及性能调优的时候,他们会有大用场

原子操作 不存在中间状态,要么 发生 要么没有发生.

原子性可以应用于除long和double之外的所有基本类型之上的”简单操作”.

JVM可以将64位(long和double变量)的读取和写入当作两个分离的32位操作来执行.这就产生了在一个读取和写入操作中间发生上下文切换,从而导致不同的任务可以看到不正确结果的可能性.(字撕裂)

如果使用volatile关键字,就会获得(简单的赋值与返回操作的)原子性.

volatile关键字还确保了应用中的可视性.如果你将一个域声明为volatile的,那么只要对这个域产生了写操作,那么所有的读操作都可以看到这个修改.即便使用了本地缓存,volatile域会被立即被写入到主存中,而读取操作就发生在主存中.

javaSE5 引入了像 AtomicInteger、 AtomicLong 、AtomicReference 等特殊原子类的变量类
他们提供下面形式的原子性条件更新操作

  1. 如果当前值==预期值,则自动将该值设置为给定的更新值。
  2. 参数: 期望-期望值
  3. Update -新值
  4. 返回:
  5. 如果成功。False return表示实际值与期望值不相等
  6. public final boolean compareAndSet(int expect, int update) {
  7. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  8. }

这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性

例如:用AtomicInteger 来重写 AtomicityTest

  1. public class AtomcityTest1 implements Runnable {
  2. private int i =0;
  3. public int getValue(){return i;}
  4. private synchronized void evenIncrement(){++i;++i;}
  5. @Override
  6. public void run() {
  7. while (true) {
  8. evenIncrement();
  9. }
  10. }
  11. public static void main(String[] args) {
  12. ExecutorService exec = Executors.newCachedThreadPool();
  13. AtomcityTest1 at = new AtomcityTest1();
  14. exec.execute(at);
  15. while (true){
  16. int value = at.getValue();
  17. if (value%2 != 0){
  18. System.out.println(value);
  19. System.exit(0);
  20. }
  21. }
  22. }
  23. }
  24. 会输出一个 奇数
  1. //利用原子性操作改进代码
  2. public class AtomcityTest1 implements Runnable{
  3. private AtomicInteger i = new AtomicInteger(0);
  4. public int getValue() {
  5. return i.get();
  6. }
  7. private void evenIncrement(){
  8. i.addAndGet(2);
  9. }
  10. @Override
  11. public void run() {
  12. while (true){
  13. evenIncrement();
  14. }
  15. }
  16. public static void main(String[] args) {
  17. ExecutorService exec = Executors.newCachedThreadPool();
  18. AtomcityTest1 at = new AtomcityTest1();
  19. exec.execute(at);
  20. while (true){
  21. int value = at.getValue();
  22. if (value%2 !=0){
  23. System.out.println(value+"不是偶数");
  24. System.exit(0);
  25. }
  26. }
  27. }
  28. }
  29. 不会输出任何奇数,因为是原子性的


volatile

在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,
主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,
其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。

因此,volatile关键字的目的是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;
  • 每次修改变量后,立刻回写到主内存。

volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

synchronized

在java中,最基本的互斥同步手段就是synchronized关键字。

synchronized可以保证线程竞争共享资源的正确性
(多个线程并发访问共享数据时,保证共享数据在同一个时刻只能被一个线程使用)。

synchronized特性

原子性:确保线程互斥的访问同步代码。synchronized保证只有一个线程拿到锁,进入同步代码块操作共享资源,因此具有原子性。

可见性:保证共享变量的修改能够及时课件。执行synchronized时,会对应执行lock,unlock原子操作。lock操作,就会清空工作空间该变量的值;执行unlock操作之前,必须先把变量同步回主内存中。

有序性:synchronized内的代码和外部的代码禁止排序,至于内部的代码,则不会禁止排序,但是由于只有一个线程进入同步代码块,因此在同步代码块中相当于是单线程的,根据as-if-serial语义,即使代码块内部发生了重排序,也不会影响程序执行的结果。

悲观锁:synchronized是悲观锁,每次使用共享资源时都认为会和其他线程产生竞争,所以每次使用共享资源都会上锁。

独占锁:synchronized是独占锁,该锁一次只能被一个线程所持有,其他线程被阻塞。

非公平锁:synchronized是非公平锁,线程获取锁的顺序可以不按照线程的阻塞顺序,允许线程发出请求后立即尝试获取锁。

可重入锁:synchronized是可重入锁。持锁线程可以再次获取自己的内部的锁。

悲观锁 or 乐观锁:是否一定要锁。
共享锁 or 独占锁(排他锁):是否可以有多个线程同时拿锁。
公平锁 or 非公平锁:是否按阻塞顺序拿锁。
可重入锁 or 不可重入锁: 拿锁线程是否可以多次拿锁。

实例锁

作用于给当前实例加锁,进入同步代码前要获得当前实例的锁。
(注意:是当前实例,一个类有N个实例,一个实例有一把锁)。

Java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持.当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁.

所有的对象都自动含有单一的锁(也称为监视器).当在对象上调用其任意synchronized方法的时候,此对象都被加锁.这时该对象的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用.

所以使用对象锁的情况,只有使用同一实例的线程才会受锁的影响,
多个实例调用同一方法也不会受影响。

在其他对象上的同步:
两个任务可以同时进入一个对象,只要这个对象上的方法是在不同的锁上同步的即可。
(即多个实例调用同一方法也不会受影响。)
(下面例子中的 f() 方法 和 m() 方法 )

  1. class InsLock {
  2. private Object insLock = new Object(); //成员变量,方法中的每个实例都拥有
  3. //实例锁
  4. /*synchronized*/ private void f() {
  5. synchronized (this) { //和synchronized private void f()等价
  6. try {
  7. System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
  8. System.currentTimeMillis() + "执行f()方法");
  9. TimeUnit.SECONDS.sleep(2);
  10. System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
  11. System.currentTimeMillis() + "离开f()方法");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. //对象锁
  18. //同一个实例 中m()方法 和 f()方法可以同时执行,不会出现争抢资源
  19. //因为m方法 被对象锁 锁住 , f方法被实例锁 锁住,它们不是同一把锁
  20. private void m() {
  21. synchronized (insLock) { //和synchronized (this.insLock)等价
  22. try {
  23. System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
  24. System.currentTimeMillis() + "执行m()方法");
  25. TimeUnit.SECONDS.sleep(2);
  26. System.out.println("当前线程为:" + Thread.currentThread().getName() + " " +
  27. System.currentTimeMillis() + "离开m()方法");
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. synchronized private void g() {
  34. try {
  35. System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+
  36. System.currentTimeMillis() + "执行g()方法");
  37. TimeUnit.SECONDS.sleep(2);
  38. System.out.println("当前线程为:" + Thread.currentThread().getName() +" "+
  39. System.currentTimeMillis() + "离开g()方法");
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }public static void main(String[] args) {
  44. InsLock str = new InsLock();
  45. InsLock str2 = new InsLock();
  46. new Thread(str::f).start();
  47. new Thread(str2::g).start();
  48. //换成类锁
  49. /*new Thread(()->{
  50. f();
  51. },"t1").start();
  52. new Thread(()->{
  53. g();
  54. },"t2").start();*/
  55. }
  56. }



实例锁的实现

  1. 直接加在方法上

    1. class InLock{
    2. synchronized private void f(){}
    3. }
  2. 作用于代码块

    1. private void f(){
    2. synchronized (this) {
    3. //方法
    4. }
    5. }

3.作用于静态方法

表示找类锁,类锁永远只有一把,就算创建了100个对象,那类锁也只有一把。
对象锁:一个对象一把锁,
类锁:100个对象,也是一把锁。

  1. class InLock{
  2. synchronized static private void f(){}
  3. }

类锁的实现

1.通过对Class对象上锁

  1. class InLock{
  2. private void f(){
  3. synchronized (InsLock.class) {
  4. //方法
  5. }
  6. }
  7. }

2.通过static对象锁
同 实例锁 作用于static方法上

死锁

一个线程可以获取一个锁后,再继续获取另一个锁。例如:

  1. public void add(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value += m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another += m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }
  9. public void dec(int m) {
  10. synchronized(lockB) { // 获得lockB的锁
  11. this.another -= m;
  12. synchronized(lockA) { // 获得lockA的锁
  13. this.value -= m;
  14. } // 释放lockA的锁
  15. } // 释放lockB的锁
  16. }

在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。
对于上述代码,线程1和线程2如果分别执行add()和dec()方法时:

  • 线程1:进入add(),获得lockA;
  • 线程2:进入dec(),获得lockB。

随后:

  • 线程1:准备获得lockB,失败,等待中;
  • 线程2:准备获得lockA,失败,等待中。

此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。

死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。

那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

  1. public void dec(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value -= m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another -= m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }

线程之间的协作

使用wait和notify

synchronized并没有解决多线程协调的问题。例如

  1. class TaskQueue {
  2. Queue<String> queue = new LinkedList<>();
  3. public synchronized void addTask(String s) {
  4. this.queue.add(s);
  5. }
  6. public synchronized String getTask() {
  7. while (queue.isEmpty()) {
  8. }
  9. return queue.remove();
  10. }
  11. }

上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。

但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。

因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。

我们想要的执行效果是:

  • 线程1可以调用addTask()不断往队列中添加任务;
  • 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。

多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;
当条件满足时,线程被唤醒,继续执行任务。

wait()方法的执行机制非常复杂。

首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVM的C代码实现的。

其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。

在wait()期间对象的锁是释放的
可以通过notify() 、notifyAll()、或者令时间到期,从wait()中恢复执行

改进上面的代码

  1. public synchronized String getTask() {
  2. while (queue.isEmpty()) {
  3. // 释放this锁:
  4. this.wait();
  5. // 重新获取this锁
  6. }
  7. return queue.remove();
  8. }
  1. public synchronized void addTask(String s) {
  2. this.queue.add(s);
  3. this.notify(); // 唤醒在this锁等待的线程
  4. }

注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,
这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),
从而使得等待线程从this.wait()方法返回。

notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。
这是因为可能有多个线程正在getTask()方法内部的wait()中等待,

使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。
有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。

注意到wait()方法返回时需要重新获得this锁。
假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,
随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待。

上面的例子完整版

  1. class TaskQueue2 implements Runnable {
  2. @Override
  3. public void run() {
  4. }
  5. Queue<String> queue = new LinkedList<>();
  6. public synchronized void addTask(String s) {
  7. this.queue.add(s);
  8. //生产者生产完毕 提醒消费者消费
  9. this.notify(); //唤醒 使用相同锁 但是在等待的线程
  10. }
  11. public synchronized String getTask() throws InterruptedException {
  12. //空的时候 消费者等待
  13. while (queue.isEmpty()) {
  14. this.wait(); //调用该方法会释放当前锁,等被唤醒后又重新尝试获取锁
  15. System.out.println("==");
  16. }
  17. //消费者消费
  18. return queue.remove();
  19. }
  20. public static void main(String[] args) {
  21. TaskQueue2 task = new TaskQueue2();
  22. //生产者
  23. new Thread(() -> {
  24. //task2.addTask("sss");
  25. task.addTask("abc");
  26. System.out.println(task.queue);
  27. }).start();
  28. //消费者
  29. new Thread(() -> {
  30. try {
  31. task.getTask();
  32. System.out.println(task.queue);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }).start();
  37. }
  38. }

使用Condition

Condition可以替代wait和notify;
Condition对象必须从Lock对象获取。

使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。
但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?

答案是使用Condition对象来实现wait和notify的功能。
我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:

  1. class TaskQueue {
  2. private final Lock lock = new ReentrantLock();
  3. private final Condition condition = lock.newCondition();
  4. private Queue<String> queue = new LinkedList<>();
  5. public void addTask(String s) {
  6. lock.lock();
  7. try {
  8. queue.add(s);
  9. condition.signalAll();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. public String getTask() {
  15. lock.lock();
  16. try {
  17. while (queue.isEmpty()) {
  18. condition.await();
  19. }
  20. return queue.remove();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. }

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:

  1. if (condition.await(1, TimeUnit.SECOND)) {
  2. // 被其他线程唤醒
  3. } else {
  4. // 指定时间内没有被其他线程唤醒
  5. }

新类库中的构件

Semaphore(信号量)

用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。

Semaphore通过使用计数器来控制对共享资源的访问。

允许n个任务同时访问这个资源

如果计数器大于0,则允许访问。 如果为0,则拒绝访问。
计数器所计数的是允许访问共享资源的许可。 因此,要访问资源,必须从信号量中授予线程许可。

API
void acquire()从信号量获取一个许可,如果无可用许可前将一直阻塞等待,

void acquire(int permits) :获取指定数目的许可,如果无可用许可前也将会一直阻塞等待

boolean tryAcquire():从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞

boolean tryAcquire(int permits): 尝试获取指定数目的许可,如果无可用许可直接返回false

boolean tryAcquire(int permits, long timeout, TimeUnit unit)
在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false

void release()释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1,调用了两次release,最大许可会改变为2

int availablePermits() 获取当前信号量可用的许可

构造器

  1. public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
  5. sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
  6. }

假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。

  1. public class Test {
  2. public static void main(String[] args) {
  3. int N = 8; //工人数
  4. Semaphore semaphore = new Semaphore(5); //机器数目
  5. for(int i=0;i<N;i++)
  6. new Worker(i,semaphore).start();
  7. }
  8. static class Worker extends Thread{
  9. private int num;
  10. private Semaphore semaphore;
  11. public Worker(int num,Semaphore semaphore){
  12. this.num = num;
  13. this.semaphore = semaphore;
  14. }
  15. @Override
  16. public void run() {
  17. try {
  18. semaphore.acquire();
  19. System.out.println("工人"+this.num+"占用一个机器在生产...");
  20. Thread.sleep(2000);
  21. System.out.println("工人"+this.num+"释放出机器");
  22. semaphore.release();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }
  29. 工人2占用一个机器在生产...
  30. 工人4占用一个机器在生产...
  31. 工人0占用一个机器在生产...
  32. 工人3占用一个机器在生产...
  33. 工人1占用一个机器在生产...
  34. 工人2释放出机器
  35. 工人5占用一个机器在生产...
  36. 工人1释放出机器
  37. 工人4释放出机器
  38. 工人3释放出机器
  39. 工人0释放出机器
  40. 工人7占用一个机器在生产...
  41. 工人6占用一个机器在生产...
  42. 工人5释放出机器
  43. 工人6释放出机器
  44. 工人7释放出机器

CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。

CountDownLatch 被设计为只触发一次,计数值不能被重新赋值

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。这里所说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。CountDownLatch构造函数
如下

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

计数器参数count必须大于等于0,等于0的时候,调用await方法时不会 阻塞当前线程。

当我们调用CountDownLatch的countDown()方法时,N就会减1,
CountDownLatch的await()方法 会阻塞当前线程,直到N变成零

  1. //有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,
  2. // 他就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。
  3. // 所以在这里用Java代码设计两个类,Worker代表工人,Boss代表老板
  4. public class Worker implements Runnable{
  5. private CountDownLatch countDownLatch;
  6. private final String name;
  7. public Worker(CountDownLatch countDownLatch, String name){
  8. this.countDownLatch =countDownLatch;
  9. this.name =name;
  10. }
  11. @Override
  12. public void run() {
  13. this.doWork();
  14. try {
  15. TimeUnit.SECONDS.sleep(1);
  16. } catch (InterruptedException e) {
  17. }
  18. System.out.println(this.name+"工作完成了");
  19. this.countDownLatch.countDown();
  20. }
  21. void doWork(){
  22. System.out.println(this.name+" 正在工作!");
  23. }
  24. }
  1. public class Boss implements Runnable{
  2. private CountDownLatch countDownLatch;
  3. public Boss(CountDownLatch countDownLatch){
  4. this.countDownLatch =countDownLatch;
  5. }
  6. @Override
  7. public void run() {
  8. System.out.println("老板正在等待所有工人干完活。。。。");
  9. try {
  10. countDownLatch.await();
  11. } catch (InterruptedException e) {
  12. System.out.println("老板在等待的时候发生了异常");
  13. }
  14. System.out.println("所有工人的活儿干完了,老板开始检查工作。。");
  15. }
  16. }
  1. public class WorkTest {
  2. public static void main(String[] args) {
  3. CountDownLatch countDownLatch = new CountDownLatch(3);
  4. ExecutorService exec = Executors.newCachedThreadPool();
  5. Worker w1 = new Worker(countDownLatch, "张三");
  6. Worker w2 = new Worker(countDownLatch, "李四");
  7. Worker w3 = new Worker(countDownLatch, "王五");
  8. Boss boss = new Boss(countDownLatch);
  9. exec.execute(w1);
  10. exec.execute(w2);
  11. exec.execute(w3);
  12. exec.execute(boss);
  13. exec.shutdown();
  14. }
  15. }
  1. 张三 正在工作!
  2. 老板正在等待所有工人干完活。。。。
  3. 王五 正在工作!
  4. 李四 正在工作!
  5. 张三工作完成了
  6. 王五工作完成了
  7. 李四工作完成了
  8. 所有工人的活儿干完了,老板开始检查工作。。
  9. Process finished with exit code 0

CyclicBarrier

CountDownLatch 被设计为只触发一次,但是CyclicBarrier可以

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
我们暂且把这个状态就叫做barrier,当调用await()字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:方法之后,线程就处于barrier了。

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. }
  3. public CyclicBarrier(int parties) {
  4. }

image.png
参数parties指让多少个线程或者任务等待至barrier状态;
参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
image.png
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。

  1. class Add implements Runnable{
  2. private AtomicInteger atomicInteger;
  3. private CyclicBarrier barrier;
  4. public Add(CyclicBarrier barrier,AtomicInteger atomicInteger){
  5. this.barrier=barrier;
  6. this.atomicInteger=atomicInteger;
  7. }
  8. @Override
  9. public void run() {
  10. try {
  11. while (!Thread.currentThread().isInterrupted()){
  12. System.out.println("===");
  13. atomicInteger.incrementAndGet();
  14. System.out.println(Thread.currentThread().getName()+" ->"+atomicInteger);
  15. barrier.await();
  16. }
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. } catch (BrokenBarrierException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. class Sum implements Runnable{
  25. private AtomicInteger atomicInteger;
  26. private ExecutorService exec;
  27. public Sum(AtomicInteger atomicInteger,ExecutorService exec){
  28. this.atomicInteger=atomicInteger;
  29. this.exec =exec;
  30. }
  31. @Override
  32. public void run() {
  33. if (atomicInteger.get() ==20){
  34. exec.shutdownNow();
  35. }
  36. }
  37. }
  38. public class CyclicBarrierDemo3 {
  39. public static void main(String[] args) throws InterruptedException {
  40. AtomicInteger atomicInteger = new AtomicInteger();
  41. ExecutorService exec = Executors.newCachedThreadPool();
  42. CyclicBarrier barrier = new CyclicBarrier(2, new Sum(atomicInteger,exec));
  43. for (int i = 0; i < 2; i++) {
  44. exec.execute(new Add(barrier,atomicInteger));
  45. }
  46. TimeUnit.SECONDS.sleep(2);
  47. System.out.println(atomicInteger.get());
  48. }
  49. }
  1. class Horse implements Runnable {
  2. private static int counter = 0;
  3. private final int id = counter++;
  4. private int strides = 0;
  5. private static Random rand = new Random(47);
  6. private static CyclicBarrier barrier;
  7. public Horse(CyclicBarrier b) {
  8. barrier = b;
  9. }
  10. public /*synchronized*/ int getStrides() {
  11. return strides;
  12. }
  13. public void run() {
  14. try {
  15. while (!Thread.interrupted()) {
  16. // synchronized (this) {
  17. strides += rand.nextInt(3); // Produces 0, 1 or 2
  18. // }
  19. //等待直到在此barrie上的所有线程都进入await()
  20. //查看源码可以得到 最后一个调用await()会调用barrier的run()方法
  21. //直到所有的马都完成前进后 所有的马被唤醒进入下一次循环
  22. barrier.await(); //等所有线程结束
  23. }
  24. } catch (InterruptedException e) {
  25. // A legitimate way to exit
  26. } catch (BrokenBarrierException e) {
  27. // This one we want to know about
  28. throw new RuntimeException(e);
  29. }
  30. }
  31. public String toString() {
  32. return "Horse " + id + " ";
  33. }
  34. public String tracks() {
  35. StringBuilder s = new StringBuilder();
  36. for (int i = 0; i < getStrides(); i++)
  37. s.append("*");
  38. s.append(id);
  39. return s.toString();
  40. }
  41. }
  42. public class HorseRace {
  43. static final int FINISH_LINE = 75;
  44. private List<Horse> horses = new ArrayList<Horse>();
  45. private ExecutorService exec =
  46. Executors.newCachedThreadPool();
  47. private CyclicBarrier barrier;
  48. for (int i = 0; i < nHorses; i++) {
  49. Horse horse = new Horse(barrier);
  50. horses.add(horse);
  51. exec.execute(horse);
  52. }
  53. public HorseRace(int nHorses, final int pause) {
  54. //所有的任务持有一个公共的CyclicBarrier,Horses中的任务跑完后,
  55. 再去执行CyclicBarrier中的任务
  56. barrier = new CyclicBarrier(nHorses, new Runnable() {
  57. public void run() {
  58. StringBuilder s = new StringBuilder();
  59. for (int i = 0; i < FINISH_LINE; i++)
  60. s.append("="); // The fence on the racetrack
  61. print(s);
  62. for (Horse horse : horses)
  63. print(horse.tracks());
  64. for (Horse horse : horses)
  65. if (horse.getStrides() >= FINISH_LINE) {
  66. print(horse + "won!");
  67. exec.shutdownNow();
  68. return;
  69. }
  70. try {
  71. TimeUnit.MILLISECONDS.sleep(pause);
  72. } catch (InterruptedException e) {
  73. print("barrier-action sleep interrupted");
  74. }
  75. }
  76. });
  77. }
  78. public static void main(String[] args) {
  79. int nHorses = 7;
  80. int pause = 200;
  81. new HorseRace(nHorses, pause);
  82. }
  83. }

DelayQueue

DelayQueue 是一个无界阻塞队列,要添加进去的元素必须实现Delayed接口,
只有在延迟期满时才能从中提取元素。

当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,
也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。
排在队列头部的元素是最早到期的,越往后到期时间赿晚。

该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,表示该元素到期了。
无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
例如,size 方法同时返回到期和未到期元素的计数。
此队列不允许使用 null 元素。


image.png

  1. public class DelayQueueTest {
  2. private static DelayQueue delayQueue = new DelayQueue();
  3. public static void main(String[] args) throws InterruptedException {
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. delayQueue.offer(new MyDelayedTask("task1",10000));
  8. delayQueue.offer(new MyDelayedTask("task2",3900));
  9. delayQueue.offer(new MyDelayedTask("task3",1900));
  10. delayQueue.offer(new MyDelayedTask("task4",5900));
  11. delayQueue.offer(new MyDelayedTask("task5",6900));
  12. delayQueue.offer(new MyDelayedTask("task6",7900));
  13. delayQueue.offer(new MyDelayedTask("task7",4900));
  14. }
  15. }).start();
  16. while (true) {
  17. //take(),检索并删除该队列的头部,如果需要,等待直到该队列上有一个过期延迟的元素可用。
  18. Delayed take = delayQueue.take();
  19. System.out.println(take);
  20. }
  21. }
  22. }
  23. /**
  24. * compareTo 方法必须提供与 getDelay 方法一致的排序
  25. */
  26. class MyDelayedTask implements Delayed{
  27. private String name ;
  28. private long start = System.currentTimeMillis();
  29. private long time ;
  30. public MyDelayedTask(String name,long time) {
  31. this.name = name;
  32. this.time = time;
  33. }
  34. /**
  35. * 需要实现的接口,获得延迟时间 用过期时间-当前时间
  36. * @param unit
  37. * @return
  38. */
  39. @Override
  40. public long getDelay(TimeUnit unit) {
  41. return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
  42. }
  43. /**
  44. * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
  45. * @param o
  46. * @return
  47. */
  48. @Override
  49. public int compareTo(Delayed o) {
  50. MyDelayedTask o1 = (MyDelayedTask) o;
  51. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  52. }
  53. @Override
  54. public String toString() {
  55. return "MyDelayedTask{" +
  56. "name='" + name + '\'' +
  57. ", time=" + time +
  58. '}';
  59. }
  60. }
  61. 输出:
  62. MyDelayedTask{name='task3', time=1900}
  63. MyDelayedTask{name='task2', time=3900}
  64. MyDelayedTask{name='task7', time=4900}
  65. MyDelayedTask{name='task4', time=5900}
  66. MyDelayedTask{name='task5', time=6900}
  67. MyDelayedTask{name='task6', time=7900}
  68. MyDelayedTask{name='task1', time=10000}
  1. class DelayedTask implements Runnable, Delayed {
  2. private static int counter = 0;
  3. private final int id = counter++;
  4. private final int delta;
  5. private final long trigger;
  6. //存放延时任务
  7. protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();
  8. public DelayedTask(int delayInMilliseconds) {
  9. delta = delayInMilliseconds;
  10. //设置触发时间
  11. trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
  12. sequence.add(this);
  13. }
  14. //获得多久后触发
  15. public long getDelay(TimeUnit unit) {
  16. return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
  17. }
  18. //比较等待时间
  19. public int compareTo(Delayed arg) {
  20. DelayedTask that = (DelayedTask) arg;
  21. if (trigger < that.trigger) return -1;
  22. if (trigger > that.trigger) return 1;
  23. return 0;
  24. }
  25. public void run() {
  26. print(this + " ");
  27. }
  28. public String toString() {
  29. return String.format("[%1$-4d]", delta) + " Task " + id;
  30. }
  31. public String summary() {
  32. return "(" + id + ":" + delta + ")";
  33. }
  34. //末尾哨兵
  35. public static class EndSentinel extends DelayedTask {
  36. private ExecutorService exec;
  37. public EndSentinel(int delay, ExecutorService e) {
  38. super(delay);
  39. exec = e;
  40. }
  41. //因为是哨兵要确保是最后一个执行的 将打印前面所有任务的信息
  42. public void run() {
  43. for (DelayedTask pt : sequence) {
  44. print(pt.summary() + " ");
  45. }
  46. print();
  47. print(this + " Calling shutdownNow()");
  48. exec.shutdownNow();
  49. }
  50. }
  51. }
  52. class DelayedTaskConsumer implements Runnable {
  53. private DelayQueue<DelayedTask> q;
  54. public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
  55. this.q = q;
  56. }
  57. public void run() {
  58. try {
  59. while (!Thread.interrupted())
  60. //此处必须执行完任务才会去取下一个
  61. q.take().run(); // Run task with the current thread
  62. } catch (InterruptedException e) {
  63. // Acceptable way to exit
  64. }
  65. print("Finished DelayedTaskConsumer");
  66. }
  67. }
  68. public class DelayQueueDemo {
  69. public static void main(String[] args) {
  70. Random rand = new Random(47);
  71. ExecutorService exec = Executors.newCachedThreadPool();
  72. DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
  73. // Fill with tasks that have random delays:
  74. //放20个随机触发 5秒延时时间
  75. //DelayQueue底层是数组 按照二分查找进行排序
  76. for (int i = 0; i < 20; i++)
  77. queue.put(new DelayedTask(rand.nextInt(5000)));
  78. // Set the stopping point
  79. queue.add(new DelayedTask.EndSentinel(5000, exec));
  80. exec.execute(new DelayedTaskConsumer(queue));
  81. try {
  82. SECONDS.sleep(3);
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. //System.out.println(2278);
  87. //queue.add(new DelayedTask(10));
  88. }
  89. }

此代码需留意哨兵机制,如何安全的确保所有任务执行完毕后关闭线程池

性能调优

优先使用synchronized关键字入手,只有在性能调优时才替换为Lock对象这种做法.