1.CAS

image.png
CAS(Compare and swap)比较和替换是一种通过硬件实现并发安全的常用技术,底层通过利用CPU的CAS指令对缓存加锁或总线加锁方式来实现多处理器之间的原子操作。
它是Java并发包的实现基础。它的实现过程是:有3个操作数,内存值V,旧的预期值E,要修改的新值N,当且仅当预期值E和内存值V相同时,才将内存值V修改为N,否则什么都不做。
image.png
CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
CAS获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰,结合CAS和volatile可以实现无锁并发,适用于竞争不激烈,多核CPU的场景下。

2.Unsafe类

Java中CAS操作执行依赖于Unsafe类方法,Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务。
变量ValueOffset是该变量在内存中的偏移地址,根据内存偏移地址获取数据。
变量value被volatile修饰,保证了多线程之间的可见性。
image.png

基础操作

  1. //分配内存指定大小的内存
  2. public native long allocateMemory(long bytes);
  3. //设置给定内存地址的值
  4. public native void putAddress(long address, long x);
  5. //获取指定内存地址的值
  6. public native long getAddress(long address);
  7. /操作系统的内存页大小
  8. public native int pageSize();
  9. //传入一个对象的class并创建该实例对象,但不会调用构造方法
  10. public native Object allocateInstance(Class cls) throws InstantiationException;
  11. //获取字段f在实例对象中的偏移量
  12. public native long objectFieldOffset(Field f);
  13. //静态属性的偏移量,用于在对应的Class对象中读写静态属性
  14. public native long staticFieldOffset(Field f);
  15. //返回值就是f.getDeclaringClass()
  16. public native Object staticFieldBase(Field f);
  17. //获得给定对象偏移量上的引用类型的值
  18. public native Object getObject(Object o, long offset);
  19. //设置给定对象偏移量上的引用类型的值
  20. public native void putObject(Object o, long offset, Object x);
  21. //获取数组第一个元素的偏移地址
  22. public native int arrayBaseOffset(Class arrayClass);
  23. //数组中一个元素占据的内存空间,arrayBaseOffset与arrayIndexScale配合使用,可定位数组中每个元素在内存中的位置
  24. public native int arrayIndexScale(Class arrayClass);
public class UnSafeDemo {

    public  static  void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
        // 通过反射得到theUnsafe对应的Field对象
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        // 设置该Field为可访问
        field.setAccessible(true);
        // 通过Field得到该Field对应的具体对象,传入null是因为该Field为static的
        Unsafe unsafe = (Unsafe) field.get(null);
        System.out.println(unsafe);

        //通过allocateInstance直接创建对象
        User user = (User) unsafe.allocateInstance(User.class);

        Class userClass = user.getClass();
        Field name = userClass.getDeclaredField("name");
        Field age = userClass.getDeclaredField("age");
        Field id = userClass.getDeclaredField("id");

        //获取实例变量name和age在对象内存中的偏移量并设置值
        unsafe.putInt(user,unsafe.objectFieldOffset(age),18);
        unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV");

        // 这里返回 User.class,
        Object staticBase = unsafe.staticFieldBase(id);
        System.out.println("staticBase:"+staticBase);

        //获取静态变量id的偏移量staticOffset
        long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id"));
        //获取静态变量的值
        System.out.println("设置前的ID:"+unsafe.getObject(staticBase,staticOffset));
        //设置值
        unsafe.putObject(staticBase,staticOffset,"SSSSSSSS");
        //获取静态变量的值
        System.out.println("设置前的ID:"+unsafe.getObject(staticBase,staticOffset));
        //输出USER
        System.out.println("输出USER:"+user.toString());

        long data = 1000;
        byte size = 1;//单位字节

        //调用allocateMemory分配内存,并获取内存地址memoryAddress
        long memoryAddress = unsafe.allocateMemory(size);
        //直接往内存写入数据
        unsafe.putAddress(memoryAddress, data);
        //获取指定内存地址的数据
        long addrData=unsafe.getAddress(memoryAddress);
        System.out.println("addrData:"+addrData);

        /**
         * 输出结果:
         sun.misc.Unsafe@6f94fa3e
         staticBase:class geym.conc.ch4.atomic.User
         设置前的ID:USER_ID
         设置前的ID:SSSSSSSS
         输出USER:User{name='android TV', age=18', id=SSSSSSSS'}
         addrData:1000
         */

    }
}

class User{
    public User(){
        System.out.println("user 构造方法被调用");
    }
    private String name;
    private int age;
    private static String id="USER_ID";

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +'\'' +
                ", id=" + id +'\'' +
                '}';
    }
}

CAS操作相关

在Java中无锁操作CAS基于以下3个方法实现,Atomic系列内部方法是基于下述方法的实现的。

//第一个参数o为给定对象,offset为对象内存的偏移量,通过这个偏移量迅速定位字段并设置或获取该字段的值,
//expected表示期望值,x表示要设置的值,下面3个方法都通过CAS原子指令执行操作。
public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);                                                                                                  

public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);

挂起与恢复

将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。
Java对线程的挂起操作被封装在 LockSupport类中。

//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。  
public native void park(boolean var1, long var2);  

//终止挂起的线程,恢复正常 java.util.concurrent包中挂起操作都是在LockSupport类实现的,其底层正是使用这两个方法,  
public native void unpark(Object var1);

内存屏障

包含loadFence、storeFence、fullFence等方法,这些方法是在Java 8新引入的,用于定义内存屏障,避免代码重排序。

//在该方法之前的所有读操作,一定在load屏障之前执行完成
public native void loadFence();
//在该方法之前的所有写操作,一定在store屏障之前执行完成
public native void storeFence();
/在该方法之前的所有读写操作,一定在full屏障之前执行完成,这个内存屏障相当于上面两个的合体功能
public native void fullFence();

3.原子操作类

从JDK 1.5开始提供了java.util.concurrent.atomic包,在该包中提供了许多基于CAS实现的原子操作类,用法方便,性能高效,主要分以下4种类型。

原子更新基本类型

  • AtomicBoolean:原子更新布尔类型
  • AtomicInteger:原子更新整型
  • AtomicLong:原子更新长整型

以AtomicInteger为例进行分析,AtomicInteger主要是针对int类型的数据执行原子操作,它提供了原子自增方法、原子自减方法以及原子赋值方法等。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 获取指针类Unsafe
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    //下述变量value在AtomicInteger实例对象内的内存偏移量
    private static final long valueOffset;

    static {
        try {
           //通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移
           //通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
   //当前AtomicInteger封装的int变量value
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    public AtomicInteger() {
    }
   //获取当前最新值,
    public final int get() {
        return value;
    }
    //设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。
    public final void set(int newValue) {
        value = newValue;
    }
    //最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }
   //设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }
   //如果当前值为expect,则设置为update(当前值指的是value变量)
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    //当前值加1返回旧值,底层CAS操作
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    //当前值减1,返回旧值,底层CAS操作
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
   //当前值增加delta,返回旧值,底层CAS操作
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    //当前值加1,返回新值,底层CAS操作
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    //当前值减1,返回新值,底层CAS操作
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }
   //当前值增加delta,返回新值,底层CAS操作
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }
   //省略一些不常用的方法....
}

可以发现AtomicInteger原子类的内部几乎是基于前面分析过Unsafe类中的CAS相关操作的方法实现的,这也同时证明AtomicInteger是基于无锁实现的。 AtomicInteger类中所有自增或自减的方法都间接调用Unsafe类中的getAndAddInt()方法实现了CAS操作,从而保证了线程安全,

//当前值加1,返回新值,底层CAS操作
public final int incrementAndGet() {
     return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
 }

//给定对象o,根据获取内存偏移量指向的字段,将其增加delta
//这是一个CAS操作过程,直到设置成功方能退出循环,返回旧值
 public final int getAndAddInt(Object o, long offset, int delta) {
     int v;
     do {
         //获取内存中最新值
         v = getIntVolatile(o, offset);
       //通过CAS操作
     } while (!compareAndSwapInt(o, offset, v, v + delta));
     return v;
 }
//给定对象o,根据获取内存偏移量对于字段,将其设置为新值newValue,
 public final int getAndSetInt(Object o, long offset, int newValue) {
     int v;
     do {
         v = getIntVolatile(o, offset);
     } while (!compareAndSwapInt(o, offset, v, newValue));
     return v;
 }

AtomicInteger使用方式

public class AtomicIntegerDemo {
    //创建AtomicInteger,用于自增操作
    static AtomicInteger i=new AtomicInteger();

    public static class AddThread implements Runnable{
        public void run(){
           for(int k=0;k<10000;k++)
               i.incrementAndGet();
        }

    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] ts=new Thread[10];
        //开启10条线程同时执行i的自增操作
        for(int k=0;k<10;k++){
            ts[k]=new Thread(new AddThread());
        }
        //启动线程
        for(int k=0;k<10;k++){ts[k].start();}

        for(int k=0;k<10;k++){ts[k].join();}

        System.out.println(i);//输出结果:100000
    }
}

原子更新引用

AtomicReference原子类

public class AtomicReference<V> implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    //内部变量value,Unsafe类通过valueOffset内存偏移量即可获取该变量
    private volatile V value;

//CAS方法,间接调用unsafe.compareAndSwapObject(),它是一个
//实现了CAS操作的native方法
public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

//设置并获取旧值
public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }
    //省略其他代码......
}

//Unsafe类中的getAndSetObject方法,实际调用还是CAS操作
public final Object getAndSetObject(Object o, long offset, Object newValue) {
      Object v;
      do {
          v = getObjectVolatile(o, offset);
      } while (!compareAndSwapObject(o, offset, v, newValue));
      return v;
  }

AtomicReference使用方式

public class AtomicReferenceDemo2 {

    public static AtomicReference<User> atomicUserRef = new AtomicReference<User>();

    public static void main(String[] args) {
        User user = new User("zejian", 18);
        atomicUserRef.set(user);
        User updateUser = new User("Shine", 25);
        atomicUserRef.compareAndSet(user, updateUser);
        //执行结果:User{name='Shine', age=25}
              System.out.println(atomicUserRef.get().toString());  
    }

    static class User {
        public String name;
        private int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}

原子更新数组

原子更新数组指的是通过原子的方式更新数组里的某个元素,主要有以下3个类

  • AtomicIntegerArray:原子更新整数数组里的元素
  • AtomicLongArray:原子更新长整数数组里的元素
  • AtomicReferenceArray:原子更新引用类型数组里的元素

    以AtomicIntegerArray为例进行分析

    public class AtomicIntegerArray implements java.io.Serializable {
      //获取unsafe类的实例对象
      private static final Unsafe unsafe = Unsafe.getUnsafe();
      //获取数组的第一个元素内存起始地址
      private static final int base = unsafe.arrayBaseOffset(int[].class);
    
      private static final int shift;
      //内部数组
      private final int[] array;
    
      static {
          //获取数组中一个元素占据的内存空间 int 4 
          int scale = unsafe.arrayIndexScale(int[].class);
          //判断是否为2的次幂,一般为2的次幂否则抛异常
          if ((scale & (scale - 1)) != 0)
              throw new Error("data type scale not a power of two");
          //计算出scale的前导零个数(必须是连续的) 31-29
          shift = 31 - Integer.numberOfLeadingZeros(scale);
      }
    
      private long checkedByteOffset(int i) {
          if (i < 0 || i >= array.length)
              throw new IndexOutOfBoundsException("index " + i);
    
          return byteOffset(i);
      }
      //计算数组中每个元素的的内存地址
      /*
      address = base + 0 * 4 即address= base + 0 << 2
      ddress = base + 1 * 4 即address= base + 1 << 2
      address = base + 2 * 4 即address= base + 2 << 2
      */
      private static long byteOffset(int i) {
          return ((long) i << shift) + base;
      }
    
      //执行自增操作,返回旧值,i是指数组元素下标
      public final int getAndIncrement(int i) {
          return getAndAdd(i, 1);
      }
      //指定下标元素执行自增操作,并返回新值
      public final int incrementAndGet(int i) {
          return getAndAdd(i, 1) + 1;
      }
    
      //指定下标元素执行自减操作,并返回新值
      public final int decrementAndGet(int i) {
          return getAndAdd(i, -1) - 1;
      }
    
      //间接调用unsafe.getAndAddInt()方法
      public final int getAndAdd(int i, int delta) {
          return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
      }
    }
    

    AtomicIntegerArray 使用方式

    public class AtomicIntegerArrayDemo {
      static AtomicIntegerArray arr = new AtomicIntegerArray(10);
    
      public static class AddThread implements Runnable{
          public void run(){
             for(int k=0;k<10000;k++)
                 //执行数组中元素自增操作,参数为index,即数组下标
                 arr.getAndIncrement(k%arr.length());
          }
      }
      public static void main(String[] args) throws InterruptedException {
    
          Thread[] ts=new Thread[10];
          //创建10条线程
          for(int k=0;k<10;k++){
              ts[k]=new Thread(new AddThread());
          }
          //启动10条线程
          for(int k=0;k<10;k++){ts[k].start();}
          for(int k=0;k<10;k++){ts[k].join();}
          //执行结果
          //[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
          System.out.println(arr);
      }
    }
    

    原子更新属性

    普通的变量也享受原子操作,可以使用原子更新字段类。
    Atomic并发包提供了以下三个类:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。

  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。

原子更新器的使用存在以下条件

  • 操作的字段不能是static类型。
  • 操作的字段不能是final类型的,因为final根本没法修改。
  • 字段必须是volatile修饰的,也就是数据本身是读一致的。
  • 属性必须对当前的Updater所在的区域是可见的,如果不是当前类内部进行原子更新器操作不能使用private,protected子类操作父类时修饰符必须是protect权限及以上,如果在同一个package下则必须是default权限及以上,也就是说无论何时都应该保证操作类与被操作类间的可见性。

AtomicIntegerFieldUpdater的实现原理是反射和Unsafe类结合,AtomicIntegerFieldUpdater是个抽象类,实际实现类为AtomicIntegerFieldUpdaterImpl。

public abstract class AtomicIntegerFieldUpdater<T> {

    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,String fieldName) {
         //实际实现类AtomicIntegerFieldUpdaterImpl                                          
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }
 }
 private static class AtomicIntegerFieldUpdaterImpl<T>
            extends AtomicIntegerFieldUpdater<T> {
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private final long offset;//内存偏移量
        private final Class<T> tclass;
        private final Class<?> cclass;

        AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;//要修改的字段
            final int modifiers;//字段修饰符
            try {
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);//反射获取字段对象
                        }
                    });
                    //获取字段修饰符
                modifiers = field.getModifiers();
            //对字段的访问权限进行检查,不在访问范围内抛异常
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
              sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            Class<?> fieldt = field.getType();
            //判断是否为int类型
            if (fieldt != int.class)
                throw new IllegalArgumentException("Must be integer type");
            //判断是否被volatile修饰
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            this.cclass = (Modifier.isProtected(modifiers) &&
                           caller != tclass) ? caller : null;
            this.tclass = tclass;
            //获取该字段的在对象内存的偏移量,通过内存偏移量可以获取或者修改该字段的值
            offset = unsafe.objectFieldOffset(field);
        }
   }
public int incrementAndGet(T obj) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev + 1;
            //CAS操作
        } while (!compareAndSet(obj, prev, next));
        return next;
}

//最终调用的还是unsafe.compareAndSwapInt()方法
public boolean compareAndSet(T obj, int expect, int update) {
            if (obj == null || obj.getClass() != tclass || cclass != null) fullCheck(obj);
            return unsafe.compareAndSwapInt(obj, offset, expect, update);
        }

使用方式

/*
使用AtomicIntegerFieldUpdater更新候选人(Candidate)的分数score,开启了10000条线程投票,当随机值大于0.4时算一票,分数自增一次,其中allScore用于验证分数是否正确(其实用于验证AtomicIntegerFieldUpdater更新的字段是否线程安全),当allScore与score相同时,则说明投票结果无误,也代表AtomicIntegerFieldUpdater能正确更新字段score的值,是线程安全的。

AtomicReferenceFieldUpdater注明泛型时需要两个泛型参数,一个是修改的类类型,一个修改字段的类型。
*/
public class AtomicIntegerFieldUpdaterDemo {
    public static class Candidate{
        int id;
        volatile int score;
    }

    public static class Game{
        int id;
        volatile String name;

        public Game(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Game{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    static AtomicIntegerFieldUpdater<Candidate> atIntegerUpdater
        = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    static AtomicReferenceFieldUpdater<Game,String> atRefUpdate =
            AtomicReferenceFieldUpdater.newUpdater(Game.class,String.class,"name");


    //用于验证分数是否正确
    public static AtomicInteger allScore=new AtomicInteger(0);


    public static void main(String[] args) throws InterruptedException {
        final Candidate stu=new Candidate();
        Thread[] t=new Thread[10000];
        //开启10000个线程
        for(int i = 0 ; i < 10000 ; i++) {
            t[i]=new Thread() {
                public void run() {
                    if(Math.random()>0.4){
                        atIntegerUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            };
            t[i].start();
        }

        for(int i = 0 ; i < 10000 ; i++) {  t[i].join();}
        System.out.println("最终分数score="+stu.score);
        System.out.println("校验分数allScore="+allScore);

        //AtomicReferenceFieldUpdater 简单的使用
        Game game = new Game(2,"zh");
        atRefUpdate.compareAndSet(game,game.name,"JAVA-HHH");
        System.out.println(game.toString());

        /**
         * 输出结果:
         * 最终分数score=5976
           校验分数allScore=5976
           Game{id=2, name='JAVA-HHH'}
         */
    }
}

LongAdder

ABA

CAS是当且仅当旧的预期值E和内存值V相同时,才将内存值V修改为U,也就是如果内存值V没有发生变化则更新,但是有可能发生内存值原来是A,中间被改成B,后来又被改成A,此时再使用CAS进行检查时发现没有变化,但是实际上发生了变化,这就是ABA问题。
image.png
Java并发包下的AtomicStampedReference、AtomicMarkableReference可以解决ABA问题。

AtomicStampedReference

AtomicStampedReference内部实现上添加了一个类似于版本号作用的stamp属性,它是被自动更新的。实现上首先检查当前引用是否等于预期引用、当前stamp是否等于预期stamp,如果全部相等,则以原子方式将该引用和该stamp的值设置为给定的更新值。
实现原理

public class AtomicStampedReference<V> {
    //通过Pair内部类存储数据和时间戳
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    //存储数值和时间的内部类
    private volatile Pair<V> pair;

    //构造器,创建时需传入初始值和时间初始值
    public AtomicStampedReference(V initialRef, int initialStamp) {
        pair = Pair.of(initialRef, initialStamp);
    }

    //compareAndSet方法
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

    //casPair方法
    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }
}

使用方式

public class ABADemo {

    static AtomicInteger atIn = new AtomicInteger(100);

    //初始化时需要传入一个初始值和初始时间
    static AtomicStampedReference<Integer> atomicStampedR =
            new AtomicStampedReference<Integer>(100,0);


    static Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            //更新为200
            atIn.compareAndSet(100, 200);
            //更新为100
            atIn.compareAndSet(200, 100);
        }
    });


    static Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean flag=atIn.compareAndSet(100,500);
            System.out.println("flag:"+flag+",newValue:"+atIn);
        }
    });


    static Thread t3 = new Thread(new Runnable() {
        @Override
        public void run() {
            int time=atomicStampedR.getStamp();
            //更新为200
            atomicStampedR.compareAndSet(100, 200,time,time+1);
            //涉及到自动拆装箱,而Integer类的静态方法valueOf中大于127的数(如200和200)是不等的
            Integer value2 = atomicStampedR.getReference(); 
            //更新为100
            int time2=atomicStampedR.getStamp();
            atomicStampedR.compareAndSet(value2, 100,time2,time2+1);
        }
    });


    static Thread t4 = new Thread(new Runnable() {
        @Override
        public void run() {
            int time = atomicStampedR.getStamp();
            System.out.println("sleep 前 t4 time:"+time);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean flag=atomicStampedR.compareAndSet(100,500,time,time+1);
            System.out.println("flag:"+flag+",newValue:"+atomicStampedR.getReference());
        }
    });

    public static  void  main(String[] args) throws InterruptedException {
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        t3.start();
        t4.start();
        /**
         * 输出结果:
         flag:true,newValue:500
         sleep 前 t4 time:0
         flag:false,newValue:200
         */
    }
}

AtomicMarkableReference

AtomicStampedReference内部维护的是boolean类型的标识,也就代表着它并不像之前的AtomicStampedReference内部的维护的时间戳一样值会不断的递增。只能在一定程度上减少ABA问题的出现,它并不能完全的杜绝ABA问题。

public class ABAIssue {
    static AtomicMarkableReference<Integer> amRef = new AtomicMarkableReference<Integer>(100, false);

    private static Thread t1 = new Thread(() -> {
        boolean mark = amRef.isMarked();
        System.out.println("线程T1:修改前标志 Mrak:" + mark + "....");
        // 将值更新为200
        System.out.println("线程T1:100 --> 200.... 修改后返回值 Result:" + amRef.compareAndSet(amRef.getReference(), 200, mark, !mark));
    });

    private static Thread t2 = new Thread(() -> {
        boolean mark = amRef.isMarked();
        System.out.println("线程T2:修改前标志 Mrak:" + mark + "....");
        // 将值更新回100
        System.out.println("线程T2:200 --> 100.... 修改后返回值 Result:" + amRef.compareAndSet(amRef.getReference(), 100, mark, !mark));
    });

    private static Thread t3 = new Thread(() -> {
        boolean mark = amRef.isMarked();
        System.out.println("线程T3:休眠前标志 Mrak:" + mark + "....");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        boolean flag = amRef.compareAndSet(100, 500, mark, !mark);
        System.out.println("线程T3: 100 --> 500.... flag:" + flag + ",newValue:" + amRef.getReference());
    });

    public static void main(String[] args) throws InterruptedException {
        t1.start();
        t1.join();
        t2.start();
        t2.join();
        t3.start();

        /**
         * 输出结果如下:
         *    线程T1:修改前标志 Mrak:false....
         *    线程T1:100 --> 200.... 修改后返回值 Result:true
         *    线程T2:修改前标志 Mrak:true....
         *    线程T2:200 --> 100.... 修改后返回值 Result:true
         *    线程T3:休眠前标志 Mrak:false....
         *    线程T3: 100 --> 500.... flag:true,newValue:500  
         */

         /* t3线程执行完成后结果还是成功更新为500,代表t1、t2
          线程所做的修改操作对于t3线程来说还是不可见的 */
    }
}

4.底层原理

最终调用Unsafe类的compareAndSwapInt方法,进入这个方法发现使用了native修饰,也就是这里将会通过JNI调用非Java实现的代码。
需要进入OpenJDK源码查看c++的代码,代码跟踪顺序是:unsafe.cpp、atomic.cpp,接下来会根据操作系统和处理器的不同来选择对应的调用代码,这里以Windows和x86处理器为例进入atomic_window_x86.inline.hpp,重要代码片段如下

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {    
    int mp = os::isMP(); 
    // 判断处理器的类型是否是多处理器    
    _asm {        
        mov edx, dest         
        mov ecx, exchange_value        
        mov eax, compare_value         
        LOCK_IF_MP(mp)        
            cmpxchg dword ptr [edx], ecx   
        } 
}

从上面代码可以看到,如果是多处理器,将会为cmpxchg指令添加lock前缀,否则不添加(单处理器自身会维护执行顺序)。对于lock前缀,下面是intel手册的说明:

  • 确保对内存读改写操作的原子执行。 在Pentium及之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其它处理器暂时无法通过总线访问内存,很显然,这个开销很大。在新的处理器中,Intel使用缓存锁定来保证指令执行的原子性。缓存锁定将大大降低lock前缀指令的执行开销。
  • 禁止该指令,与前面和后面的读写指令重排序。
  • 把写缓冲区的所有数据刷新到内存中。

也就是说,如果是多处理器,通过带lock前缀的cmpxchg指令对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作;如果是单处理器,通过cmpxchg指令完成原子操作。

手写自旋锁

参考

无锁CAS与Unsafe类及其并发包Atomic
深入理解Java并发编程之无锁CAS机制、魔法类Unsafe、原子包Atomic