3.1 线程同步机制简介

线程同步机制是一套用于协调线程之间的数据访问的机制,该机制可以保障线程安全。
Java平台提供的线程同步机制包括:锁、volatile、final、static以及相关的API,如Object.wait()、Object.notify()等。

3.2 锁概述

将多个线程对共享数据的并发访问转换为串行访问,即一个共享数据一次只能被一个线程访问。锁可以理解为对共享数据进行保护的一个许可证,任何线程想要访问共享数据必须持有许可证,此许可证只能被一个线程持有,并且在访问结束后,线程必须释放持有的许可证。
锁的持有线程在获得锁之后和释放锁之前这段时间执行的代码称为临界区(Critical Section)。一个锁一次只能被一个线程持有的锁称为排它锁(Exclusive)或者互斥锁(Mutex)。JVM把锁分为内部锁和显示锁两种。

  1. 内部锁通过synchronized关键字实现;
  2. 显示锁通过java.concurent.locks.Lock接口的实现类实现。

    3.2.1 锁的作用

    锁可以实现对共享数据的安全访问,保障线程的原子性、可见性与有序性。

  3. 锁是通过互斥保障原子性,一个锁只能被一个线程持有,这就保证临界区的代码一次只能被一个线程执行。

  4. 锁通过写线程处理器的缓存和读线程刷新处理器缓存这两个动作保障可见性。在Java平台中,锁的获得隐含着刷新处理器缓存动作,锁的释放隐含着冲刷处理器缓存的动作。
  5. 锁能够保障有序性,写线程在临界区所执行的在读线程所执行的临界区看起来像是完全按照源码顺序执行的。

    使用锁保障线程的安全性,必须满足以下条件:

    1. 这些线程在访问共享数据时必须使用同一个锁;
    2. 即使是读共享数据的线程也需要使用同步锁,避免脏读,例如:在写线程未完成任务时,读线程读取了这些不完整信息。

3.2.2 锁相关的概念

  1. 可重入性(Reentrancy):一个线程持有该锁时能再次(多次)申请该锁。
  2. 锁的争用与调度:Java平台中内部锁属于非公平锁,显示Lock锁既支持公平锁,又支持非公平锁。
  3. 锁的粒度:一个锁可以保护的共享数据的数量大小。保护共享数据量大,称该锁的粒度粗,反之为粒度细。过粗会导致不必要的等待,过细会增加开销。

    3.3 内部锁:synchronized关键字

    Java中的每个对象都有一个与之关联的内部锁(Intrinsic lock)。这种锁也称为监视器(Monitor),这种内部锁是一种排他锁,可以保障原子性、可见性与有序性。
    内部锁通过synchronized关键字实现,它可以修饰代码块、方法。修饰方法就称为同步实例方法,修饰静态方法称为同步静态方法。
    同步过程中,线程出现异常,线程会释放锁。

    3.3.1 修饰代码块

    1. public class Test01 {
    2. public static void main(String[] args) {
    3. Test01 obj = new Test01();
    4. // 当前的锁对象this就是obj对象
    5. new Thread(obj::doSomething).start();
    6. // 当前的锁对象this也是obj对象
    7. new Thread(obj::doSomething).start();
    8. }
    9. public void doSomething() {
    10. // 使用this当前对象作为锁对象
    11. synchronized (this) {
    12. int num = 100;
    13. for (int i = 1; i <= num; i++) {
    14. System.out.println(Thread.currentThread().getName() + " ->" + i);
    15. }
    16. }
    17. }
    18. }

    经常使用 this 作为锁对象,也可以定义一个 final 常量作为锁对象。

    3.3.2 修饰实例方法

    1. public class Test02 {
    2. public static void main(String[] args) {
    3. Test02 obj = new Test02();
    4. // 当前的锁对象this就是obj对象
    5. new Thread(obj::doSomething).start();
    6. new Thread(obj::doSomething).start();
    7. }
    8. /**
    9. * 使用synchronized修饰实例方法同步实例方法,默认this作为锁对象
    10. */
    11. public synchronized void doSomething() {
    12. int num = 100;
    13. for (int i = 1; i <= num; i++) {
    14. System.out.println(Thread.currentThread().getName() + " ->" + i);
    15. }
    16. }
    17. }

    同步方法锁的粒度比同步代码块粗,执行效率较低。

    3.3.3 修饰静态方法

    1. public class Test03 {
    2. public static void main(String[] args) {
    3. // 当前的锁对象是Test03.class
    4. new Thread(Test03::doSomething).start();
    5. new Thread(Test03::doSomething).start();
    6. }
    7. /**
    8. * 使用synchronized修饰静态方法,默认当前类的运行时类对象为锁对象,可以理解为以下代码:
    9. * public static void doSomething(){
    10. * synchronized(Test03.class){
    11. * ......
    12. * }
    13. * }
    14. */
    15. public synchronized static void doSomething() {
    16. int num = 100;
    17. for (int i = 1; i <= num; i++) {
    18. System.out.println(Thread.currentThread().getName() + " ->" + i);
    19. }
    20. }
    21. }

    可以称这种锁为类锁。

    3.3.4 死锁

    ```java /**

    • 死锁:在多线程中,同步时可能需要使用多个锁,如果获得锁的顺序不一致,可能导致死锁 */ public class Test04 {

      public static void main(String[] args) { SubThread t1 = new SubThread(); t1.setName(SubThread.thread1); t1.start();

      SubThread t2 = new SubThread(); t2.setName(SubThread.thread2); t2.start();

      }

      static class SubThread extends Thread{ private static final Object lock1 = new Object(); private static final Object lock2 = new Object();

      private static String thread1 = “a”; private static String thread2 = “b”;

      @Override public void run() {

      1. if (thread1.equals(Thread.currentThread().getName())){
      2. synchronized (lock1) {
      3. System.out.println("a线程获得了lock1锁,还需要获得lock2锁");
      4. synchronized (lock2){
      5. System.out.println("a线程获得lock1后又获得了lock2,可以想干任何想干的事");
      6. }
      7. }
      8. }
      9. if (thread2.equals(Thread.currentThread().getName())){
      10. synchronized (lock2) {
      11. System.out.println("b线程获得了lock2锁,还需要获得lock1锁");
      12. synchronized (lock1){
      13. System.out.println("b线程获得lock2后又获得了lock1,可以想干任何想干的事");
      14. }
      15. }
      16. }

      } } }

  1. <a name="7nnNQ"></a>
  2. ## 3.4 轻量级同步机制:volatile关键字
  3. <a name="D1iDt"></a>
  4. ### 3.4.1 volatile的作用
  5. 使变量在多个线程之间可见。
  6. ```java
  7. public class Test01 {
  8. public static void main(String[] args) {
  9. PrintString printString = new PrintString();
  10. // 在子线程中打印
  11. new Thread(printString::doSomething).start();
  12. try {
  13. Thread.sleep(1000);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("在main线程中修改打印标志");
  18. printString.setContinuePrint(false);
  19. /*
  20. 现象:子线程依然未结束(未添加volatile)
  21. 原因:main线程修改了printString对象的打印标志后,子线程读不到
  22. 解决办法:使用volatile关键字修饰printString对象的打印标志,
  23. volatile的作用可以强制线程从公共内存读取变量的值,而不是从工作内存中读取。
  24. */
  25. }
  26. static class PrintString{
  27. private volatile boolean continuePrint = true;
  28. public PrintString setContinuePrint(boolean continuePrint) {
  29. this.continuePrint = continuePrint;
  30. return this;
  31. }
  32. public void doSomething(){
  33. System.out.println("开始……");
  34. while (continuePrint){
  35. }
  36. System.out.println("结束……");
  37. }
  38. }
  39. }

volatile与synchronized比较

  1. volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized要好;
  2. volatile只能修饰变量,而synchronized可以修饰方法、代码块;
  3. 多线程访问volatile变量不会发生阻塞,而synchronized可能会阻塞;
  4. volatile保证数据的可见性,不能保证原子性,而synchronized可以保证原子性,也可以保证可见性;
  5. volatile解决的是变量在多个线程之间的可见性,synchronized解决的是多个线程之间访问公共资源的同步性。

    3.4.2 volatile非原子特性

    volatile增加了实例变量在多个线程的可见性,但不具备原子性。

    1. public class Test02 {
    2. public static void main(String[] args) {
    3. int threadNum = 100;
    4. for (int i = 0; i < threadNum; i++){
    5. // count != threadNum * sum;
    6. new MyThread().start();
    7. }
    8. }
    9. static class MyThread extends Thread {
    10. // volatile关键字仅仅表示所有线程从主内存中读取count变量的值
    11. volatile public static int count;
    12. // 这段代码不是线程安全的,需要添加synchronized关键字,它保证原子性的同时,还保证了可见性,也就不需要volatile
    13. public static void addCount(){
    14. int sum = 1000;
    15. for (int i = 0; i < sum; i++){
    16. // count++不是原子操作
    17. count++;
    18. }
    19. System.out.println(Thread.currentThread().getName() + " count = " + count);
    20. }
    21. @Override
    22. public void run() {
    23. addCount();
    24. }
    25. }
    26. }

    3.4.3 利用原子类进行自增自减

    由于 i++ 不是原子操作,除了使用synchronized进行同步外,也可以使用 AtomicInteger/AtomicLong 原子类进行实现。

    1. public class Test03 {
    2. public static void main(String[] args) throws InterruptedException {
    3. int threadNum = 100;
    4. for (int i = 0; i < threadNum; i++){
    5. new MyThread().start();
    6. }
    7. Thread.sleep(1000);
    8. System.out.println(MyThread.count.get());
    9. }
    10. static class MyThread extends Thread {
    11. private static AtomicInteger count = new AtomicInteger();
    12. public static void addCount(){
    13. int sum = 1000;
    14. for (int i = 0; i < sum; i++){
    15. count.getAndIncrement();
    16. }
    17. System.out.println(Thread.currentThread().getName() + " count = " + count);
    18. }
    19. @Override
    20. public void run() {
    21. addCount();
    22. }
    23. }
    24. }

    3.5 CAS

    CAS(Compare and Swap)是由硬件实现的。CAS将read-modify-write这类操作转换为原子操作。 i++ 自增包括三个子操作:

  6. 从主内存读取i变量值;

  7. i的值加1
  8. 再把加1之后的值保存到主内存。

CAS原理:在把数据更新到主内存时,再次读取主内存变量的值,如现在变量的值与期望值(操作起始时读取的值)一样就更新。

  1. public class CASTest {
  2. public static void main(String[] args) {
  3. CASCounter casCounter = new CASCounter();
  4. long sum = 10000;
  5. for (int i = 0; i < sum; i++){
  6. new Thread(()->System.out.println(casCounter.incrementAndGet())).start();
  7. }
  8. }
  9. }
  10. class CASCounter {
  11. /**
  12. * 使用volatile修饰value值,使线程可见
  13. */
  14. volatile private long value;
  15. public long getValue() {
  16. return value;
  17. }
  18. /**
  19. * 定义compare and swap方法
  20. */
  21. private boolean compareAndSwap(long expectedValue, long newValue) {
  22. // 如果当前value的值与期望的expectedValue值一样,就把当前的value字段替换为newValue值
  23. synchronized (this) {
  24. if (value == expectedValue) {
  25. value = newValue;
  26. return true;
  27. } else {
  28. return false;
  29. }
  30. }
  31. }
  32. /**
  33. * 定义自增方法
  34. */
  35. public long incrementAndGet() {
  36. long oldValue;
  37. long newValue;
  38. do {
  39. oldValue = value;
  40. newValue = oldValue + 1;
  41. } while (!compareAndSwap(oldValue, newValue));
  42. return newValue;
  43. }
  44. }

CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。
实际上这种假设不一定总是成立,可能存在 ABA 问题。

有共享变量count=0 A线程对count值修改为10 B线程对count值修改为20 C线程对count值修改为0 当前线程看到count变量的值,现在是0,现在是否认为count变量的值没有被其他线程更新呢?

如果要规避ABA问题,可以为共享变量引入一个修订号(时间戳),每次修改共享变量时,相应的修订号就会增加1,ABA变量更新过程:[A, 0] -> [B, 1] -> [A, 2],通过修改号可以准确判断变量是否被其他线程修改,AtomicStampedReference类就是基于这种思想产生的。

3.6 原子变量类

原子变量类基于CAS实现的,当前共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性,对变量的read-modify-write更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作。由于volatile能保证可见性,原子变量类内部就是借助一个volatile变量保证可见性的同时,借助CAS保证了原子性,有时把原子变量类看作是增强的volatile变量,原子变量类有12个。

分组 原子变量类
基础数据型 AtomicInteger、AtomicLong、AtomicBoolean
数组型 AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
字段更新器 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
引用型 AtomicReference、AtomicStampedReference、AtomicMarkableReference

3.6.1 AtomicLong

  1. /**
  2. * 使用原子变量类定义一个计数器
  3. * 该计数器在整个程序中都能使用,并且所有的地方都使用这一个计算器,这个计算器可以设计为单例
  4. * @author 王游
  5. * @date 2021/3/2 19:55
  6. */
  7. public class Indicator {
  8. /**
  9. * 构造方法私有化
  10. */
  11. private Indicator(){}
  12. /**
  13. * 定义一个私有本类静态对象
  14. */
  15. private static final Indicator INSTANCE = new Indicator();
  16. /**
  17. * 提供一个公共静态方法返回该类唯一实例
  18. */
  19. public static Indicator getInstance(){
  20. return INSTANCE;
  21. }
  22. /**
  23. * 使用原子变量类保存请求总数、成功数、失败娄
  24. */
  25. private final AtomicLong requestCount = new AtomicLong(0);
  26. private final AtomicLong successCount = new AtomicLong(0);
  27. private final AtomicLong failureCount = new AtomicLong(0);
  28. /**
  29. * 有新的请求
  30. */
  31. public void newRequestReceive(){
  32. requestCount.incrementAndGet();
  33. }
  34. /**
  35. * 处理成功
  36. */
  37. public void requestProcessSuccess(){
  38. successCount.incrementAndGet();
  39. }
  40. /**
  41. * 处理失败
  42. */
  43. public void requestProcessFailure(){
  44. failureCount.incrementAndGet();
  45. }
  46. /**
  47. * 查看请求量
  48. */
  49. public long getRequestCount(){
  50. return requestCount.get();
  51. }
  52. /**
  53. * 查看成功量
  54. */
  55. public long getSuccessCount(){
  56. return successCount.get();
  57. }
  58. /**
  59. * 查看失败量
  60. */
  61. public long getFailureCount(){
  62. return failureCount.get();
  63. }
  64. }
  65. /**
  66. * 模拟服务器的请求总数,处理成功/失败的数量
  67. */
  68. public class Test {
  69. public static void main(String[] args) {
  70. int threadNum = 10000;
  71. for (int i = 0; i < threadNum; i++) {
  72. new Thread(() -> {
  73. Indicator.getInstance().newRequestReceive();
  74. int num = new Random().nextInt();
  75. if (num % 2 == 0) {
  76. Indicator.getInstance().requestProcessSuccess();
  77. } else {
  78. Indicator.getInstance().requestProcessFailure();
  79. }
  80. }).start();
  81. }
  82. try {
  83. Thread.sleep(1000);
  84. } catch (InterruptedException e) {
  85. e.printStackTrace();
  86. }
  87. System.out.println("总数:" + Indicator.getInstance().getRequestCount());
  88. System.out.println("成功:" + Indicator.getInstance().getSuccessCount());
  89. System.out.println("失败:" + Indicator.getInstance().getFailureCount());
  90. }
  91. }

3.6.2 AtomicIntegerArray

  1. /**
  2. * AtomicIntegerArray的基本操作
  3. */
  4. public class Test {
  5. public static void main(String[] args) {
  6. // 1) 创建一个指定长度的原子数组:[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  7. AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
  8. System.out.println("1:" + atomicIntegerArray);
  9. // 2) 返回指定位置的元素:0
  10. System.out.println("2:" + atomicIntegerArray.get(0));
  11. // 3) 设置指定位置的元素
  12. atomicIntegerArray.set(0, 10);
  13. // 4) 设置新值时,返回数组元素的旧值:10
  14. System.out.println("4:" + atomicIntegerArray.getAndSet(0, 20));
  15. // 5) 修改数组元素的值,把数组元素加上某个值:42、42
  16. System.out.println("5:" + atomicIntegerArray.addAndGet(0, 22));
  17. System.out.println("5:" + atomicIntegerArray.getAndAdd(0, 33));
  18. // 6) CAS操作,如果数组中索引值为0的元素的值是32,就修改为222:true
  19. System.out.println("6:" + atomicIntegerArray.compareAndSet(0, 75, 222));
  20. // 6) 自增自减:1、1、1
  21. System.out.println("6:" + atomicIntegerArray.incrementAndGet(1));
  22. System.out.println("6:" + atomicIntegerArray.getAndIncrement(1));
  23. System.out.println("6:" + atomicIntegerArray.decrementAndGet(1));
  24. }
  25. }

3.6.3 AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater可以对原子整数字段进行更新,要求:

  1. 字符必须使用volatile修饰,使线程之间可见;
  2. 只有是实例变量,不能是静态变量,也不能是使用final修饰的字段。 ```java public class User { int id; /**

    • 使用AtomicIntegerFieldUpdater更新的字段必须使用volatile修饰 */ volatile int age;

      public User(int id, int age) { this.id = id; this.age = age; }

      @Override public String toString() { return “User{“ +

      1. "id=" + id +
      2. ", age=" + age +
      3. '}';

      } }

public class SubThread extends Thread{ private User user; /**

  1. * 创建age字段更新器
  2. */
  3. private AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
  4. public SubThread(User user){
  5. this.user = user;
  6. }
  7. @Override
  8. public void run() {
  9. int age = 10;
  10. for (int i = 0; i < age; i++){
  11. System.out.println(updater.getAndIncrement(user));
  12. }
  13. }

}

public class Test { public static void main(String[] args) { User user = new User(1234, 10);

  1. int threadNum = 10;
  2. for (int i = 0; i < threadNum; i++){
  3. new SubThread(user).start();
  4. }
  5. try {
  6. Thread.sleep(1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println(user);
  11. }

}

  1. <a name="EjVFK"></a>
  2. ### 3.6.4 AtomicReference
  3. ```java
  4. public class Test {
  5. /**
  6. * static AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("abc", 0);
  7. * atomicStampedReference.compareAndSet("abc", "def", oldStamp(取值时的版本号), newStamp);
  8. */
  9. static AtomicReference<String> atomicReference = new AtomicReference<>("abc");
  10. public static void main(String[] args) {
  11. int threadNum = 100;
  12. for(int i = 0; i < threadNum; i++){
  13. new Thread(()->{
  14. if (atomicReference.compareAndSet("abc","def")){
  15. System.out.println(Thread.currentThread().getName() + "把字符串abc更改为def");
  16. }
  17. }).start();
  18. }
  19. for (int i = 0; i < threadNum; i++){
  20. new Thread(()->{
  21. if (atomicReference.compareAndSet("def","abc")){
  22. System.out.println(Thread.currentThread().getName() + "把字符串def更改为abc");
  23. }
  24. }).start();
  25. }
  26. }
  27. }

AtomicReference可能导致ABA问题。AtomicStampedReference能通过时间戳解决ABA问题,AtomicMarkableReference能通过标志位解决ABA问题。