概念

学习多线程知识肯定听说过CAS,那何为CAS操作呢?
CAS:

  1. 处理器提供的内存操作指令,保证原子性。
  2. 操作需要两个参数:一个旧值和一个目标值,修改前先比较旧值是否改变,如果没变,将新值赋给变量,否则不做改变 通过unsafe类去修改内存中的值,同一个内存地址值仅有一个线程修改对应的值

提到原子性我们知道:原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱的,也不可以被切割只执行其中的一部分(不可中断性),将整个操作视为一个整体,资源在该次操作中保持一致,这时原子的核心特征。并且i++它就不是原子操作,存在竟态条件时,线程不安全,需要转变为原子操作才安全,所以有了我们的原子安全类AtomicInteger

CAS举例说明:

比如:i=0 如果有很多线程同时来修改这个i的值,那么首先线程A(0,1)来修改,发现0与旧的值一样,则i就被修改成了1。当线程B(0,1)也来修改,比较不通过,则该线程就会进入重试,重试机制需要自己实现,则线程B现在的旧值会变成1,预期值就是2,然后再去重试,直到成功为止!

手写原子类AtomicInteger

Java中实现原子类用到了魔法类unsafe,它可以直接操作我们的内存条来修改底层的值。ok,我们首先自己的原子类成员变量中肯定要有unsafe类,但这个类是那么好创建的吗?

public class MyAtomicInteger {

    static final Unsafe unsafe = Unsafe.getUnsafe();
    int i;

    public static void main(String[] args) {
        System.out.println(unsafe);
    }
}
--报错
Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: java.lang.SecurityException: Unsafe
    at sun.misc.Unsafe.getUnsafe(Unsafe.java:90)
    at com.dongnaoedu.network.humm.多线程.CAS.MyAtomicInteger.<clinit>(MyAtomicInteger.java:14)
既然没那么好创建,那我们肯定要用我们的黑科技反射了,单例都能被反射破解对吧。而且我们在运用unsafe类的时候需要注意一个概念就是:此类要让我们通过硬件层面去修改某个对象的值,那么你就必须告诉我这个对象是谁?你要修改这个对象的成员变量是哪个?这个对象的成员变量对应在内存的地址值,也就是术语中得偏移量是多少?我们知道我们要修改的对象是MyAtomicInteger,成员变量是i的偏移量是多少?这个得用到unsafe类的一个方法_unsafe_.objectFieldOffset(iFiled)来拿到。ok所以代码大概是这个样子。
public class MyAtomicInteger {

    private static Unsafe unsafe = null;
    private int i;
    private static long valueOffset;
    static {
        Class<Unsafe> unsafeClass = Unsafe.class;
        try {
            Field unsafeField = unsafeClass.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (Unsafe) unsafeField.get(unsafe);//无任何成员变量
            //指定要修改的字段
            Field iField = MyAtomicInteger.class.getDeclaredField("i");
            valueOffset = unsafe.objectFieldOffset(iField);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            System.out.println("实例化MyAtomicInteger异常");
        }
    }




    public void add(){
        for (;;){
            //指定你要修改的对象和该对象的成员变量的偏移量,成员变量的旧值,成员变量的新值
            if (unsafe.compareAndSwapInt(this, valueOffset, i, i + 1)){
                //成功替换,则跳出循环,否则一直重试
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //开启10个线程去对int进行i++,加至10000次,看是否成功
        MyAtomicInteger myAtomicInteger = new MyAtomicInteger();
        IntStream.range(0,10).forEach(value -> {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    myAtomicInteger.add();
                }
                System.out.println(Thread.currentThread().getName() + "done");
            }).start();
        });
        TimeUnit.SECONDS.sleep(3);
        System.out.println(myAtomicInteger.i);
    }

}
//控制台
Thread-0done
Thread-1done
Thread-2done
Thread-3done
Thread-4done
Thread-5done
Thread-6done
Thread-7done
Thread-8done
Thread-9done
10000

CAS存在问题

  1. 仅针对单个变量的操作,不能用于单个变量来实现原子操作
  2. 循环+CAS,自选的实现让所有的线程都处于高频运行,争抢cpu执行时间的状态,如果操作长时间不成功,会带来很大的cpu资源消耗
  3. ABA问题(无法体现数据的变动)

ABA问题举例:

  1. 假设i=0,假设线程A线程B同时读取到i的值,现在它们都想通过CAS来修改i的值·
  2. 假设线程A先执行(0,1),线程A执行完成后。那我们线程B预期是会失败的。但一定会失败吗?
  3. 答案是不一定,因为当A线程执行完成后,可能存在其它线程又执行了(1,0)操作,此时线程B就会执行成功。

此时这会引发什么问题?
当然我们的原子类是不会有啥问题,比如线程A做了+1,然后又做了-1,然后线程B再去做其他操作,这是没啥影响的。但是其它的情况就会出现问题。通过如下代码可以看出其中的端倪,首先是我自定义实现了一个栈Stack,这个栈是一个后入后出,也可以说是先入先出的一个栈。

// 存储在栈里面元素 -- 对象
public class Node {
    public final String value;
    public Node next;

    public Node(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "value=" + value;
    }
}
// 实现一个 栈(后进先出)
public class Stack {
    // top cas无锁修改
    AtomicReference<Node> top = new AtomicReference<Node>();

    public void push(Node node) { // 入栈
        Node oldTop;
        do {
            oldTop = top.get();
            node.next = oldTop;
        }
        while (!top.compareAndSet(oldTop, node)); // CAS 替换栈顶
    }


    // 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
    public Node pop(int time)  {

        Node newTop;
        Node oldTop;
        do {
            oldTop = top.get();
            if (oldTop == null) {   //如果没有值,就返回null
                return null;
            }
            newTop = oldTop.next;
            if (time != 0) {    //模拟延时
                try {
                    TimeUnit.SECONDS.sleep(time);
                }catch (InterruptedException e){
                }
            }
        }
        //将下一个节点设置为top
        while (!top.compareAndSet(oldTop, newTop));
        return oldTop;      //将旧的Top作为值返回
    }
}

屏幕快照 2020-02-22 下午10.28.24.png

  1. 比如stack分别push三个元素,如下。

    Stack stack = new Stack();
    stack.push(new Node("a"));
    stack.push(new Node("b"));
    stack.push(new Node("c"));
    
  2. 假设AtomicReference<Node> top 取出一个元素,那么这个元素则是nodeC,那么此NodeC指向的就是nodeB,则通过CAS操作会去比较top当前的引用与nodeC比较,如果一致则将top的引用指向nodeB,同时nodeC被丢弃。

  3. 同理假设添加一个元素添加一个nodeD,先取出当前引用的元素也就是nodeC,则通过CAS操作去比较当前元素NodeC是不是top的引用,是则修改替换top的引用。

    问题引出

    ok,上面的文字描述大概就是上面代码的意思,但我们现在定义这么一种情况,现在有两个线程A,B。

  4. 线程A想把c出栈,让stack指向nodeB,即threadA—-> stack.pop(800);//睡眠800ms,此时A线程以为stack的旧的值是c,新值是b

  5. 由于线程A处于睡眠状态,线程B做了一些动作,它依次取出c,b 然后添加d,c。所以此时stack的顺序是a->d->c
  6. 此时a睡眠结束,它看到旧的值是c,但这时候的c也就是oldNode 是c->d->a,而不是之前看到的c->b>a了,因为他被另外一个线程改了,这时候也就更新了它的地址值。所以既然top对象的引用的oldNode是一样的,但是newTop并不是更新后的新值,它保存的是旧的值,也就是一开始的d->a。然后很高兴做了CAS操作,也成功了。
  7. 于是乎top的由之前的c->d->a 变为了 b->a但此时它替换新的值确实b,但实际情况是b已经不存在了,就是说原本A线程以为我去掉c,然后stack指向b。但实际情况是此时a下面的小弟已经不是之前的b了,而是换了个d。此时问题得以出现。

屏幕快照 2020-02-23 下午2.31.39.png
所以我们写一个测试类来验证下到底stack剩下的是不是b和a了。

public class Test {
    public static void main(String[] args) throws InterruptedException {
       // Stack stack = new Stack();
        ConcurrentStack stack = new ConcurrentStack();

        stack.push(new Node("A"));
        stack.push(new Node("B"));
        stack.push(new Node("C"));

        new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "运行");
                Node node = stack.pop(3);
                System.out.println(Thread.currentThread().getName()+"done...");
        },"线程A").start();

        TimeUnit.MILLISECONDS.sleep(500);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "运行");
            Node nodeC = stack.pop(0); //取出C
            stack.pop(0);              //取出B,之后B处于游离状态,可能被回收
            stack.push(new Node("D"));//D入栈
            stack.push(nodeC);              //C入栈
            System.out.println(Thread.currentThread().getName()+"done...");
        },"线程B").start();
        //当活动线程数大于1,则暂停main线程
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        System.out.println("开始遍历Stack:");
        Node node = null;
        while ((node = stack.pop(0))!=null){
            System.out.println(node.value);
        }
    }
}
线程A运行
线程B运行
线程Bdone...
线程Adone...
开始遍历Stack:
B
A
可以看出剩下的只剩下b a了,并不是我们想要的d a。那针对上面出现的问题该如何解决呢?很简单,加一个版本号就可以了,因为我们现在CAS比较的只是去比较旧的值,也就是单单只把node变量作为比较对象是不够的,因为我们现在看到的node可能是被别人替换上来的假的node,就像上面看到的那样,你当时看到的node是真的a,但是后面看到的a时a'了,那加一个版本号就可以唯一确定是不是node a了。<br />    在java中通过CAS操作家版本号的可以通过AtomicStampedReference来取代之前的AtomicReference,就是每比较成功一次CAS则版本+1,代码如下:
public class ConcurrentStack {
    // top cas无锁修改
    //AtomicReference<Node> top = new AtomicReference<Node>();
    AtomicStampedReference<Node> top =
            new AtomicStampedReference<>(null, 0);

    public void push(Node node) { // 入栈
        Node oldTop;
        int v;
        do {
            v = top.getStamp();
            oldTop = top.getReference();
            node.next = oldTop;
        }
        while (!top.compareAndSet(oldTop, node, v, v+1)); // CAS 替换栈顶
    }


    // 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
    public Node pop(int time) {

        Node newTop;
        Node oldTop;
        int v;

        do {
            v = top.getStamp();
            oldTop = top.getReference();
            if (oldTop == null) {   //如果没有值,就返回null
                return null;
            }
            newTop = oldTop.next;
            if (time != 0) {    //模拟延时
                try {
                    TimeUnit.SECONDS.sleep(time);
                }catch (Exception e){

                }
            }
        }
        while (!top.compareAndSet(oldTop, newTop, v, v+1));     //将下一个节点设置为top
        return oldTop;      //将旧的Top作为值返回
    }
}
线程A运行
线程B运行
线程Bdone...
线程Adone...
开始遍历Stack:
D
A