ReentrantLock
// ReentrantLock 的基本使用
public class T02_ReentrantLock2 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
void m2() {
try {
lock.lock();
System.out.println("m2 ...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
T02_ReentrantLock2 rl = new T02_ReentrantLock2();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
// tryLock 的使用
public class T03_ReentrantLock3 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock();
for (int i = 0; i < 3; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 使用tryLock进行尝试锁定,不管锁定与否都立即返回,可以根据tryLock的返回值来判定是否锁定
* 也可以指定tryLock的时间,但是这个重载版本会抛出异常,所以必须在finally中 unlock
*/
void m2() {
boolean locked = lock.tryLock();
System.out.println("m2 ..." + locked);
if (locked) lock.unlock();
// boolean locked = false;
// try {
// locked = lock.tryLock(5, TimeUnit.SECONDS);
// System.out.println("m2 ..." + locked);
// } catch (InterruptedException e) {
// e.printStackTrace();
// } finally {
// if (locked)
// lock.unlock();
// }
}
public static void main(String[] args) {
T03_ReentrantLock3 rl = new T03_ReentrantLock3();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
// lockInterruptibly() 可以响应中断,这是 synchronized 做不到的
public class T04_ReentrantLock4 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
lock.lock();
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
System.out.println("t1 end");
} catch (InterruptedException e) {
System.out.println("t1 interrupted!");
} finally {
lock.unlock();
}
});
t1.start();
Thread t2 = new Thread(() -> {
try {
// lock.lock();
lock.lockInterruptibly(); // 可以对interrupt()方法做出响应
System.out.println("t2 start");
TimeUnit.SECONDS.sleep(5);
System.out.println("t2 end");
} catch (InterruptedException e) {
System.out.println("t2 interrupted!");
} finally {
lock.unlock();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
// t2.interrupt(); // 打断线程2的等待
}
}
// 设置 ReentrantLock 为公平锁之后,线程依次获取临界资源。公平锁是先来先服务原则。
public class T05_ReentrantLock5 extends Thread {
private static ReentrantLock lock = new ReentrantLock(false);
public void run() {
for (int i = 0; i < 100; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获得锁");
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
T05_ReentrantLock5 rl = new T05_ReentrantLock5();
Thread th1 = new Thread(rl);
Thread th2 = new Thread(rl);
th1.start();
th2.start();
}
}
ReadWriteLock
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
// Runnable readR = ()-> read(lock);
Runnable readR = () -> read(readLock);
// Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = () -> write(writeLock, new Random().nextInt());
for (int i = 0; i < 18; i++) new Thread(readR).start();
for (int i = 0; i < 2; i++) new Thread(writeR).start();
}
}
Semaphore
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。示例代码如下:
public class SemaphoreExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允许执行的线程数量。
final Semaphore semaphore = new Semaphore(20);
// 公平锁的输出是按 20 有序的。
// final Semaphore semaphore = new Semaphore(20, true);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release(); // 释放一个许可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000); // 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000) ; // 模拟请求的耗时操作
}
}
Semaphore 有两种模式,公平模式和非公平模式。
- 公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO;
- 非公平模式: 抢占式的。
Semaphore 对应的两个构造方法如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
CountDownLatch
CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch 的三种典型用法
- 某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :
new CountDownLatch(n)
,每当一个任务线程执行完毕,就将计数器减1countdownlatch.countDown()
,当计数器的值变为0时,在CountDownLatch上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。 - 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的
CountDownLatch
对象,将其计数器初始化为 1 :new CountDownLatch(1)
,多个线程在开始执行任务前首先coundownlatch.await()
,当主线程调用countDown()
时,计数器变为0,多个线程同时被唤醒。 - 死锁检测:一个非常方便的使用场景是,你可以使用 n 个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
CountDownLatch 的使用示例
public class CountDownLatchExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
test(threadnum);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 表示一个请求已经被完成
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
上面的代码中,我们定义了请求的数量为550,当这550个请求被处理完成之后,才会输出 finish。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()
方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他N个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()
方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
CountDownLatch 的不足
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
CyclicBarrier
CyclicBarrier 的可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用await
方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier 的应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 Sheet 里的银行流水,都执行完之后,得到每个 Sheet 的日均银行流水,最后计算出整个Excel的日均流水。
CyclicBarrier 的使用示例
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) {
System.out.println("threadnum:" + threadnum + "is ready");
try {
cyclicBarrier.await();
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await
方法之后的方法才被执行。
另外,CyclicBarrier还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行 barrierAction
,方便处理更复杂的业务场景。示例代码如下:
public class CyclicBarrierExample3 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行此方法------");
});
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) {
System.out.println("threadnum:" + threadnum + "is ready");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
CyclicBarrier和CountDownLatch的区别
CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。从jdk作者设计的目的来看,javadoc是这么描述它们的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. (CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
代码中的体现就是 CountDownLatch 在主线程中 await()
,CyclicBarrier 在子线程中 await()
。
Phaser
phaser 不是基于 AQS 模板的同步器。但是我们也放在这里解释。
public class T09_TestPhaser2 {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 绑定每个阶段都有七个部分
phaser.bulkRegister(7);
for (int i = 0; i < 5; i++) {
new Thread(new Person("p" + i)).start();
}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
// 七个部分都完成才能到下一步
phaser.arriveAndAwaitAdvance();
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
// 七个部分都完成才能到下一步
phaser.arriveAndAwaitAdvance();
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
// 七个部分都完成才能到下一步
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if (name.equals("新郎") || name.equals("新娘")) {
milliSleep(r.nextInt(1000));
System.out.printf("%s 洞房!\n", name);
// 由于5个线程解除注册,所以这个地方只需要等待新娘新郎两个人就可以放行
phaser.arriveAndAwaitAdvance();
} else {
// 不是新娘新郎就解除注册
phaser.arriveAndDeregister();
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
Exchanger
phaser 不是基于 AQS 模板的同步器。但是我们也放在这里解释。
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) throws Exception {
new Thread(()->{
String s = "T1";
try {
// 线程会阻塞在这,知道有另一个线程和它进行交换
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
Thread.sleep(10000);
new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
CAS
CAS 指令需要有3个操作数,分别是内存位置(在Java中可一般看见的是偏移地址,用V表示)、 旧的预期值(用A表示)和新值(用B表示)。 CAS 指令执行时,当且仅当V符合旧预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。在JDK5 之后,Java程序中才可以使用 CAS 操作,该操作由 sun.misc.Unsafe
类里面的 compareAndSwapInt()
和compareAndSwapLong()
等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器 CAS 指令,没有方法调用的过程。
ABA问题
尽管CAS看起来很美,但显然这种操作无法涵盖互斥同步的所有使用场景,并且 CAS 从语义上来说并不是完美的,存在这样的一个逻辑漏洞:如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然为A值,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那 CAS 操作就会误认为它从来没有被改变过,也就是说此时线程会认为没有其他线程和它争抢,实际上是有线程和它争抢的,进而就会引发线程安全问题。这个漏洞称为 CAS 操作的“ABA”问题。 JUC包为了解决这个问题,提供了一个带有标记的原子引用类 AtomicStampedReference
,它可以通过控制变量值的版本来保证 CAS 的正确性。 不过目前来说这个类比较“鸡肋”,大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
以AtomicInteger
为例:
public class AtomicIntegerDefectDemo {
public static void main(String[] args) throws Exception {
defectOfABA();
}
static void defectOfABA() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(() -> {
final int currentValue = atomicInteger.get();
System.out.println(Thread.currentThread().getName() +
" ------ currentValue=" + currentValue);
// 这段目的:模拟处理其他业务花费的时间
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean casResult = atomicInteger.compareAndSet(1, 2);
// 此处输出的true是ABA造成的
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);
}).start();
// 让上面的线程先跑起来
Thread.sleep(100);
new Thread(() -> {
int currentValue = atomicInteger.get();
boolean casResult = atomicInteger.compareAndSet(1, 2);
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);
currentValue = atomicInteger.get();
casResult = atomicInteger.compareAndSet(2, 1);
System.out.println(Thread.currentThread().getName()
+ " ------ currentValue=" + currentValue
+ ", finalValue=" + atomicInteger.get()
+ ", compareAndSet Result=" + casResult);
}).start();
}
}
原子类
Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所以,所谓原子类说简单点就是具有原子/原子操作特征的类。
并发包 java.util.concurrent
的原子类都存放在java.util.concurrent.atomic
下,如下图所示。
根据操作的数据类型,可以将JUC包中的原子类分为4类。
基本类型
使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicInteger 为例子来介绍。
AtomicInteger 类常用方法
public final int get(); // 获取当前的值
public final int getAndSet(int newValue); // 获取当前的值,并设置新的值
public final int getAndIncrement(); // 获取当前的值,并自增
public final int getAndDecrement(); // 获取当前的值,并自减
public final int getAndAdd(int delta); // 获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update);// 如果输入的数值等于预期值,则以将该值设置为输入值
public final void lazySet(int newValue); // 最终设置为newValue。
AtomicInteger 常见方法使用
public class AtomicIntegerTest {
public static void main(String[] args) {
int temvalue = 0;
AtomicInteger i = new AtomicInteger(0);
temvalue = i.getAndSet(3);
System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:0; i:3
temvalue = i.getAndIncrement();
System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:3; i:4
temvalue = i.getAndAdd(5);
System.out.println("temvalue:" + temvalue + "; i:" + i); // temvalue:4; i:9
}
}
内部的简单原理就是 调用getAndAddInt
:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
AtomicInteger 线程安全原理简单分析
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。
CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe
的 objectFieldOffset()
方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个 volatile 变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
数组类型
使用原子的方式更新数组里的某个元素
- AtomicIntegerArray:整型数组原子类
- AtomicLongArray:长整型数组原子类
- AtomicReferenceArray :引用类型数组原子类
AtomicIntegerArray 类常用方法
public final int get(int i); // 获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue); // 返回 index=i 位置的当前的值,并将其更新
public final int getAndIncrement(int i); // 获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i); // 获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int delta); // 获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int expect, int update); // 如果输入的数值等于预期值,则进行原子更新
public final void lazySet(int i, int newValue); // 最终 将index=i 位置的元素设置为newValue
AtomicIntegerArray 常见方法使用
public class AtomicIntegerArrayTest {
public static void main(String[] args) {
int temvalue = 0;
int[] nums = { 1, 2, 3, 4, 5, 6 };
AtomicIntegerArray i = new AtomicIntegerArray(nums);
for (int j = 0; j < nums.length; j++) {
System.out.println(i.get(j));
}
temvalue = i.getAndSet(0, 2);
System.out.println("temvalue:" + temvalue + "; i:" + i);
temvalue = i.getAndIncrement(0);
System.out.println("temvalue:" + temvalue + "; i:" + i);
temvalue = i.getAndAdd(0, 5);
System.out.println("temvalue:" + temvalue + "; i:" + i);
}
}
引用类型
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新引用类型里的字段原子类
- AtomicMarkableReference :原子更新带有标记位的引用类型
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
AtomicReference 类使用示例
public class AtomicReferenceTest {
public static void main(String[] args) {
AtomicReference<Person> ar = new AtomicReference<Person>();
Person person = new Person("SnailClimb", 22);
ar.set(person);
Person updatePerson = new Person("Daisy", 20);
ar.compareAndSet(person, updatePerson);
System.out.println(ar.get().getName());
System.out.println(ar.get().getAge());
}
}
class Person {
private String name;
private int age;
public Person(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
上述代码首先创建了一个 Person 对象,然后把 Person 对象设置进 AtomicReference 对象中,然后调用 compareAndSet 方法,该方法就是通过通过 CAS 操作设置 ar。如果 ar 的值为 person 的话,则将其设置为 updatePerson。CAS 操作中比较的是地址。
AtomicStampedReference 类使用示例
AtomicStampedReference
遇到 ABA 问题会更新失败。
// 基本的API
public class AtomicStampedReferenceDemo {
public static void main(String[] args) {
// 实例化、取当前值和 stamp 值
final Integer initialRef = 0, initialStamp = 0;
final AtomicStampedReference<Integer> asr =
new AtomicStampedReference<>(initialRef, initialStamp);
System.out.println("currentValue=" + asr.getReference() +
", currentStamp=" + asr.getStamp());
// compare and set
final Integer newReference = 666, newStamp = 999;
final boolean casResult =
asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
System.out.println("currentValue=" + asr.getReference()
+ ", currentStamp=" + asr.getStamp()
+ ", casResult=" + casResult);
// 获取当前的值和当前的 stamp 值
int[] arr = new int[1];
final Integer currentValue = asr.get(arr);
final int currentStamp = arr[0];
System.out.println("currentValue=" +
currentValue + ", currentStamp=" + currentStamp);
// 单独设置 stamp 值
final boolean attemptStampResult = asr.attemptStamp(newReference, 88);
System.out.println("currentValue=" + asr.getReference()
+ ", currentStamp=" + asr.getStamp()
+ ", attemptStampResult=" + attemptStampResult);
// 重新设置当前值和 stamp 值
asr.set(initialRef, initialStamp);
System.out.println("currentValue="
+ asr.getReference() + ", currentStamp=" + asr.getStamp());
}
}
/*
currentValue=0, currentStamp=0
currentValue=666, currentStamp=999, casResult=true
currentValue=666, currentStamp=999
currentValue=666, currentStamp=88, attemptStampResult=true
currentValue=0, currentStamp=0
*/
// ABA问题解决
public class ABA {
private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedRef =
new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(() -> {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
});
Thread intT2 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); // true
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) { }
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(),
atomicStampedRef.getStamp() + 1);
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(),
atomicStampedRef.getStamp() + 1);
});
Thread refT2 = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) { }
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(c3); // false
});
refT1.start();
refT2.start();
}
}
对象的属性修改类型
- AtomicIntegerFieldUpdater:原子更新整型字段的更新器
- AtomicLongFieldUpdater:原子更新长整型字段的更新器
- AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器
要想原子地更新对象的属性需要两步。
- 第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法
newUpdater()
创建一个更新器,并且需要设置想要更新的类和属性。 - 第二步,更新的对象属性必须使用 public volatile 修饰符。
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerFieldUpdater
为例子来介绍。
AtomicIntegerFieldUpdater 类使用示例
public class AtomicIntegerFieldUpdaterTest {
public static void main(String[] args) {
AtomicIntegerFieldUpdater<User> a =
AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("Java", 22);
System.out.println(a.getAndIncrement(user)); // 22
System.out.println(a.get(user)); // 23
}
}
class User {
private String name;
public volatile int age;
public User(String name, int age) {
super();
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}