概念
学习多线程知识肯定听说过CAS,那何为CAS操作呢?
CAS:
- 处理器提供的内存操作指令,保证原子性。
- 操作需要两个参数:一个旧值和一个目标值,修改前先比较旧值是否改变,如果没变,将新值赋给变量,否则不做改变 通过unsafe类去修改内存中的值,同一个内存地址值仅有一个线程修改对应的值
提到原子性我们知道:原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱的,也不可以被切割只执行其中的一部分(不可中断性),将整个操作视为一个整体,资源在该次操作中保持一致,这时原子的核心特征。并且i++它就不是原子操作,存在竟态条件时,线程不安全,需要转变为原子操作才安全,所以有了我们的原子安全类AtomicInteger
CAS举例说明:
比如:i=0 如果有很多线程同时来修改这个i的值,那么首先线程A(0,1)来修改,发现0与旧的值一样,则i就被修改成了1。当线程B(0,1)也来修改,比较不通过,则该线程就会进入重试,重试机制需要自己实现,则线程B现在的旧值会变成1,预期值就是2,然后再去重试,直到成功为止!
手写原子类AtomicInteger
Java中实现原子类用到了魔法类unsafe,它可以直接操作我们的内存条来修改底层的值。ok,我们首先自己的原子类成员变量中肯定要有unsafe类,但这个类是那么好创建的吗?
public class MyAtomicInteger {
static final Unsafe unsafe = Unsafe.getUnsafe();
int i;
public static void main(String[] args) {
System.out.println(unsafe);
}
}
--报错
Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: java.lang.SecurityException: Unsafe
at sun.misc.Unsafe.getUnsafe(Unsafe.java:90)
at com.dongnaoedu.network.humm.多线程.CAS.MyAtomicInteger.<clinit>(MyAtomicInteger.java:14)
既然没那么好创建,那我们肯定要用我们的黑科技反射了,单例都能被反射破解对吧。而且我们在运用unsafe类的时候需要注意一个概念就是:此类要让我们通过硬件层面去修改某个对象的值,那么你就必须告诉我这个对象是谁?你要修改这个对象的成员变量是哪个?这个对象的成员变量对应在内存的地址值,也就是术语中得偏移量是多少?我们知道我们要修改的对象是MyAtomicInteger,成员变量是i的偏移量是多少?这个得用到unsafe类的一个方法_unsafe_.objectFieldOffset(iFiled)来拿到。ok所以代码大概是这个样子。
public class MyAtomicInteger {
private static Unsafe unsafe = null;
private int i;
private static long valueOffset;
static {
Class<Unsafe> unsafeClass = Unsafe.class;
try {
Field unsafeField = unsafeClass.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
unsafe = (Unsafe) unsafeField.get(unsafe);//无任何成员变量
//指定要修改的字段
Field iField = MyAtomicInteger.class.getDeclaredField("i");
valueOffset = unsafe.objectFieldOffset(iField);
} catch (NoSuchFieldException | IllegalAccessException e) {
System.out.println("实例化MyAtomicInteger异常");
}
}
public void add(){
for (;;){
//指定你要修改的对象和该对象的成员变量的偏移量,成员变量的旧值,成员变量的新值
if (unsafe.compareAndSwapInt(this, valueOffset, i, i + 1)){
//成功替换,则跳出循环,否则一直重试
return;
}
}
}
public static void main(String[] args) throws InterruptedException {
//开启10个线程去对int进行i++,加至10000次,看是否成功
MyAtomicInteger myAtomicInteger = new MyAtomicInteger();
IntStream.range(0,10).forEach(value -> {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
myAtomicInteger.add();
}
System.out.println(Thread.currentThread().getName() + "done");
}).start();
});
TimeUnit.SECONDS.sleep(3);
System.out.println(myAtomicInteger.i);
}
}
//控制台
Thread-0done
Thread-1done
Thread-2done
Thread-3done
Thread-4done
Thread-5done
Thread-6done
Thread-7done
Thread-8done
Thread-9done
10000
CAS存在问题
- 仅针对单个变量的操作,不能用于单个变量来实现原子操作
- 循环+CAS,自选的实现让所有的线程都处于高频运行,争抢cpu执行时间的状态,如果操作长时间不成功,会带来很大的cpu资源消耗
- ABA问题(无法体现数据的变动)
ABA问题举例:
- 假设i=0,假设线程A线程B同时读取到i的值,现在它们都想通过CAS来修改i的值·
- 假设线程A先执行(0,1),线程A执行完成后。那我们线程B预期是会失败的。但一定会失败吗?
- 答案是不一定,因为当A线程执行完成后,可能存在其它线程又执行了(1,0)操作,此时线程B就会执行成功。
此时这会引发什么问题?
当然我们的原子类是不会有啥问题,比如线程A做了+1,然后又做了-1,然后线程B再去做其他操作,这是没啥影响的。但是其它的情况就会出现问题。通过如下代码可以看出其中的端倪,首先是我自定义实现了一个栈Stack,这个栈是一个后入后出,也可以说是先入先出的一个栈。
// 存储在栈里面元素 -- 对象
public class Node {
public final String value;
public Node next;
public Node(String value) {
this.value = value;
}
@Override
public String toString() {
return "value=" + value;
}
}
// 实现一个 栈(后进先出)
public class Stack {
// top cas无锁修改
AtomicReference<Node> top = new AtomicReference<Node>();
public void push(Node node) { // 入栈
Node oldTop;
do {
oldTop = top.get();
node.next = oldTop;
}
while (!top.compareAndSet(oldTop, node)); // CAS 替换栈顶
}
// 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
public Node pop(int time) {
Node newTop;
Node oldTop;
do {
oldTop = top.get();
if (oldTop == null) { //如果没有值,就返回null
return null;
}
newTop = oldTop.next;
if (time != 0) { //模拟延时
try {
TimeUnit.SECONDS.sleep(time);
}catch (InterruptedException e){
}
}
}
//将下一个节点设置为top
while (!top.compareAndSet(oldTop, newTop));
return oldTop; //将旧的Top作为值返回
}
}
比如stack分别push三个元素,如下。
Stack stack = new Stack(); stack.push(new Node("a")); stack.push(new Node("b")); stack.push(new Node("c"));
假设
AtomicReference<Node> top
取出一个元素,那么这个元素则是nodeC,那么此NodeC指向的就是nodeB,则通过CAS操作会去比较top当前的引用与nodeC比较,如果一致则将top的引用指向nodeB,同时nodeC被丢弃。同理假设添加一个元素添加一个nodeD,先取出当前引用的元素也就是nodeC,则通过CAS操作去比较当前元素NodeC是不是top的引用,是则修改替换top的引用。
问题引出
ok,上面的文字描述大概就是上面代码的意思,但我们现在定义这么一种情况,现在有两个线程A,B。
线程A想把c出栈,让stack指向nodeB,即threadA—-> stack.pop(800);//睡眠800ms,此时A线程以为stack的旧的值是c,新值是b
- 由于线程A处于睡眠状态,线程B做了一些动作,它依次取出c,b 然后添加d,c。所以此时stack的顺序是a->d->c
- 此时a睡眠结束,它看到旧的值是c,但这时候的c也就是oldNode 是c->d->a,而不是之前看到的c->b>a了,因为他被另外一个线程改了,这时候也就更新了它的地址值。所以既然top对象的引用的oldNode是一样的,但是newTop并不是更新后的新值,它保存的是旧的值,也就是一开始的d->a。然后很高兴做了CAS操作,也成功了。
- 于是乎top的由之前的c->d->a 变为了 b->a但此时它替换新的值确实b,但实际情况是b已经不存在了,就是说原本A线程以为我去掉c,然后stack指向b。但实际情况是此时a下面的小弟已经不是之前的b了,而是换了个d。此时问题得以出现。
所以我们写一个测试类来验证下到底stack剩下的是不是b和a了。
public class Test {
public static void main(String[] args) throws InterruptedException {
// Stack stack = new Stack();
ConcurrentStack stack = new ConcurrentStack();
stack.push(new Node("A"));
stack.push(new Node("B"));
stack.push(new Node("C"));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "运行");
Node node = stack.pop(3);
System.out.println(Thread.currentThread().getName()+"done...");
},"线程A").start();
TimeUnit.MILLISECONDS.sleep(500);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "运行");
Node nodeC = stack.pop(0); //取出C
stack.pop(0); //取出B,之后B处于游离状态,可能被回收
stack.push(new Node("D"));//D入栈
stack.push(nodeC); //C入栈
System.out.println(Thread.currentThread().getName()+"done...");
},"线程B").start();
//当活动线程数大于1,则暂停main线程
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println("开始遍历Stack:");
Node node = null;
while ((node = stack.pop(0))!=null){
System.out.println(node.value);
}
}
}
线程A运行
线程B运行
线程Bdone...
线程Adone...
开始遍历Stack:
B
A
可以看出剩下的只剩下b a了,并不是我们想要的d a。那针对上面出现的问题该如何解决呢?很简单,加一个版本号就可以了,因为我们现在CAS比较的只是去比较旧的值,也就是单单只把node变量作为比较对象是不够的,因为我们现在看到的node可能是被别人替换上来的假的node,就像上面看到的那样,你当时看到的node是真的a,但是后面看到的a时a'了,那加一个版本号就可以唯一确定是不是node a了。<br /> 在java中通过CAS操作家版本号的可以通过AtomicStampedReference来取代之前的AtomicReference,就是每比较成功一次CAS则版本+1,代码如下:
public class ConcurrentStack {
// top cas无锁修改
//AtomicReference<Node> top = new AtomicReference<Node>();
AtomicStampedReference<Node> top =
new AtomicStampedReference<>(null, 0);
public void push(Node node) { // 入栈
Node oldTop;
int v;
do {
v = top.getStamp();
oldTop = top.getReference();
node.next = oldTop;
}
while (!top.compareAndSet(oldTop, node, v, v+1)); // CAS 替换栈顶
}
// 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
public Node pop(int time) {
Node newTop;
Node oldTop;
int v;
do {
v = top.getStamp();
oldTop = top.getReference();
if (oldTop == null) { //如果没有值,就返回null
return null;
}
newTop = oldTop.next;
if (time != 0) { //模拟延时
try {
TimeUnit.SECONDS.sleep(time);
}catch (Exception e){
}
}
}
while (!top.compareAndSet(oldTop, newTop, v, v+1)); //将下一个节点设置为top
return oldTop; //将旧的Top作为值返回
}
}
线程A运行
线程B运行
线程Bdone...
线程Adone...
开始遍历Stack:
D
A