背景

并行计算只有在图像处理和服务端编程两个领域可以使用

重要概念

线程安全的概念

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
解决线程安全性的三种方法:

  • 不在线程之间共享该状态变量
  • 将状态变量修改为不可变的变量
  • 在访问状态变量时使用同步

    无状态

    1. public class StatelessFactorizer implements Servlet {
    2. public void service(ServletRequest req,ServletResponse resp){
    3. BigInteger i = extractFromRequest(req);
    4. BigInteger[] factors = factor(i);
    5. encodeIntoResponse(resp,factors);
    6. }
    7. }
    StatelessFactorizer是无状态的,它既不包含任何域,也不包含任何对其他类中域的引用。计算过程中的临时状态仅存在于线程栈上的局部变量中,并且只能由正在执行的线程访问。访问StatelessFactorizer的线程不会影响另一个访问同一个StatelessFactorizer的线程的计算结果,因为这两个线程并没有共享状态,就好像他们都在访问不同的实例。由于线程访问无状态对象的行为并不会影响其他线程中操作的正确性,因此无状态对象是线程安全的。

同步和异步

同步:同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
异步:异步方法调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作。

并发和并行

并发和并行是两个非常容易被混淆的概念。他们都可以表示两个或者多个任务一起执行,但是偏重点有些不同。并发偏重于多个任务交替执行,而多个任务之间有可能还是串行。而并行是真正意义上的“同时执行”。
image.png
如果系统内只有一个CPU,而使用多线程或者多线程任务,那么真实环境中这些任务不可能是真实并行的,毕竟一个CPU一次只能执行一条指令,这种情况下多线程就是并发,而不是并行。真实的并行也只可能出现在拥有多个CPU的系统中。

临界区

临界区用来表示一种公共资源或者说是共享数据,可以被多个线程使用。但是每一次,只能由一个线程使用它,一旦临界区资源被占用,其他线程要想使用这个资源,就必须等待。

阻塞和非阻塞

阻塞和非阻塞通常用来形容多线程间的相互影响。比如一个线程占用了临界区资源,那么其他所有需要这个资源的线程就必须在这个临界区中进行等待。等待会导致线程挂起,这种情况就是阻塞。
非阻塞强调没有一个线程可以妨碍其他线程执行。所有线程都会尝试不断向前执行。

死锁,饥饿和活锁

死锁

image.png

饥饿

饥饿是指某一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行。比如它的线程优先级可能太低。

活锁

如果线程的智力不够,且都秉持着“谦让”的原则,主动将资源释放给他人使用,那么就会出现资源不断在两个线程中跳动,而没有一个线程可以同时拿到所有资源而正常执行

Amdahl定律

根据Amdahl定律,使用多核CPU对系统进行优化,优化的效果取决于CPU的数量以及系统中的串行化程序的比重。CPU数量越多,串行化比重越低,则优化效果越好。仅提高CPU数量而不降低程序的串行化比重,也无法提高系统性能。

java内存模型

原子性

原子性是指一个操作时不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
比如,对于一个静态全局变量int i,两个线程同时对它赋值,线程A给他赋值1,线程B给它赋值为-1。那么不管这2个线程以何种方法、何种步调工作,i的值要么是1,要么是-1。线程A和线程B之间是没有干扰的。

可见性

可见性是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即直到这个修改。
显然,对于串行程序来说,可见性问题是不存在的。因为你在任何一个操作步骤中修改了某个变量,那么在后续的步骤中,读取这个变量的值,一定是修改后的新值。
但是这个问题在并行程序中就不见得了。如果一个线程修改了某一个全局变量,那么其他线程未必可以马上直到这个改动。如果再CPU1和CPU2上各运行了一个线程,它们共享变量t,由于编译器优化或者硬件优化的缘故,在CPU1上的线程将变量t进行了优化,将其缓存在cache中或者寄存器里。在这种情况下,如果在CPU2上的某个线程修改了变量t的实际值,那么CPU1上的线程可能并无法意识到这个改动,依然会读取cache中或者寄存器里的数据。因此,就产生了可见性问题。
image.png

有序性

在并发时,程序的执行可能就会出现乱序。给人直观的感觉就是:写在前面的代码,会在后面执行。有序性问题的原因时因为程序在执行时,可能会进行指令重排,重排后的指令于原指令的顺序未必一致。
为什么要指令重排?为了尽量少的中断流水线

进程和线程

操作系统在分配资源时是把资源分配给进程的,但是CPU资源比较特殊,它是被分配到线程的,因为真正要占用CPU运行的是线程。
image.png
一个进程中有多个线程,多个线程共享进程的堆和方法区资源,但是每个线程有自己的程序计数器和栈区域。

堆是一个进程中最大的一块内存,堆是被进程中的所有线程共享的,是进程创建时分配的,堆里面主要存放使用new操作创建的对象实例。

方法区则用来存放JVM加载的类、常量及静态变量等信息,也是线程共享的。

线程就是轻量级进程,是程序执行的最小单位。使用多线程而不是用多进程去进行并发程序的设计,是因为线程间的切换和调度的成本远远小于进程。

线程生命周期

image.png

线程的基本操作

启动线程

  1. Thread t1 = new Thread(){
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName());
  5. }
  6. };
  7. t1.start();
  8. // 输出:Thread-0

不要用run来开启线程,它只会在当前线程中,串行执行run()中的代码

  1. Thread t1 = new Thread(){
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName());
  5. }
  6. };
  7. t1.start();
  8. t1.run();
  9. System.out.println(Thread.currentThread().getName());
  10. // 输出:
  11. main
  12. main
  13. Thread-0

Thread类中run的实现:

  1. private Runnable target;
  2. public void run() {
  3. if (target != null) {
  4. target.run();
  5. }
  6. }

run就是个一般的方法,执行run()方法就像执行一般的方法一样,在main线程中执行。
向Thread传递Runnable接口的实现类,用Runnable接口来告诉线程该做什么

  1. public static void main(String[] args){
  2. Thread t1 = new Thread(new CreateThread());
  3. t1.start();
  4. }
  5. public static class CreateThread implements Runnable {
  6. @Override
  7. public void run() {
  8. System.out.println(Thread.currentThread().getName());
  9. }
  10. }

终止线程

Thread提供了一个stop方法,但是stop方法被标注为废弃。原因是stop方法太过于暴力,强行把执行到一般的线程终止,可能会引起一些数据不一致的问题。Thread.stop()方法在结束线程时,会直接终止线程,并且会立即释放这个线程所持有的锁,而这些锁恰恰是用来维持对象的一致性的。

  1. public class StopThreadUnsafe {
  2. public static User u = new User();
  3. public static class User{
  4. private int id;
  5. private String name;
  6. public User(){
  7. id = 0;
  8. name = "0";
  9. }
  10. public int getId() {
  11. return id;
  12. }
  13. public void setId(int id) {
  14. this.id = id;
  15. }
  16. public String getName() {
  17. return name;
  18. }
  19. public void setName(String name) {
  20. this.name = name;
  21. }
  22. @Override
  23. public String toString() {
  24. return "User{" +
  25. "id=" + id +
  26. ", name='" + name + '\'' +
  27. '}';
  28. }
  29. }
  30. // 赋值线程
  31. public static class ChangeObjectThread extends Thread{
  32. @Override
  33. public void run(){
  34. while(true){
  35. synchronized (u){
  36. // id设为v
  37. int v = (int)(System.currentTimeMillis() / 1000);
  38. u.setId(v);
  39. try {
  40. Thread.sleep(100);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. // name设为v的string类型
  45. u.setName(String.valueOf(v));
  46. }
  47. Thread.yield();
  48. }
  49. }
  50. }
  51. // 检查线程
  52. public static class ReadObjectThread extends Thread{
  53. @Override
  54. public void run() {
  55. while(true){
  56. synchronized (u){
  57. // 当发现id和name不符的情况就打印
  58. if(u.getId() != Integer.parseInt(u.getName())){
  59. System.out.println(u.toString());
  60. }
  61. }
  62. Thread.yield();
  63. }
  64. }
  65. }
  66. public static void main(String[] args) throws InterruptedException {
  67. new ReadObjectThread().start();
  68. while(true){
  69. Thread t = new ChangeObjectThread();
  70. t.start();
  71. Thread.sleep(150);
  72. t.stop();
  73. }
  74. }
  75. }

当赋值线程赋值id后,在sleep的时候被stop了就会导致id和name的不符。
稍微改动一下就能得到一个安全的可以停止的实现

  1. public class StopThreadUnsafe {
  2. public static User u = new User();
  3. public static class User{
  4. private int id;
  5. private String name;
  6. public User(){
  7. id = 0;
  8. name = "0";
  9. }
  10. public int getId() {
  11. return id;
  12. }
  13. public void setId(int id) {
  14. this.id = id;
  15. }
  16. public String getName() {
  17. return name;
  18. }
  19. public void setName(String name) {
  20. this.name = name;
  21. }
  22. @Override
  23. public String toString() {
  24. return "User{" +
  25. "id=" + id +
  26. ", name='" + name + '\'' +
  27. '}';
  28. }
  29. }
  30. public static class ChangeObjectThread extends Thread{
  31. volatile boolean stopme = false;
  32. public void stopMe(){
  33. stopme = true;
  34. }
  35. @Override
  36. public void run(){
  37. while(true){
  38. if(stopme){
  39. System.out.println("stop");
  40. break;
  41. }
  42. synchronized (u){
  43. int v = (int)(System.currentTimeMillis() / 1000);
  44. u.setId(v);
  45. try {
  46. Thread.sleep(100);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. u.setName(String.valueOf(v));
  51. }
  52. Thread.yield();
  53. }
  54. }
  55. }
  56. public static class ReadObjectThread extends Thread{
  57. @Override
  58. public void run() {
  59. while(true){
  60. synchronized (u){
  61. if(u.getId() != Integer.parseInt(u.getName())){
  62. System.out.println(u.toString());
  63. }
  64. }
  65. Thread.yield();
  66. }
  67. }
  68. }
  69. public static void main(String[] args) throws InterruptedException {
  70. new ReadObjectThread().start();
  71. while(true){
  72. ChangeObjectThread t = new ChangeObjectThread();
  73. t.start();
  74. Thread.sleep(150);
  75. t.stopMe();
  76. }
  77. }
  78. }

线程中断

线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出。至于目标线程接到通知后如何处理,则完全由目标线程自行决定。

  1. public void Thread.interrupt() // 中断线程,设置中断标志位
  2. public boolean Thread.isInterrupted() // 判断是否被中断
  3. public static boolean Thread.interrupted() // 判断是否被中断,并清除当前中断状态
  1. Thread t1 = new Thread(){
  2. @Override
  3. public void run() {
  4. while(true){
  5. if(Thread.currentThread().isInterrupted()){
  6. System.out.println("中断");
  7. break;
  8. }
  9. System.out.println("我正在"+Thread.currentThread().getName()+"中运行");
  10. }
  11. }
  12. };
  13. t1.start();
  14. Thread.sleep(100);
  15. t1.interrupt();

这看上去和终止线程中的stopme很像但是stopme碰到sleep就无效了

  1. public class StopThreadUnsafe {
  2. public static User u = new User();
  3. public static class User{
  4. private int id;
  5. private String name;
  6. public User(){
  7. id = 0;
  8. name = "0";
  9. }
  10. public int getId() {
  11. return id;
  12. }
  13. public void setId(int id) {
  14. this.id = id;
  15. }
  16. public String getName() {
  17. return name;
  18. }
  19. public void setName(String name) {
  20. this.name = name;
  21. }
  22. @Override
  23. public String toString() {
  24. return "User{" +
  25. "id=" + id +
  26. ", name='" + name + '\'' +
  27. '}';
  28. }
  29. }
  30. public static class ChangeObjectThread extends Thread{
  31. volatile boolean stopme = false;
  32. public void stopMe(){
  33. stopme = true;
  34. }
  35. @Override
  36. public void run(){
  37. while(true){
  38. if(stopme){
  39. System.out.println("stop");
  40. break;
  41. }
  42. synchronized (u){
  43. int v = (int)(System.currentTimeMillis() / 1000);
  44. u.setId(v);
  45. try {
  46. System.out.println("开始睡眠");
  47. Thread.sleep(1000);
  48. System.out.println("睡眠结束");
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. u.setName(String.valueOf(v));
  53. }
  54. Thread.yield();
  55. }
  56. }
  57. }
  58. public static class ReadObjectThread extends Thread{
  59. @Override
  60. public void run() {
  61. while(true){
  62. synchronized (u){
  63. if(u.getId() != Integer.parseInt(u.getName())){
  64. System.out.println(u.toString());
  65. }
  66. }
  67. Thread.yield();
  68. }
  69. }
  70. }
  71. public static void main(String[] args) throws InterruptedException {
  72. new ReadObjectThread().start();
  73. ChangeObjectThread t = new ChangeObjectThread();
  74. t.start();
  75. Thread.sleep(150);
  76. System.out.println("执行停止");
  77. t.stopMe();
  78. }
  79. }

输出:
开始睡眠
执行停止
睡眠结束
stop

stopme没办法让睡眠的进程停止睡眠,这是stopme的一个问题;
所以如果出现类似于wait()或者sleep()就只能通过中断来识别
Thread.sleep函数的签名:

  1. public static native void sleep(long millis) throws InterruptedException

Thread.sleep方法会让当前线程休眠若干时间,它会抛出一个InterruptedException中断异常。InterruptedException不是运行时异常,也就是说程序必须捕获并且处理它,当线程在sleep()休眠时,如果被中断,这个异常就会产生。

  1. Thread t1 = new Thread(){
  2. @Override
  3. public void run() {
  4. while(true){
  5. if(Thread.currentThread().isInterrupted()){
  6. System.out.println("中断");
  7. break;
  8. }
  9. try{
  10. System.out.println("开始sleep");
  11. Thread.sleep(2000);
  12. System.out.println("sleep结束");
  13. } catch (InterruptedException e) {
  14. System.out.println("sleep中发生中断");
  15. Thread.currentThread().interrupt();
  16. }
  17. System.out.println("我正在"+Thread.currentThread().getName()+"中运行");
  18. }
  19. }
  20. };
  21. t1.start();
  22. Thread.sleep(100);
  23. t1.interrupt();

在捕获了InterruptedException异常后,我们可以立即退出线程。但是也许代码中,我们还必须进行后续的处理,保证数据的一致性和完整性。Thread.sleep方法由于中断而抛出异常,此时,它会清除中断标记,所以需要在catch语句中执行Thread.interrupt()方法再次中断自己,置上中断标记位。只有这么做,在前面的中断检查中,才能发现当前线程已经被中断了。

挂起(suspend)和继续执行(resume)

被挂起的线程,必须等到resume之后才能继续运行。不推荐使用suspend去挂起线程的原因,是因为suspend在导致线程暂停的同时,并不会去释放任何锁资源。此时,其他任何线程想要访问被它暂用的锁时,都会被牵连,导致无法正常继续运行。直到对应的线程上进行了resume操作,被挂起的线程才能继续,从而其他所有阻塞在相关锁上的线程也可以继续执行。但是,如果resume操作意外地在suspend前就执行了,那么被挂起的线程可能很难有机会被继续执行。并且,更严重的是:它所占用的锁不会被释放,因此可能会导致整个系统工作不正常。

  1. public class BadSuspend {
  2. public static Object u = new Object();
  3. public static class ChangeObjectThread extends Thread{
  4. public ChangeObjectThread(String name){
  5. super.setName(name);
  6. }
  7. @Override
  8. public void run() {
  9. synchronized (u){
  10. System.out.println(Thread.currentThread().getName()+" in");
  11. Thread.currentThread().suspend();
  12. System.out.println(Thread.currentThread().getName()+" out");
  13. }
  14. }
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. ChangeObjectThread t1 = new ChangeObjectThread("t1");
  18. ChangeObjectThread t2 = new ChangeObjectThread("t2");
  19. t1.start();
  20. t2.start();
  21. System.out.println("t1 resume");
  22. System.out.println("t2 resume");
  23. t1.resume();
  24. t2.resume();
  25. t1.join();
  26. t2.join();
  27. }
  28. }

等待(wait)和通知(notify)

当在一个对象实例上调用wait()方法后,当前线程就会在这个对象上等待。比如,线程A中,调用obj.wait()方法,那么线程A就会停止继续执行,而转为等待状态。直到其他线程调用obj.notify()方法为止,或者其他线程调用了该线程的interrupt()方法,该线程抛出InterruptedException异常返回。这时,obj对象就俨然成为多个线程之间的有效通信手段。
如果一个线程调用了object.wait(),那么它就会进入object对象的等待队列。这个等待队列中,可能会有多个线程,因为系统运行多个线程同时等待某一个对象。当object.notify()被调用时,它就会从这个等待队列中,随机选择一个线程,并将其唤醒。
Object.wait()方法并不是可以随便调用的。它必须包含在对应得syncharonized语句中,无论是wait()或者notify()都需要首先获得目标对象的一个监视器。否则会抛出IllegalMonitorStateException异常。

image.png

  1. public class SimpleWN {
  2. final static Object object = new Object();
  3. public static class T1 extends Thread{
  4. @Override
  5. public void run() {
  6. synchronized (object){
  7. System.out.println(System.currentTimeMillis()+":T1 start!");
  8. try{
  9. System.out.println(System.currentTimeMillis()+":T1 wait for object");
  10. object.wait();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println(System.currentTimeMillis()+":T1 end!");
  15. }
  16. }
  17. }
  18. public static class T2 extends Thread{
  19. @Override
  20. public void run() {
  21. synchronized (object){
  22. System.out.println(System.currentTimeMillis()+":T2 start!");
  23. object.notify();
  24. System.out.println(System.currentTimeMillis()+"T2 end!");
  25. try{
  26. System.out.println("T2开始睡眠");
  27. Thread.sleep(2000);
  28. System.out.println("T2睡眠结束");
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. public static void main(String[] args) {
  36. Thread t1 = new T1();
  37. Thread t2 = new T2();
  38. t1.start();
  39. t2.start();
  40. }
  41. }

输出:
1627977599745:T1 start!
1627977599745:T1 wait for object
1627977599745:T2 start!
1627977599745T2 end!
T2开始睡眠
T2睡眠结束
1627977601747:T1 end!

T2notify()之后,T1并没有立刻继续执行,而是等到T2睡眠完之后释放锁之后才能T1继续执行

线程睡眠sleep方法

当一个执行中的线程调用了Thread的sleep方法后,调用线程会暂时让出指定时间的执行权,也就是在这期间不参与CPU的调度,但是该线程锁拥有的监视器资源,比如锁还是持有不让出。如果在睡眠期间其他线程调用了该线程的interrupt()方法中断了该线程,则该线程会在调用sleep方法的地方抛出InterruptedException异常而返回。

等待线程结束(join)和谦让(yield)

一个线程的输入可能非常依赖于另外一个或者多个线程的输出,此时,这个线程就需要等待依赖线程执行完毕,才能继续执行。
JDK提供了join()操作来实现这个功能:

  1. public final void join() throws InterruptedException
  2. public final synchronized void join(long millis) throws InterruptedException

join()的本质是让调用线程wait()在当前线程对象实例上,下面是JDK中join()实现的核心代码片段:

  1. while (isAlive()) {
  2. wait(0);
  3. }

可以看到join让调用线程在当前线程对象上进行等待。当线程执行完成后,被等待的线程会在退出前调用notifyAll()通知所有等待线程继续执行。

Thread.yield():

  1. public static native void yield();

这是一个静态方法,一旦执行,它会使当前线程让出CPU。让出CPU不表示当前线程不执行了。当前线程在让出CPU后,还会进行CPU资源的争夺。

volatile

当你用volatile去申明一个变量时,就等于告诉了虚拟机,这个变量极有可能会被某些程序或者线程修改。为了确保这个变量被修改后,应用程序范围内的所有线程能够“看到”这个改动,虚拟机就必须采用一些特殊的手段,保证这个变量的可见性等特点。

volatile是轻量级的synchronized,它比synchronized的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。

volatile并不能真正的保证线程安全,当两个线程同时修改某一个数据时,却依然会产生线程不安全。

原理

image.png

  • 将当前处理器缓存行的数据写回到系统内存
  • 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效

为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存后再进行操作,但操作完不知道何时会写到内存。如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。所以,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应得内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。

synchronized

synchronized用法:

  • 指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁
  • 直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁
  • 直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁

指定加锁对象

对给定对象加锁,进入同步代码前要获得给定对象的锁

  1. public class AccountingSync implements Runnable {
  2. static AccountingSync instance = new AccountingSync();
  3. static int i = 0;
  4. @Override
  5. public void run() {
  6. for(int j = 0;j < 10000000;j++){
  7. synchronized (instance){
  8. i++;
  9. }
  10. }
  11. }
  12. public static void main(String[] args) throws InterruptedException {
  13. Thread t1 = new Thread(instance);
  14. Thread t2 = new Thread(instance);
  15. t1.start();
  16. t2.start();
  17. t1.join();
  18. t2.join();
  19. System.out.println(i);
  20. }
  21. }

synchronized作用于一个给定对象instance,因此,每次当线程进入被synchronized包裹的代码段,就都会要求请求instance实例的锁。如果当前有其他线程正持有这把锁,那么新到的线程就必须等待。这样,就保证了每次只能有一个线程执行i++操作。

直接作用于实例方法

相当于对当前实例加锁,进入同步代码前要获得当前实例的锁

  1. public class AccountingSync implements Runnable {
  2. static AccountingSync instance = new AccountingSync();
  3. static int i = 0;
  4. public synchronized void increase(){
  5. i++;
  6. }
  7. @Override
  8. public void run() {
  9. for(int j = 0;j < 10000000;j++){
  10. increase();
  11. }
  12. }
  13. public static void main(String[] args) throws InterruptedException {
  14. Thread t1 = new Thread(instance);
  15. Thread t2 = new Thread(instance);
  16. t1.start();
  17. t2.start();
  18. t1.join();
  19. t2.join();
  20. System.out.println(i);
  21. }
  22. }

直接作用于静态方法

相当于对当前类加锁,进入同步代码前要获得当前类的锁

  1. public class AccountingSync implements Runnable {
  2. static AccountingSync instance = new AccountingSync();
  3. static int i = 0;
  4. public static synchronized void increase(){
  5. i++;
  6. }
  7. @Override
  8. public void run() {
  9. for(int j = 0;j < 10000000;j++){
  10. increase();
  11. }
  12. }
  13. public static void main(String[] args) throws InterruptedException {
  14. Thread t1 = new Thread(new AccountingSync());
  15. Thread t2 = new Thread(new AccountingSync());
  16. t1.start();
  17. t2.start();
  18. t1.join();
  19. t2.join();
  20. System.out.println(i);
  21. }
  22. }

即使两个线程指向不同的Runnable对象,但由于方法块需要请求的是当前类的锁,而非当前实例,因此,线程间还是可以正确同步。

守护线程(Daemon)

和守护线程相对应得是用户线程,用户线程可以认为是系统的工作线程,他会完成这个程序应该要完成的业务操作。当用户线程结束了,那么守护线程要守护的对象就不存在了,那么守护线程也应该结束。

  1. public class DaemonDemo {
  2. public static class DaemonT extends Thread{
  3. @Override
  4. public void run() {
  5. while(true){
  6. System.out.println("I am alive");
  7. try {
  8. Thread.sleep(1000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. }
  15. public static void main(String[] args) throws InterruptedException {
  16. Thread t= new DaemonT();
  17. t.setDaemon(true);
  18. t.start();
  19. Thread.sleep(2000);
  20. }
  21. }

设置守护线程必须在线程start()之前设置

ReentrantLock重入锁

  1. public class ReenterLock implements Runnable {
  2. public static ReentrantLock lock = new ReentrantLock();
  3. public static int i = 0;
  4. @Override
  5. public void run() {
  6. for(int j = 0;j < 10000000;j++){
  7. lock.lock();
  8. try{
  9. i++;
  10. }finally {
  11. lock.unlock();
  12. }
  13. }
  14. }
  15. public static void main(String[] args) throws InterruptedException {
  16. ReenterLock l = new ReenterLock();
  17. Thread t1 = new Thread(l);
  18. Thread t2 = new Thread(l);
  19. t1.start();
  20. t2.start();
  21. t1.join();
  22. t2.join();
  23. System.out.println(i);
  24. }
  25. }

与synchronized相比,重入锁有着显示的操作过程。
重入锁对逻辑控制的灵活性要远远好于synchronized,但是值得注意的是,在退出临界区时,必须记得释放锁,否则其他线程就没有机会再访问临界区。
重入锁可以反复进入,这里的反复也只针对一个线程。

  1. public class ReenterLock implements Runnable {
  2. public static ReentrantLock lock = new ReentrantLock();
  3. public static int i = 0;
  4. @Override
  5. public void run() {
  6. for(int j = 0;j < 10000000;j++){
  7. lock.lock();
  8. lock.lock();
  9. try{
  10. i++;
  11. }finally {
  12. lock.unlock();
  13. lock.unlock();
  14. }
  15. }
  16. }
  17. public static void main(String[] args) throws InterruptedException {
  18. ReenterLock l = new ReenterLock();
  19. Thread t1 = new Thread(l);
  20. Thread t2 = new Thread(l);
  21. t1.start();
  22. t2.start();
  23. t1.join();
  24. t2.join();
  25. System.out.println(i);
  26. }
  27. }

中断响应

重入锁的lockInterruptibly()方法可以再等待锁的过程中,可以根据需要取消对锁的请求。

  1. public class IntLock implements Runnable {
  2. public static ReentrantLock lock1 = new ReentrantLock();
  3. public static ReentrantLock lock2 = new ReentrantLock();
  4. int lock;
  5. public IntLock(int lock) {
  6. this.lock = lock;
  7. }
  8. @Override
  9. public void run() {
  10. try{
  11. if (lock == 1) {
  12. lock1.lockInterruptibly();
  13. try {
  14. Thread.sleep(500);
  15. } catch (InterruptedException e) {}
  16. lock2.lockInterruptibly();
  17. }else{
  18. lock2.lockInterruptibly();
  19. try{
  20. Thread.sleep(500);
  21. }catch (InterruptedException e) {}
  22. lock1.lockInterruptibly();
  23. }
  24. }catch (InterruptedException e){
  25. e.printStackTrace();
  26. }finally {
  27. if(lock1.isHeldByCurrentThread()){
  28. lock1.unlock();
  29. }
  30. if(lock2.isHeldByCurrentThread()){
  31. lock2.unlock();
  32. }
  33. System.out.println(Thread.currentThread().getName()+":线程退出");
  34. }
  35. }
  36. public static void main(String[] args) throws InterruptedException {
  37. IntLock r1 = new IntLock(1);
  38. IntLock r2 = new IntLock(2);
  39. Thread t1 = new Thread(r1);
  40. Thread t2 = new Thread(r2);
  41. t1.start();
  42. t2.start();
  43. Thread.sleep(1000);
  44. t2.interrupt();
  45. }
  46. }

t1请求lock1,t2请求lock2,本来会形成死锁,但是t2中断了,t1就可以完成退出了。

锁申请等待限时

要避免死锁,除了外部通知形式的中断响应之外,还有一种方法就是限时等待。我们可以使用tryLock()方法进行一次限时的等待。

  1. public class TryLock implements Runnable {
  2. public static ReentrantLock lock1 = new ReentrantLock();
  3. public static ReentrantLock lock2 = new ReentrantLock();
  4. int lock;
  5. public TryLock(int lock){
  6. this.lock = lock;
  7. }
  8. @Override
  9. public void run() {
  10. if(lock == 1){
  11. while(true){
  12. System.out.println(Thread.currentThread().getName()+"尝试获得lock1");
  13. if(lock1.tryLock()){
  14. System.out.println(Thread.currentThread().getName()+"获得lock1");
  15. try{
  16. try{
  17. Thread.sleep(500);
  18. }catch (InterruptedException e){}
  19. System.out.println(Thread.currentThread().getName()+"尝试获得lock2");
  20. if(lock2.tryLock()){
  21. System.out.println(Thread.currentThread().getName()+"获得lock2");
  22. try{
  23. System.out.println(Thread.currentThread().getName()+"任务结束");
  24. return;
  25. }finally {
  26. lock2.unlock();
  27. }
  28. }
  29. } finally {
  30. lock1.unlock();
  31. }
  32. }
  33. }
  34. }else{
  35. while(true){
  36. System.out.println(Thread.currentThread().getName()+"尝试获得lock2");
  37. if(lock2.tryLock()){
  38. System.out.println(Thread.currentThread().getName()+"获得lock2");
  39. try{
  40. try {
  41. Thread.sleep(500);
  42. } catch (InterruptedException e) {}
  43. System.out.println(Thread.currentThread().getName()+"尝试获得lock1");
  44. if(lock1.tryLock()){
  45. System.out.println(Thread.currentThread().getName()+"获得lock1");
  46. try{
  47. System.out.println(Thread.currentThread().getName()+"任务结束");
  48. return;
  49. } finally{
  50. lock1.unlock();
  51. }
  52. }
  53. }finally {
  54. lock2.unlock();
  55. }
  56. }
  57. }
  58. }
  59. }
  60. public static void main(String[] args) {
  61. TryLock r1 = new TryLock(1);
  62. TryLock r2 = new TryLock(2);
  63. Thread t1 = new Thread(r1);
  64. Thread t2 = new Thread(r2);
  65. t1.start();
  66. t2.start();
  67. }
  68. }

t1会获取lock1,t2会获取lock2.t1会尝试获取lock2,如果失败了就放弃lock1进入下一个循环。此时可能t2正巧尝试获取lock1,正好t2结束了,此时死锁也解除了。

总结

  • lock():获得锁,如果锁已经被占用,则等待
  • lockInterruptibly():获得锁,但优先相应中断
  • tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回。
  • tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁
  • unlock():释放锁

Condition条件

wait()和notify()方法是和synchronized合作使用,而Condition是与重入锁相关联。通过Lock接口的new Condition方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

  1. public class ReenterLockCondition implements Runnable {
  2. public static ReentrantLock lock = new ReentrantLock();
  3. public static Condition condition = lock.newCondition();
  4. @Override
  5. public void run() {
  6. try{
  7. lock.lock();
  8. System.out.println(Thread.currentThread().getName()+"获得锁");
  9. condition.await();
  10. System.out.println(Thread.currentThread().getName()+"正在运行");
  11. }catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }finally {
  14. lock.unlock();
  15. System.out.println(Thread.currentThread().getName()+"解除锁");
  16. }
  17. }
  18. public static void main(String[] args) throws InterruptedException {
  19. ReenterLockCondition t1 = new ReenterLockCondition();
  20. ReenterLockCondition t2 = new ReenterLockCondition();
  21. Thread thread1 = new Thread(t1);
  22. Thread thread2 = new Thread(t2);
  23. thread1.start();
  24. thread2.start();
  25. Thread.sleep(2000);
  26. lock.lock();
  27. condition.signalAll();
  28. lock.unlock();
  29. }
  30. }

信号量(Semaphore)

信号量是对锁的拓展,无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。
acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。

线程阻塞工具类:LockSupport

LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend相比,它弥补了由于resume在前发生,导致线程无法继续执行的情况。和Object.wait相比,它不需要先获得某个对象得锁,也不会抛出InterruptedException异常。

  1. public class BadSuspend {
  2. public static Object u = new Object();
  3. public static class ChangeObjectThread extends Thread{
  4. public ChangeObjectThread(String name){
  5. super.setName(name);
  6. }
  7. @Override
  8. public void run() {
  9. synchronized (u){
  10. System.out.println(Thread.currentThread().getName()+" in");
  11. LockSupport.park();
  12. System.out.println(Thread.currentThread().getName()+" out");
  13. }
  14. }
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. ChangeObjectThread t1 = new ChangeObjectThread("t1");
  18. ChangeObjectThread t2 = new ChangeObjectThread("t2");
  19. t1.start();
  20. t2.start();
  21. LockSupport.unpark(t1);
  22. LockSupport.unpark(t2);
  23. t1.join();
  24. t2.join();
  25. }
  26. }

这里只是将原来得suspend和resume方法用park和unpark方法做了替换。当然,我们依然无法保证unpark方法发生在park方法之后。
由于LockSupport类使用类似信号量得机制,它为每一个线程准备了一个许可,如果许可可用,那么park函数会立即返回,并且消费这个许可,如果不可用,就会阻塞。
这个特点使得:即使unpark操作发生在park之前,它也可以使下一次得park操作立即返回。

LockSupport.park支持中断影响,但是不会抛出InterruptedException异常。它只会默默得返回。

  1. public class LockSupportIntDemo {
  2. public static Object u = new Object();
  3. public static class ChangeObjectThread extends Thread {
  4. public ChangeObjectThread(String name){
  5. super.setName(name);
  6. }
  7. @Override
  8. public void run() {
  9. synchronized (u){
  10. System.out.println("in "+getName());
  11. LockSupport.park();
  12. if(Thread.interrupted()){
  13. System.out.println(getName()+" 被中断了");
  14. }
  15. System.out.println(getName()+"do something");
  16. }
  17. System.out.println(getName()+"执行结束");
  18. }
  19. public static void main(String[] args) throws InterruptedException {
  20. ChangeObjectThread t1 = new ChangeObjectThread("t1");
  21. ChangeObjectThread t2 = new ChangeObjectThread("t2");
  22. t1.start();
  23. Thread.sleep(100);
  24. t2.start();
  25. t1.interrupt();
  26. LockSupport.unpark(t2);
  27. }
  28. }
  29. }

线程池

虽然与进程相比,线程是一种轻量级的工具,但其创建和关闭依然需要花费时间,如果为每一个小的任务都创建一个线程,很有可能出现创建和销毁线程所占用得时间大于该线程真实工作所消耗得时间。
为了避免系统频繁地创建和销毁线程,我们可以让创建得线程进行复用。

JDK Executor框架

image.png
Executor和ExecutorService抽象出线程池
Executors则扮演着线程池工厂得角色,获得ThreadPoolExecutor,ThreadPoolExecutor又继承了ExecutorService。

Executors

  1. // 返回一个固定线程数量的线程池
  2. public static ExecutorService newFixedThreadPool(int nThreads)
  3. // 返回一个只有一个线程的线程池
  4. public static ExecutorService newSingleThreadExecutor()
  5. // 返回一个可根据实际情况调整线程数量的线程池
  6. public static ExecutorService newCachedThreadPool()
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newSingleThreadExecutor() {
  7. return new FinalizableDelegatedExecutorService
  8. (new ThreadPoolExecutor(1, 1,
  9. 0L, TimeUnit.MILLISECONDS,
  10. new LinkedBlockingQueue<Runnable>()));
  11. }
  12. public static ExecutorService newCachedThreadPool() {
  13. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  14. 60L, TimeUnit.SECONDS,
  15. new SynchronousQueue<Runnable>());
  16. }

由以上线程池的实现代码可以看到,他们都只是ThreadPoolExecutor类的封装。

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. }

函数的参数含义如下:

  • corePoolSize:指定了线程池中线程数量
  • maximumPoolSize:指定了线程池中的最大线程数量
  • keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被提交但尚未被执行的任务
  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

newFixedThreadPool方法,它返回一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。LinkedBlockingQueue是无界的任务队列,newFixedThreadPool使用它来存放无法立即执行的任务,当任务提交的非常频繁时,该队列可能迅速膨胀,从而耗尽系统资源。

newSingleThreadExecutor方法就是返回一个容量为1的FixedThreadPool

newCachedThreadPool方法返回一个corePoolSize为0,maximumPoolSize无穷大的线程池,当线程池中的线程超过0时,多余的线程只存在60s.SynchronousQueue没有容量,每一次将要执行的任务插入到SynchronousQueue时,SynchronousQueue就会等待线程池来指派线程执行该任务。如果没有空闲线程,那么该线程池就会创建新的线程来执行该任务。所以如果有大量任务被提交,而任务的执行又不那么快,那么系统便会开启等量的线程处理,这样的做法可能会很快耗尽系统的资源。

拒绝策略

ThreadPoolExecutor的最后一个参数指定了拒绝策略,RejectedExecutionHandler类型

  1. public interface RejectedExecutionHandler {
  2. void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
  3. }

线程创建:ThreadFactory

ThreadPoolExecutor的最后第二个参数指定了线程创建,ThreadFactory类型

  1. public interface ThreadFactory {
  2. Thread newThread(Runnable r);
  3. }

拓展线程池

ThreadPoolExecutor是一个可以拓展的线程池。它提供了beforeExecute、afterExecute和terminated三个接口对线程池进行控制。

  1. public class RejectThreadPoolDemo {
  2. public static class MyTask implements Runnable{
  3. public String name;
  4. public MyTask(String name){
  5. this.name = name;
  6. }
  7. @Override
  8. public void run() {
  9. System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId()+",Task Name="+name);
  10. try {
  11. Thread.sleep(100);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. public static void main(String[] args) throws InterruptedException {
  18. ExecutorService es = new ThreadPoolExecutor(5, 5,
  19. 0L, TimeUnit.MILLISECONDS,
  20. new LinkedBlockingQueue<Runnable>()){
  21. @Override
  22. protected void beforeExecute(Thread t, Runnable r) {
  23. System.out.println("执行准备:"+((MyTask)r).name);
  24. super.beforeExecute(t, r);
  25. }
  26. @Override
  27. protected void afterExecute(Runnable r, Throwable t) {
  28. System.out.println("执行完成:"+((MyTask)r).name);
  29. super.afterExecute(r, t);
  30. }
  31. };
  32. for(int i = 0;i < 5;i++){
  33. MyTask task = new MyTask("Task-geym-" + i);
  34. es.execute(task);
  35. Thread.sleep(10);
  36. }
  37. es.shutdown();
  38. }
  39. }

submit和execute方法

execute接受Runnable接口,没有返回值,会抛出异常

  1. public class DivTask implements Runnable {
  2. int a,b;
  3. public DivTask(int a, int b){
  4. this.a = a;
  5. this.b = b;
  6. }
  7. @Override
  8. public void run() {
  9. double re = a/b;
  10. System.out.println(re);
  11. }
  12. public static void main(String[] args) {
  13. ExecutorService executorService = Executors.newCachedThreadPool();
  14. for(int i = 0;i < 5;i++){
  15. executorService.execute(new DivTask(100,i));
  16. }
  17. }
  18. }
  19. // output:
  20. Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
  21. at fun.DivTask.run(DivTask.java:20)
  22. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  23. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  24. at java.lang.Thread.run(Thread.java:748)
  25. 100.0
  26. 33.0
  27. 25.0
  28. 50.0

submit接受Runnable或者Callable接口,有返回值Future,get之后才能得到异常栈

  1. public class DivTask implements Runnable {
  2. int a,b;
  3. public DivTask(int a, int b){
  4. this.a = a;
  5. this.b = b;
  6. }
  7. @Override
  8. public void run() {
  9. double re = a/b;
  10. System.out.println(re);
  11. }
  12. public static void main(String[] args) {
  13. ExecutorService executorService = Executors.newCachedThreadPool();
  14. for(int i = 0;i < 5;i++){
  15. Future future = executorService.submit(new DivTask(100,i));
  16. try {
  17. future.get();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. }
  24. output:
  25. java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
  26. at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  27. at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  28. at fun.DivTask.main(DivTask.java:30)
  29. Caused by: java.lang.ArithmeticException: / by zero
  30. at fun.DivTask.run(DivTask.java:21)
  31. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  32. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  33. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  34. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  35. at java.lang.Thread.run(Thread.java:748)
  36. 100.0
  37. 50.0
  38. 33.0
  39. 25.0

并发集合

线程安全的HashMap

Collections.synchronizedMap

  1. public static Map m = Collections.synchronizedMap(new HashMap());

Collections.synchronizedMap()会生成一个名为SynchronizedMap的Map.

  1. private static class SynchronizedMap<K,V>
  2. implements Map<K,V>, Serializable {
  3. private static final long serialVersionUID = 1978198479659022715L;
  4. private final Map<K,V> m; // Backing Map
  5. final Object mutex; // Object on which to synchronize
  6. SynchronizedMap(Map<K,V> m) {
  7. this.m = Objects.requireNonNull(m);
  8. mutex = this;
  9. }
  10. public int size() {
  11. synchronized (mutex) {return m.size();}
  12. }

所有相关的Map操作都会使用这个mutex进行同步。从而实现线程安全。
这个包装的Map可以满足线程安全的要求,但是,它在多线程环境中性能表现并不算好。无论是对Map的读取或者写入,都需要获得mutex的锁,这会导致所有对Map的操作全部进入等待状态,直到mutex锁可用。

ConcurrentHashMap

对于HashMap来说,最重要的两个方法就是get()和put()。一种最自然的想法就是对整个HashMap加锁,必然可以得到一个线程安全的对象。但是这样做,我们就认为加锁粒度太大。对于ConcurrentHashMap,它内部进一步细分了若干个小的HashMap,称之为段(SEGEMENT)。
如果需要在ConcurrentHashMap中增加一个新的表项,并不是将整个HashMap加锁,而是首先根据hashcode得到该表项应该被存放到哪个段中,然后对该段加锁,并完成put()操作。在多线程环境中,如果多个线程同时进行put()操作,只要被加入的表项不存放在同一个段中,则线程间便可以做到真正的并行。

线程安全的List

ConcurrentLinkedQueue

这个队列使用链表作为其数据结构

CopyOnWriteArrayList

CopyOnWriteArrayList的读取完全不用加锁。

  1. public class CopyOnWriteArrayList<E>
  2. implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  3. private static final long serialVersionUID = 8673264195747942595L;
  4. /** The lock protecting all mutators */
  5. final transient ReentrantLock lock = new ReentrantLock();
  6. /** The array, accessed only via getArray/setArray. */
  7. private transient volatile Object[] array;
  8. /**
  9. * Gets the array. Non-private so as to also be accessible
  10. * from CopyOnWriteArraySet class.
  11. */
  12. final Object[] getArray() {
  13. return array;
  14. }
  15. public E get(int index) {
  16. return get(getArray(), index);
  17. }

CopyOnWriteArrayList的写入会进行一次自我复制。也就是说当这个List需要修改时,我并不修改原有的内容。而是对原有的数据进行一次赋值,将修改的内容写入副本中。

  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. Object[] elements = getArray();
  6. int len = elements.length;
  7. Object[] newElements = Arrays.copyOf(elements, len + 1);
  8. newElements[len] = e;
  9. setArray(newElements);
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

BlockingQueue

BlockingQueue适合做数据共享的通道,他会让服务线程在队列为空时,进行等待,当有新的消息进入队列后,自动将线程唤醒,从队列中拿到数据,继续处理。
ArrayBlockingQueue基于数组实现,而LinkedBlockingQueue基于链表。ArrayBlockingQueue更适合做有界队列,因为队列中可容纳的最大元素需要在队列创建时指定。而LinkedBlockingQueue适合做无界队列,或者哪些边界值非常大的队列。

  1. final Object[] items;
  2. final ReentrantLock lock;
  3. private final Condition notEmpty;
  4. private final Condition notFull;
  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await(); // 如果队列为空则让当前线程等待在notEmpty上
  7. return dequeue(); // 出队列
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. E x = (E) items[takeIndex];
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length)
  9. takeIndex = 0;
  10. count--;
  11. if (itrs != null)
  12. itrs.elementDequeued();
  13. notFull.signal(); // 出队列然后唤醒在notFull队列上等待的线程
  14. return x;
  15. }
  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length) // 如果队列满了则让当前线程等待在notFull上
  7. notFull.await();
  8. enqueue(e); // 入队列
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. items[putIndex] = x;
  6. if (++putIndex == items.length)
  7. putIndex = 0;
  8. count++;
  9. notEmpty.signal();// 入队列然后唤醒在notEmpty队列上等待的线程
  10. }

LinkedBlockingQueue

在LinkedBlockingQueue的实现中,take()函数和put函数分别实现了从队列中取得数据和往队列中增加数据的功能。虽然两个函数都对当前队列进行了修改操作,但由于LinkedBlockingQueue是基于链表的,因此,两个操作分别作用于队列的前端和尾端,从理论上说,两者并不冲突。
如果使用独占锁,则要求在两个操作进行时获取当前队列的独占锁,那么take()和put()操作就不可能真正的并发,在运行时,它们会彼此等待对方释放锁资源。在这种情况下,锁竞争会相对比较激烈,从而影响程序在高并发时的性能。
因此在LinkedBlockingQueue的实现中,使用两把不同的锁,分离了take()和put()操作:

  1. /** Lock held by take, poll, etc */
  2. private final ReentrantLock takeLock = new ReentrantLock();
  3. /** Wait queue for waiting takes */
  4. private final Condition notEmpty = takeLock.newCondition();
  5. /** Lock held by put, offer, etc */
  6. private final ReentrantLock putLock = new ReentrantLock();
  7. /** Wait queue for waiting puts */
  8. private final Condition notFull = putLock.newCondition();

take实现:

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly(); //不能有两个线程同时取数据
  7. try {
  8. while (count.get() == 0) { //如果没有可用的数据
  9. notEmpty.await(); //释放锁,等待put操作之后唤醒
  10. }
  11. x = dequeue();
  12. c = count.getAndDecrement();
  13. if (c > 1) //如果有两个以上的数据,通知其他等待的线程
  14. notEmpty.signal();
  15. } finally {
  16. takeLock.unlock();
  17. }
  18. if (c == capacity)
  19. signalNotFull(); // 通知可以put()操作
  20. return x;
  21. }

put实现:

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. int c = -1;
  4. Node<E> node = new Node<E>(e);
  5. final ReentrantLock putLock = this.putLock;
  6. final AtomicInteger count = this.count;
  7. putLock.lockInterruptibly(); //不能有两个线程同时进行put()
  8. try {
  9. while (count.get() == capacity) { //如果队列已经满了
  10. notFull.await(); //等待
  11. }
  12. enqueue(node);
  13. c = count.getAndIncrement();
  14. if (c + 1 < capacity) //有足够的空间 通知其他等待的线程
  15. notFull.signal();
  16. } finally {
  17. putLock.unlock(); //释放锁
  18. }
  19. if (c == 0)
  20. signalNotEmpty(); //通知take()操作取数据
  21. }

通过takeLock和putLock两把锁,LinkedBlockingQueue实现了取数据和写数据的分离,使两者在真正意义上成为可并发的操作。

ThreadLocal

除了控制资源的访问外,我们还可以通过增加资源来保证所有对象的线程安全。
ThreadLocal是一个线程的局部变量,也就是说,只有当前线程可以访问。既然是只有当前线程可以访问的数据,自然是线程安全的。

  1. static ThreadLocal<SimpleDateFormat> t1 = new ThreadLocal<>();
  2. static class ParseDate implements Runnable{
  3. int i = 0;
  4. public ParseDate(int i){
  5. this.i = i;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. if(t1.get() == null){
  11. t1.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
  12. }
  13. Date t = t1.get().parse("2015-03-29 19:29:"+i%60);
  14. System.out.println(i+":"+t);
  15. } catch (ParseException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. public static void main(String[] args){
  21. ExecutorService es = Executors.newFixedThreadPool(10);
  22. for(int i = 0;i < 1000;i++){
  23. es.execute(new ParseDate(i));
  24. }
  25. es.shutdown();
  26. }

如果当前线程不持有SimpleDateformat对象实例。那么就新建一个并把它设置到当前线程中,如果已经持有,则直接使用。

ThreadLocal的实现原理

set()方法:

  1. public void set(T value) {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. if (map != null)
  5. map.set(this, value);
  6. else
  7. createMap(t, value);
  8. }
  9. ThreadLocalMap getMap(Thread t) {
  10. return t.threadLocals;
  11. }

在set时,首先获得当前线程对象,然后通过getMap()拿到当前线程中的对象ThreadLocalMap,并将值设入ThreadLocalMap中。而ThreadLocalMap可以理解为一个Map,但是它是定义在Thread内部的成员。

get():

  1. public T get() {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. if (map != null) {
  5. ThreadLocalMap.Entry e = map.getEntry(this);
  6. if (e != null) {
  7. @SuppressWarnings("unchecked")
  8. T result = (T)e.value;
  9. return result;
  10. }
  11. }
  12. return setInitialValue();
  13. }

get方法也是先获取当前线程的ThreadLocalMap对象。然后,通过将ThreadLocal作为key取得内部的实际数据。

ThreadLocal就是一个工具壳,它通过set方法把value值放入调用线程的threadLocals里面并存放起来,当调用线程调用它的get方法时,再从当前线程的threadLocals变量里面将其拿出来使用。如果调用线程一直不终止,那么这个本地变量会一直存放在调用线程的threadLocals变量里面,所以当不需要使用本地变量时可以通过调用remove方法,从当前线程的threadLocals里面删除该本地变量。Thread里面的threadLocals被设计成map结构,因此每个线程可以关联多个ThreadLocal变量。

总结:每个线程内部都有一个名为threadLocals的成员变量,该变量的类型为HashMap,其中key为我们定义的ThreadLocal变量的this引用,value则为我们使用set方法设置的值。每个线程的本地变量存放在线程自己的内存变量threadLocals中,如果当前线程一直不消亡,那么这些本地变量会一直存在,所以可能会造成内存溢出。
image.png

无锁

比较交换(CAS)

与锁相比,使用比较交换会使程序看起来更加复杂一些。但由于其非阻塞性,它对死锁问题天生免疫,并且,线程间的相互影响也远远比基于锁的方式要小。更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销,因此,它要比基于锁的方式拥有更优越的性能。

CAS算法过程:它包含三个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。

简单来说,CAS需要你额外给出一个期望值,也就是你认为这个变量现在应该是什么样子的。如果变量不是你想象的那样,那说明它已经被别人修改过了。你就重新读取,再次尝试修改就好了。

AtomicInteger

AtomicInteger可以看做是一个整数。但是与Integer不同,它是可变的,并且是线程安全的。对其进行的修改等任何操作,都是CAS指令进行的。

  1. //实际值
  2. private volatile int value;
  3. // value字段在AtomicInteger对象中的偏移量
  4. private static final long valueOffset;
  1. public class AtomicIntegerDemo {
  2. static AtomicInteger i = new AtomicInteger();
  3. public static class AddThread implements Runnable {
  4. @Override
  5. public void run() {
  6. for(int k = 0;k < 10000;k++){
  7. i.incrementAndGet();
  8. }
  9. }
  10. }
  11. public static void main(String[] args) throws InterruptedException {
  12. Thread[] ts = new Thread[10];
  13. for(int k = 0;k < 10;k++){
  14. ts[k] = new Thread(new AddThread());
  15. }
  16. for(int k = 0;k < 10;k++){
  17. ts[k].start();
  18. }
  19. for(int k = 0;k < 10;k++){
  20. ts[k].join();
  21. }
  22. System.out.println(i);
  23. }
  24. }

AtomicInteger的incrementAndGet()方法会使用CAS操作将自己加一,同时也会返回当前值。

  1. public final int incrementAndGet() {
  2. return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  3. }
  4. public final int getAndAddInt(Object var1, long var2, int var4) {
  5. int var5;
  6. do {
  7. var5 = this.getIntVolatile(var1, var2);
  8. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  9. return var5;
  10. }
  11. // 此方法是Java的native方法,并不由Java语言实现。
  12. // 方法的作用是,读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较。
  13. // 相等就把x值赋值给offset位置的值。方法返回true。
  14. // 不相等,就取消赋值,方法返回false。
  15. public final native boolean compareAndSwapInt(Object o, long offset,
  16. int expected,
  17. int x);

getAndAddInt()方法在一个循环中,直到compareAndSwapInt成功后才结束。

Unsafe类

Unsafe类封装了一些不安全的操作,也就是一些类似指针的操作。指针是不安全的,这也是Java中把指针去除的重要原因。如果指针指错了位置,或者计算指针偏移量出错,结果可能是灾难性的。

CAS的三大问题

ABA问题

因为CAS需要在操作值得时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现的值没有发生变化,但是实际上却变化了。JDK的Atomic包提供了一个类AtomicStampedReference来解决ABA问题

循环时间长开销大

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作,我们可以使用循环CAS的方式来保证原子操作,对多个共享变量操作,循环CAS就无法保证操作的原子性。

Future模式

Future模式是多线程开发中非常常见的一种模式,它的核心思想是异步调用。当我们需要调用一个函数方法时,如果这个函数执行很慢,那么我们就要进行等待。有时候,我们可能并不急着要结果。因此,我们可以让被调者立即返回,让它再后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
对于Future模式来说,虽然它无法立即给出你需要的数据,但是,它会返回给你一个契约,将来,你可以凭借这个契约去重新获取你需要的信息。

Future模式的主要参与者

image.png
image.png
当Main调用request时,会立马得到FutureData。当使用FutureData的getResult方法时,如果实际的数据没有准备好,那么程序就会阻塞,等待RealData准备好并注入到FutureData中,才拿到最终数据。

JDK中的future模式

image.png
RunnableFuture继承了Future和Runnable两个接口,其中run()方法用于构造真实的数据。它有一个具体的实现FutureTask类。FutureTask有一个内部类Sync,一些实质性的工作,会委托Sync类实现。而Sync类最终会调用Callable接口,完成实际数据的组装工作。

  1. public class RealData implements Callable<String> {
  2. private String para;
  3. public RealData(String para){
  4. this.para = para;
  5. }
  6. @Override
  7. public String call() throws Exception {
  8. StringBuffer sb = new StringBuffer();
  9. for(int i = 0;i < 10;i++){
  10. sb.append(para);
  11. Thread.sleep(100);
  12. }
  13. return sb.toString();
  14. }
  15. }
  1. public class FutureMain {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<String> future = new FutureTask<>(new RealData("a"));
  4. ExecutorService executor = Executors.newFixedThreadPool(1);
  5. executor.submit(future);
  6. System.out.println("请求完毕");
  7. System.out.println("数据 =" + future.get());
  8. }
  9. }

把实现了Callable接口的对象传给FutureTask实例,线程池创建线程执行FutureTask的run方法,FutureTask的run方法去执行实现了Callable接口的对象,此时如果主线程调用了FutureTask的get方法,但是Callable接口还没有结束,还没拿到最终数据,此时主线程就会阻塞,直到Callable方法结束。

单例模式

单例模式是设计模式中使用最为普遍的模式之一。它是一种对象创建模式,用于产生一个对象的具体实例,它可以确保系统中一个类只产生一个实例。

  1. public class Singleton {
  2. private Singleton(){
  3. System.out.println("Singleton is create");
  4. }
  5. private static Singleton instance = new Singleton();
  6. public static Singleton getInstance(){
  7. return instance;
  8. }
  9. }

把构造函数设置为private,就保证系统中不会有人意外创建多余的实例。
instance对象必须是private,因为可能一个小小的意外会使得instance变成null。
这个单例的性能非常好,因为getInstance()方法只是简单地返回instance,并没有任何锁操作,因此它在并行程序中,会有良好的表现。
但是Singleton实例在什么时候创建是不受控制的。对于静态成员instance,它会在类第一次初始化的时候被创建。这个时刻并不一定是getInstance方法第一次被调用的时候。
比如,如果你的单例中还有静态成员:

  1. public class Singleton {
  2. public static int STATUS = 1;
  3. private Singleton(){
  4. System.out.println("Singleton is create");
  5. }
  6. private static Singleton instance = new Singleton();
  7. public static Singleton getInstance(){
  8. return instance;
  9. }
  10. }

在任何地方引用这个静态变量都会导致instance实例被创建。
如果需要一种精确控制instance的创建时间的实现方式:

  1. public class LazySingleton {
  2. private LazySingleton(){
  3. System.out.println("LazySingleton is create");
  4. }
  5. private static LazySingleton instance = null;
  6. public static synchronized LazySingleton getInstance(){
  7. if(instance == null){
  8. instance = new LazySingleton();
  9. }
  10. return instance;
  11. }
  12. }

当getInstance方法被第一次调用时,创建单例对象。为了防止对象被多次创建,我们不得不使用synchronized进行方法同步。
这种实现好处是,充分利用了延迟加载,只在真正需要时创建对象。但坏处也很明显,并发环境下加锁,会对性能产生一定的影响。

  1. public class StaticSingleton {
  2. private StaticSingleton(){
  3. System.out.println("StaticSingleton is create");
  4. }
  5. private static class SingletonHolder{
  6. private static StaticSingleton instance = new StaticSingleton();
  7. }
  8. public static StaticSingleton getInstance(){
  9. return SingletonHolder.instance;
  10. }
  11. }

上述代码实现同时拥有前两种方式的优点。首先getInstance方法中没有锁,这使得在高并发环境下性能优越。其次,只有在getInstance方法被第一次调用时,StaticSingleton的实例才会被创建。

这种方法巧妙地使用了内部类和类的初始化方式,内部类SingletonHolder被声明为private,这使得我们不可能在外部访问并初始化它。我们只可能在getInstance内部对SingletonHolder类进行初始化。

生产者-消费者模式

生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。
生产者-消费者模式的核心组件是共享内存缓存区,它作为生产者和消费者之间的通信桥梁,避免了生产者和消费者的直接通信,从而将生产者和消费者进行解耦。上缠着不需要知道消费者的存在,消费者也不需要知道生产者的存在。
同时,由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在局部时间内高于生产者,都可以通过共享内存缓冲区得到缓解,确保系统正常运行。
image.png