ReentrantLock

  1. // ReentrantLock 的基本使用
  2. public class T02_ReentrantLock2 {
  3. Lock lock = new ReentrantLock();
  4. void m1() {
  5. try {
  6. lock.lock(); //synchronized(this)
  7. for (int i = 0; i < 10; i++) {
  8. TimeUnit.SECONDS.sleep(1);
  9. System.out.println(i);
  10. }
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
  17. void m2() {
  18. try {
  19. lock.lock();
  20. System.out.println("m2 ...");
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. public static void main(String[] args) {
  26. T02_ReentrantLock2 rl = new T02_ReentrantLock2();
  27. new Thread(rl::m1).start();
  28. try {
  29. TimeUnit.SECONDS.sleep(1);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. new Thread(rl::m2).start();
  34. }
  35. }
  1. // tryLock 的使用
  2. public class T03_ReentrantLock3 {
  3. Lock lock = new ReentrantLock();
  4. void m1() {
  5. try {
  6. lock.lock();
  7. for (int i = 0; i < 3; i++) {
  8. TimeUnit.SECONDS.sleep(1);
  9. System.out.println(i);
  10. }
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
  17. /**
  18. * 使用tryLock进行尝试锁定,不管锁定与否都立即返回,可以根据tryLock的返回值来判定是否锁定
  19. * 也可以指定tryLock的时间,但是这个重载版本会抛出异常,所以必须在finally中 unlock
  20. */
  21. void m2() {
  22. boolean locked = lock.tryLock();
  23. System.out.println("m2 ..." + locked);
  24. if (locked) lock.unlock();
  25. // boolean locked = false;
  26. // try {
  27. // locked = lock.tryLock(5, TimeUnit.SECONDS);
  28. // System.out.println("m2 ..." + locked);
  29. // } catch (InterruptedException e) {
  30. // e.printStackTrace();
  31. // } finally {
  32. // if (locked)
  33. // lock.unlock();
  34. // }
  35. }
  36. public static void main(String[] args) {
  37. T03_ReentrantLock3 rl = new T03_ReentrantLock3();
  38. new Thread(rl::m1).start();
  39. try {
  40. TimeUnit.SECONDS.sleep(1);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. new Thread(rl::m2).start();
  45. }
  46. }
  1. // lockInterruptibly() 可以响应中断,这是 synchronized 做不到的
  2. public class T04_ReentrantLock4 {
  3. public static void main(String[] args) {
  4. Lock lock = new ReentrantLock();
  5. Thread t1 = new Thread(() -> {
  6. try {
  7. lock.lock();
  8. System.out.println("t1 start");
  9. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  10. System.out.println("t1 end");
  11. } catch (InterruptedException e) {
  12. System.out.println("t1 interrupted!");
  13. } finally {
  14. lock.unlock();
  15. }
  16. });
  17. t1.start();
  18. Thread t2 = new Thread(() -> {
  19. try {
  20. // lock.lock();
  21. lock.lockInterruptibly(); // 可以对interrupt()方法做出响应
  22. System.out.println("t2 start");
  23. TimeUnit.SECONDS.sleep(5);
  24. System.out.println("t2 end");
  25. } catch (InterruptedException e) {
  26. System.out.println("t2 interrupted!");
  27. } finally {
  28. lock.unlock();
  29. }
  30. });
  31. t2.start();
  32. try {
  33. TimeUnit.SECONDS.sleep(1);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. t1.interrupt();
  38. // t2.interrupt(); // 打断线程2的等待
  39. }
  40. }
  1. // 设置 ReentrantLock 为公平锁之后,线程依次获取临界资源。公平锁是先来先服务原则。
  2. public class T05_ReentrantLock5 extends Thread {
  3. private static ReentrantLock lock = new ReentrantLock(false);
  4. public void run() {
  5. for (int i = 0; i < 100; i++) {
  6. lock.lock();
  7. try {
  8. System.out.println(Thread.currentThread().getName() + "获得锁");
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  13. }
  14. public static void main(String[] args) {
  15. T05_ReentrantLock5 rl = new T05_ReentrantLock5();
  16. Thread th1 = new Thread(rl);
  17. Thread th2 = new Thread(rl);
  18. th1.start();
  19. th2.start();
  20. }
  21. }

ReadWriteLock

  1. public class T10_TestReadWriteLock {
  2. static Lock lock = new ReentrantLock();
  3. private static int value;
  4. static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  5. static Lock readLock = readWriteLock.readLock();
  6. static Lock writeLock = readWriteLock.writeLock();
  7. public static void read(Lock lock) {
  8. try {
  9. lock.lock();
  10. Thread.sleep(1000);
  11. System.out.println("read over!");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } finally {
  15. lock.unlock();
  16. }
  17. }
  18. public static void write(Lock lock, int v) {
  19. try {
  20. lock.lock();
  21. Thread.sleep(1000);
  22. value = v;
  23. System.out.println("write over!");
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. public static void main(String[] args) {
  31. // Runnable readR = ()-> read(lock);
  32. Runnable readR = () -> read(readLock);
  33. // Runnable writeR = ()->write(lock, new Random().nextInt());
  34. Runnable writeR = () -> write(writeLock, new Random().nextInt());
  35. for (int i = 0; i < 18; i++) new Thread(readR).start();
  36. for (int i = 0; i < 2; i++) new Thread(writeR).start();
  37. }
  38. }

Semaphore

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。示例代码如下:

  1. public class SemaphoreExample1 {
  2. // 请求的数量
  3. private static final int threadCount = 550;
  4. public static void main(String[] args) throws InterruptedException {
  5. ExecutorService threadPool = Executors.newFixedThreadPool(300);
  6. // 一次只能允许执行的线程数量。
  7. final Semaphore semaphore = new Semaphore(20);
  8. // 公平锁的输出是按 20 有序的。
  9. // final Semaphore semaphore = new Semaphore(20, true);
  10. for (int i = 0; i < threadCount; i++) {
  11. final int threadnum = i;
  12. threadPool.execute(() -> {
  13. try {
  14. semaphore.acquire(); // 获取一个许可,所以可运行线程数量为20/1=20
  15. test(threadnum);
  16. semaphore.release(); // 释放一个许可
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. });
  21. }
  22. threadPool.shutdown();
  23. System.out.println("finish");
  24. }
  25. public static void test(int threadnum) throws InterruptedException {
  26. Thread.sleep(1000); // 模拟请求的耗时操作
  27. System.out.println("threadnum:" + threadnum);
  28. Thread.sleep(1000) ; // 模拟请求的耗时操作
  29. }
  30. }

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) {
  5. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  6. }

CountDownLatch

CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。

CountDownLatch 的三种典型用法

  • 某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  • 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
  • 死锁检测:一个非常方便的使用场景是,你可以使用 n 个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

CountDownLatch 的使用示例

  1. public class CountDownLatchExample1 {
  2. // 请求的数量
  3. private static final int threadCount = 550;
  4. public static void main(String[] args) throws InterruptedException {
  5. // 创建一个具有固定线程数量的线程池对象
  6. ExecutorService threadPool = Executors.newFixedThreadPool(300);
  7. final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  8. for (int i = 0; i < threadCount; i++) {
  9. final int threadnum = i;
  10. threadPool.execute(() -> {
  11. try {
  12. test(threadnum);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } finally {
  16. countDownLatch.countDown(); // 表示一个请求已经被完成
  17. }
  18. });
  19. }
  20. countDownLatch.await();
  21. threadPool.shutdown();
  22. System.out.println("finish");
  23. }
  24. public static void test(int threadnum) throws InterruptedException {
  25. Thread.sleep(1000);// 模拟请求的耗时操作
  26. System.out.println("threadnum:" + threadnum);
  27. Thread.sleep(1000);// 模拟请求的耗时操作
  28. }
  29. }

上面的代码中,我们定义了请求的数量为550,当这550个请求被处理完成之后,才会输出 finish。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown() 方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

CountDownLatch 的不足

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

CyclicBarrier

CyclicBarrier 的可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 Sheet 里的银行流水,都执行完之后,得到每个 Sheet 的日均银行流水,最后计算出整个Excel的日均流水。

CyclicBarrier 的使用示例

  1. public class CyclicBarrierExample2 {
  2. // 请求的数量
  3. private static final int threadCount = 550;
  4. // 需要同步的线程数量
  5. private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
  6. public static void main(String[] args) {
  7. // 创建线程池
  8. ExecutorService threadPool = Executors.newFixedThreadPool(10);
  9. for (int i = 0; i < threadCount; i++) {
  10. final int threadNum = i;
  11. Thread.sleep(1000);
  12. threadPool.execute(() -> {
  13. try {
  14. test(threadNum);
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. }
  20. threadPool.shutdown();
  21. }
  22. public static void test(int threadnum) {
  23. System.out.println("threadnum:" + threadnum + "is ready");
  24. try {
  25. cyclicBarrier.await();
  26. } catch (Exception e) {
  27. System.out.println("-----CyclicBarrierException------");
  28. }
  29. System.out.println("threadnum:" + threadnum + "is finish");
  30. }
  31. }

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await方法之后的方法才被执行。

另外,CyclicBarrier还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。示例代码如下:

  1. public class CyclicBarrierExample3 {
  2. // 请求的数量
  3. private static final int threadCount = 550;
  4. // 需要同步的线程数量
  5. private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
  6. System.out.println("------当线程数达到之后,优先执行此方法------");
  7. });
  8. public static void main(String[] args) throws InterruptedException {
  9. // 创建线程池
  10. ExecutorService threadPool = Executors.newFixedThreadPool(10);
  11. for (int i = 0; i < threadCount; i++) {
  12. final int threadNum = i;
  13. Thread.sleep(1000);
  14. threadPool.execute(() -> {
  15. try {
  16. test(threadNum);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. });
  21. }
  22. threadPool.shutdown();
  23. }
  24. public static void test(int threadnum) {
  25. System.out.println("threadnum:" + threadnum + "is ready");
  26. try {
  27. cyclicBarrier.await();
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println("threadnum:" + threadnum + "is finish");
  32. }
  33. }

CyclicBarrier和CountDownLatch的区别

CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。从jdk作者设计的目的来看,javadoc是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. (CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;)

CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

代码中的体现就是 CountDownLatch 在主线程中 await(),CyclicBarrier 在子线程中 await()

Java并发-04-AQS实现类&原子类 - 图1

Phaser

phaser 不是基于 AQS 模板的同步器。但是我们也放在这里解释。

  1. public class T09_TestPhaser2 {
  2. static Random r = new Random();
  3. static MarriagePhaser phaser = new MarriagePhaser();
  4. static void milliSleep(int milli) {
  5. try {
  6. TimeUnit.MILLISECONDS.sleep(milli);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. public static void main(String[] args) {
  12. // 绑定每个阶段都有七个部分
  13. phaser.bulkRegister(7);
  14. for (int i = 0; i < 5; i++) {
  15. new Thread(new Person("p" + i)).start();
  16. }
  17. new Thread(new Person("新郎")).start();
  18. new Thread(new Person("新娘")).start();
  19. }
  20. static class MarriagePhaser extends Phaser {
  21. @Override
  22. protected boolean onAdvance(int phase, int registeredParties) {
  23. switch (phase) {
  24. case 0:
  25. System.out.println("所有人到齐了!" + registeredParties);
  26. System.out.println();
  27. return false;
  28. case 1:
  29. System.out.println("所有人吃完了!" + registeredParties);
  30. System.out.println();
  31. return false;
  32. case 2:
  33. System.out.println("所有人离开了!" + registeredParties);
  34. System.out.println();
  35. return false;
  36. case 3:
  37. System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
  38. return true;
  39. default:
  40. return true;
  41. }
  42. }
  43. }
  44. static class Person implements Runnable {
  45. String name;
  46. public Person(String name) {
  47. this.name = name;
  48. }
  49. public void arrive() {
  50. milliSleep(r.nextInt(1000));
  51. System.out.printf("%s 到达现场!\n", name);
  52. // 七个部分都完成才能到下一步
  53. phaser.arriveAndAwaitAdvance();
  54. }
  55. public void eat() {
  56. milliSleep(r.nextInt(1000));
  57. System.out.printf("%s 吃完!\n", name);
  58. // 七个部分都完成才能到下一步
  59. phaser.arriveAndAwaitAdvance();
  60. }
  61. public void leave() {
  62. milliSleep(r.nextInt(1000));
  63. System.out.printf("%s 离开!\n", name);
  64. // 七个部分都完成才能到下一步
  65. phaser.arriveAndAwaitAdvance();
  66. }
  67. private void hug() {
  68. if (name.equals("新郎") || name.equals("新娘")) {
  69. milliSleep(r.nextInt(1000));
  70. System.out.printf("%s 洞房!\n", name);
  71. // 由于5个线程解除注册,所以这个地方只需要等待新娘新郎两个人就可以放行
  72. phaser.arriveAndAwaitAdvance();
  73. } else {
  74. // 不是新娘新郎就解除注册
  75. phaser.arriveAndDeregister();
  76. }
  77. }
  78. @Override
  79. public void run() {
  80. arrive();
  81. eat();
  82. leave();
  83. hug();
  84. }
  85. }
  86. }

Exchanger

phaser 不是基于 AQS 模板的同步器。但是我们也放在这里解释。

  1. public class T12_TestExchanger {
  2. static Exchanger<String> exchanger = new Exchanger<>();
  3. public static void main(String[] args) throws Exception {
  4. new Thread(()->{
  5. String s = "T1";
  6. try {
  7. // 线程会阻塞在这,知道有另一个线程和它进行交换
  8. s = exchanger.exchange(s);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println(Thread.currentThread().getName() + " " + s);
  13. }, "t1").start();
  14. Thread.sleep(10000);
  15. new Thread(()->{
  16. String s = "T2";
  17. try {
  18. s = exchanger.exchange(s);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. System.out.println(Thread.currentThread().getName() + " " + s);
  23. }, "t2").start();
  24. }
  25. }

CAS

CAS 指令需要有3个操作数,分别是内存位置(在Java中可一般看见的是偏移地址,用V表示)、 旧的预期值(用A表示)和新值(用B表示)。 CAS 指令执行时,当且仅当V符合旧预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。在JDK5 之后,Java程序中才可以使用 CAS 操作,该操作由 sun.misc.Unsafe 类里面的 compareAndSwapInt()compareAndSwapLong() 等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器 CAS 指令,没有方法调用的过程。

ABA问题

尽管CAS看起来很美,但显然这种操作无法涵盖互斥同步的所有使用场景,并且 CAS 从语义上来说并不是完美的,存在这样的一个逻辑漏洞:如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然为A值,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那 CAS 操作就会误认为它从来没有被改变过,也就是说此时线程会认为没有其他线程和它争抢,实际上是有线程和它争抢的,进而就会引发线程安全问题。这个漏洞称为 CAS 操作的“ABA”问题。 JUC包为了解决这个问题,提供了一个带有标记的原子引用类 AtomicStampedReference,它可以通过控制变量值的版本来保证 CAS 的正确性。 不过目前来说这个类比较“鸡肋”,大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。

AtomicInteger为例:

  1. public class AtomicIntegerDefectDemo {
  2. public static void main(String[] args) throws Exception {
  3. defectOfABA();
  4. }
  5. static void defectOfABA() throws Exception {
  6. final AtomicInteger atomicInteger = new AtomicInteger(1);
  7. new Thread(() -> {
  8. final int currentValue = atomicInteger.get();
  9. System.out.println(Thread.currentThread().getName() +
  10. " ------ currentValue=" + currentValue);
  11. // 这段目的:模拟处理其他业务花费的时间
  12. try {
  13. Thread.sleep(300);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. boolean casResult = atomicInteger.compareAndSet(1, 2);
  18. // 此处输出的true是ABA造成的
  19. System.out.println(Thread.currentThread().getName()
  20. + " ------ currentValue=" + currentValue
  21. + ", finalValue=" + atomicInteger.get()
  22. + ", compareAndSet Result=" + casResult);
  23. }).start();
  24. // 让上面的线程先跑起来
  25. Thread.sleep(100);
  26. new Thread(() -> {
  27. int currentValue = atomicInteger.get();
  28. boolean casResult = atomicInteger.compareAndSet(1, 2);
  29. System.out.println(Thread.currentThread().getName()
  30. + " ------ currentValue=" + currentValue
  31. + ", finalValue=" + atomicInteger.get()
  32. + ", compareAndSet Result=" + casResult);
  33. currentValue = atomicInteger.get();
  34. casResult = atomicInteger.compareAndSet(2, 1);
  35. System.out.println(Thread.currentThread().getName()
  36. + " ------ currentValue=" + currentValue
  37. + ", finalValue=" + atomicInteger.get()
  38. + ", compareAndSet Result=" + casResult);
  39. }).start();
  40. }
  41. }

原子类

Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

所以,所谓原子类说简单点就是具有原子/原子操作特征的类。

并发包 java.util.concurrent 的原子类都存放在java.util.concurrent.atomic下,如下图所示。

Java并发-04-AQS实现类&原子类 - 图2

根据操作的数据类型,可以将JUC包中的原子类分为4类。

基本类型

使用原子的方式更新基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean :布尔型原子类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicInteger 为例子来介绍。

AtomicInteger 类常用方法

  1. public final int get(); // 获取当前的值
  2. public final int getAndSet(int newValue); // 获取当前的值,并设置新的值
  3. public final int getAndIncrement(); // 获取当前的值,并自增
  4. public final int getAndDecrement(); // 获取当前的值,并自减
  5. public final int getAndAdd(int delta); // 获取当前的值,并加上预期的值
  6. boolean compareAndSet(int expect, int update);// 如果输入的数值等于预期值,则以将该值设置为输入值
  7. public final void lazySet(int newValue); // 最终设置为newValue。

AtomicInteger 常见方法使用

  1. public class AtomicIntegerTest {
  2. public static void main(String[] args) {
  3. int temvalue = 0;
  4. AtomicInteger i = new AtomicInteger(0);
  5. temvalue = i.getAndSet(3);
  6. System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:0; i:3
  7. temvalue = i.getAndIncrement();
  8. System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:3; i:4
  9. temvalue = i.getAndAdd(5);
  10. System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:4; i:9
  11. }
  12. }

内部的简单原理就是 调用getAndAddInt

  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }

AtomicInteger 线程安全原理简单分析

  1. // setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
  2. private static final Unsafe unsafe = Unsafe.getUnsafe();
  3. private static final long valueOffset;
  4. static {
  5. try {
  6. valueOffset = unsafe.objectFieldOffset
  7. (AtomicInteger.class.getDeclaredField("value"));
  8. } catch (Exception ex) { throw new Error(ex); }
  9. }
  10. private volatile int value;

AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafeobjectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个 volatile 变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。

数组类型

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整型数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicReferenceArray :引用类型数组原子类

AtomicIntegerArray 类常用方法

  1. public final int get(int i); // 获取 index=i 位置元素的值
  2. public final int getAndSet(int i, int newValue); // 返回 index=i 位置的当前的值,并将其更新
  3. public final int getAndIncrement(int i); // 获取 index=i 位置元素的值,并让该位置的元素自增
  4. public final int getAndDecrement(int i); // 获取 index=i 位置元素的值,并让该位置的元素自减
  5. public final int getAndAdd(int delta); // 获取 index=i 位置元素的值,并加上预期的值
  6. boolean compareAndSet(int expect, int update); // 如果输入的数值等于预期值,则进行原子更新
  7. public final void lazySet(int i, int newValue); // 最终 将index=i 位置的元素设置为newValue

AtomicIntegerArray 常见方法使用

  1. public class AtomicIntegerArrayTest {
  2. public static void main(String[] args) {
  3. int temvalue = 0;
  4. int[] nums = { 1, 2, 3, 4, 5, 6 };
  5. AtomicIntegerArray i = new AtomicIntegerArray(nums);
  6. for (int j = 0; j < nums.length; j++) {
  7. System.out.println(i.get(j));
  8. }
  9. temvalue = i.getAndSet(0, 2);
  10. System.out.println("temvalue:" + temvalue + "; i:" + i);
  11. temvalue = i.getAndIncrement(0);
  12. System.out.println("temvalue:" + temvalue + "; i:" + i);
  13. temvalue = i.getAndAdd(0, 5);
  14. System.out.println("temvalue:" + temvalue + "; i:" + i);
  15. }
  16. }

引用类型

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新引用类型里的字段原子类
  • AtomicMarkableReference :原子更新带有标记位的引用类型

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

AtomicReference 类使用示例

  1. public class AtomicReferenceTest {
  2. public static void main(String[] args) {
  3. AtomicReference<Person> ar = new AtomicReference<Person>();
  4. Person person = new Person("SnailClimb", 22);
  5. ar.set(person);
  6. Person updatePerson = new Person("Daisy", 20);
  7. ar.compareAndSet(person, updatePerson);
  8. System.out.println(ar.get().getName());
  9. System.out.println(ar.get().getAge());
  10. }
  11. }
  12. class Person {
  13. private String name;
  14. private int age;
  15. public Person(String name, int age) {
  16. super();
  17. this.name = name;
  18. this.age = age;
  19. }
  20. public String getName() {
  21. return name;
  22. }
  23. public void setName(String name) {
  24. this.name = name;
  25. }
  26. public int getAge() {
  27. return age;
  28. }
  29. public void setAge(int age) {
  30. this.age = age;
  31. }
  32. }

上述代码首先创建了一个 Person 对象,然后把 Person 对象设置进 AtomicReference 对象中,然后调用 compareAndSet 方法,该方法就是通过通过 CAS 操作设置 ar。如果 ar 的值为 person 的话,则将其设置为 updatePerson。CAS 操作中比较的是地址。

AtomicStampedReference 类使用示例

AtomicStampedReference 遇到 ABA 问题会更新失败。

  1. // 基本的API
  2. public class AtomicStampedReferenceDemo {
  3. public static void main(String[] args) {
  4. // 实例化、取当前值和 stamp 值
  5. final Integer initialRef = 0, initialStamp = 0;
  6. final AtomicStampedReference<Integer> asr =
  7. new AtomicStampedReference<>(initialRef, initialStamp);
  8. System.out.println("currentValue=" + asr.getReference() +
  9. ", currentStamp=" + asr.getStamp());
  10. // compare and set
  11. final Integer newReference = 666, newStamp = 999;
  12. final boolean casResult =
  13. asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
  14. System.out.println("currentValue=" + asr.getReference()
  15. + ", currentStamp=" + asr.getStamp()
  16. + ", casResult=" + casResult);
  17. // 获取当前的值和当前的 stamp 值
  18. int[] arr = new int[1];
  19. final Integer currentValue = asr.get(arr);
  20. final int currentStamp = arr[0];
  21. System.out.println("currentValue=" +
  22. currentValue + ", currentStamp=" + currentStamp);
  23. // 单独设置 stamp 值
  24. final boolean attemptStampResult = asr.attemptStamp(newReference, 88);
  25. System.out.println("currentValue=" + asr.getReference()
  26. + ", currentStamp=" + asr.getStamp()
  27. + ", attemptStampResult=" + attemptStampResult);
  28. // 重新设置当前值和 stamp 值
  29. asr.set(initialRef, initialStamp);
  30. System.out.println("currentValue="
  31. + asr.getReference() + ", currentStamp=" + asr.getStamp());
  32. }
  33. }
  34. /*
  35. currentValue=0, currentStamp=0
  36. currentValue=666, currentStamp=999, casResult=true
  37. currentValue=666, currentStamp=999
  38. currentValue=666, currentStamp=88, attemptStampResult=true
  39. currentValue=0, currentStamp=0
  40. */
  1. // ABA问题解决
  2. public class ABA {
  3. private static AtomicInteger atomicInt = new AtomicInteger(100);
  4. private static AtomicStampedReference atomicStampedRef =
  5. new AtomicStampedReference(100, 0);
  6. public static void main(String[] args) throws InterruptedException {
  7. Thread intT1 = new Thread(() -> {
  8. atomicInt.compareAndSet(100, 101);
  9. atomicInt.compareAndSet(101, 100);
  10. });
  11. Thread intT2 = new Thread(() -> {
  12. try {
  13. TimeUnit.SECONDS.sleep(1);
  14. } catch (InterruptedException e) {
  15. }
  16. boolean c3 = atomicInt.compareAndSet(100, 101);
  17. System.out.println(c3); // true
  18. });
  19. intT1.start();
  20. intT2.start();
  21. intT1.join();
  22. intT2.join();
  23. Thread refT1 = new Thread(() -> {
  24. try {
  25. TimeUnit.SECONDS.sleep(1);
  26. } catch (InterruptedException e) { }
  27. atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(),
  28. atomicStampedRef.getStamp() + 1);
  29. atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(),
  30. atomicStampedRef.getStamp() + 1);
  31. });
  32. Thread refT2 = new Thread(() -> {
  33. int stamp = atomicStampedRef.getStamp();
  34. try {
  35. TimeUnit.SECONDS.sleep(2);
  36. } catch (InterruptedException e) { }
  37. boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
  38. System.out.println(c3); // false
  39. });
  40. refT1.start();
  41. refT2.start();
  42. }
  43. }

对象的属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器

要想原子地更新对象的属性需要两步。

  • 第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。
  • 第二步,更新的对象属性必须使用 public volatile 修饰符。

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerFieldUpdater为例子来介绍。

AtomicIntegerFieldUpdater 类使用示例

  1. public class AtomicIntegerFieldUpdaterTest {
  2. public static void main(String[] args) {
  3. AtomicIntegerFieldUpdater<User> a =
  4. AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
  5. User user = new User("Java", 22);
  6. System.out.println(a.getAndIncrement(user)); // 22
  7. System.out.println(a.get(user)); // 23
  8. }
  9. }
  10. class User {
  11. private String name;
  12. public volatile int age;
  13. public User(String name, int age) {
  14. super();
  15. this.name = name;
  16. this.age = age;
  17. }
  18. public String getName() {
  19. return name;
  20. }
  21. public void setName(String name) {
  22. this.name = name;
  23. }
  24. public int getAge() {
  25. return age;
  26. }
  27. public void setAge(int age) {
  28. this.age = age;
  29. }
  30. }