一、无锁解决线程安全问题

1.引例

原子整数使用:

  1. AtomicInteger balance = new AtomicInteger();
  1. package panw.unlocked;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.ArrayList;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. public class AcountTest {
  6. public static void main(String[] args) {
  7. UnsafeAccount unsafeAccount = new UnsafeAccount(10000);
  8. Account.demo(unsafeAccount);
  9. // SafeAccount safeAccount = new SafeAccount(10000);
  10. // Account.demo(safeAccount);
  11. }
  12. }
  13. interface Account{
  14. Integer getBalance();
  15. void withDraw(Integer amount);
  16. static void demo(Account account){
  17. ArrayList<Thread> list = new ArrayList<>();
  18. long start = System.nanoTime();
  19. for (int i = 0; i < 1000; i++) {
  20. list.add(new Thread(()->{
  21. account.withDraw(10);
  22. }));
  23. }
  24. list.forEach(Thread::start);
  25. list.forEach((t)->{
  26. try {
  27. t.join();
  28. } catch (InterruptedException e) {
  29. throw new RuntimeException(e);
  30. }
  31. });
  32. long end = System.nanoTime();
  33. System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
  34. }
  35. }
  36. class UnsafeAccount implements Account{
  37. private Integer balance;
  38. public UnsafeAccount(Integer balance) {
  39. this.balance = balance;
  40. }
  41. @Override
  42. public Integer getBalance() {
  43. return this.balance;
  44. }
  45. @Override
  46. public void withDraw(Integer amount) {
  47. synchronized (this){
  48. this.balance -= amount;
  49. }
  50. }
  51. }
  52. class SafeAccount implements Account{
  53. private AtomicInteger balance;
  54. public SafeAccount(int balance) {
  55. this.balance = new AtomicInteger(balance);
  56. }
  57. @Override
  58. public Integer getBalance() {
  59. return balance.get();
  60. }
  61. @Override
  62. public void withDraw(Integer amount) {
  63. // balance.getAndAdd(-amount);
  64. while (true){
  65. int prev = balance.get();
  66. int next = prev - amount;
  67. if (balance.compareAndSet(prev,next)){
  68. break;
  69. }
  70. }
  71. }
  72. }

以上的代码UnsafeAccount只有加上了synchronized代码块才能保证操作的原子性,但是这样相比于CAS(compare and swap)效率查了很多,那么CAS到底是什么呢?下面我们来看看

二、CAS与volatile

CAS是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。

1.三个操作参数

  1. 内存位置V(它的值是我们想要去更新的)
  2. 预期原值A(上一次从内存中读取的值)
  3. 新值B(应该写入的新值)

    2.CAS的操作过程

    将内存位置V的值与A比较(compare)
  • 如果相等,则说明没有其它线程来修改过这个值,所以把内存V的的值更新成B(swap)
  • 如果不相等,说明V上的值被修改过了,不更新,而是返回当前V的值,再重新执行一次任务再继续这个过程。

    3.volatile

    volatile 仅仅保证了共享变量的可见性,让其它线程能够看到新值,但不能解决指令交错问题(不能保证原子性)
    CAS 必须借助 volatile 才能读取到共享变量的新值来实现【比较并交换】的效果

    4.CAS特点

    结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我再重试。

  • synchronized 是基于悲观锁的思想:悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发

    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

      三、原子整数

      JUC并发包里面提供了
  • AtomicBoolean

  • AtomicInteger
  • AtomicLong

具体使用差不多,我们以AtomicInteger举例

AtomicInteger

  1. package panw.JUC;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class AtomicIntegerTest {
  4. public static void main(String[] args) {
  5. AtomicInteger ai = new AtomicInteger(1);
  6. ai.getAndIncrement();
  7. ai.incrementAndGet();
  8. ai.getAndDecrement();
  9. ai.decrementAndGet();
  10. ai.getAndAdd(5);
  11. ai.getAndAdd(-5);
  12. ai.getAndUpdate((a)->a*10);
  13. for (int i = 0; i < 3; i++) {
  14. int andAccumulate = ai.getAndAccumulate(5, (a, b) -> a * b);
  15. System.out.println(andAccumulate);
  16. }
  17. }
  18. }

四、原子引用

AtomicReference和AtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference则对应普通的对象引用。也就是它可以保证你在修改对象引用时的线程安全性。

  1. package panw.JUC;
  2. import java.math.BigDecimal;
  3. import java.util.ArrayList;
  4. import java.util.concurrent.atomic.AtomicReference;
  5. public class AtomicReferenceTest {
  6. public static void main(String[] args) {
  7. SafeAccount safeAccount = new SafeAccount(new BigDecimal(10000));
  8. Account.demo(safeAccount);
  9. }
  10. }
  11. interface Account {
  12. BigDecimal getBalance();
  13. void withDraw(BigDecimal amount);
  14. static void demo(Account account) {
  15. ArrayList<Thread> list = new ArrayList<>();
  16. long start = System.nanoTime();
  17. for (int i = 0; i < 1000; i++) {
  18. list.add(new Thread(() -> {
  19. account.withDraw(BigDecimal.TEN);
  20. }));
  21. }
  22. list.forEach(Thread::start);
  23. list.forEach((t) -> {
  24. try {
  25. t.join();
  26. } catch (InterruptedException e) {
  27. throw new RuntimeException(e);
  28. }
  29. });
  30. long end = System.nanoTime();
  31. System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
  32. }
  33. }
  34. class SafeAccount implements Account {
  35. private AtomicReference<BigDecimal> balance;
  36. public SafeAccount(BigDecimal balance) {
  37. this.balance = new AtomicReference<BigDecimal>(balance);
  38. }
  39. @Override
  40. public BigDecimal getBalance() {
  41. return balance.get();
  42. }
  43. @Override
  44. public void withDraw(BigDecimal amount) {
  45. // balance.getAndAdd(-amount);
  46. while (true){
  47. BigDecimal prev = balance.get();
  48. BigDecimal next = prev.subtract(amount);
  49. if (balance.compareAndSet(prev,next)){
  50. break;
  51. }
  52. }
  53. }
  54. }

五、ABA问题

1.示例

  1. package panw.JUC;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.atomic.AtomicReference;
  4. @Slf4j
  5. public class ABATest {
  6. static AtomicReference<String> atomicReference = new AtomicReference<>("A");
  7. public static void main(String[] args) throws InterruptedException {
  8. Thread t1 = new Thread(() -> {
  9. String s = atomicReference.get();
  10. other();
  11. try {
  12. Thread.sleep(1000);
  13. } catch (InterruptedException e) {
  14. throw new RuntimeException(e);
  15. }
  16. log.debug("A->C "+ atomicReference.compareAndSet("A","C"));
  17. }, "t1");
  18. t1.start();
  19. t1.join();
  20. log.debug("最终:"+atomicReference.get());
  21. }
  22. static void other() {
  23. new Thread(()->{
  24. log.debug("A->B "+ atomicReference.compareAndSet("A","B"));
  25. }).start();
  26. try {
  27. Thread.sleep(500);
  28. } catch (InterruptedException e) {
  29. throw new RuntimeException(e);
  30. }
  31. new Thread(()->{
  32. log.debug("B->A "+ atomicReference.compareAndSet("B","A"));
  33. }).start();
  34. }
  35. }

主线程仅能判断出共享变量的值与初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号,JUC提供了以下的实现

2.AtomicStampedReference

  1. package panw.JUC;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.atomic.AtomicStampedReference;
  5. @Slf4j
  6. public class AtomicStampedReferenceTest {
  7. private static AtomicStampedReference<String> str = new AtomicStampedReference<>("A",0);
  8. public static void main(String[] args) throws InterruptedException {
  9. Thread t1 = new Thread(() -> {
  10. String pre = str.getReference();
  11. int stamp = str.getStamp();
  12. other();
  13. try {
  14. TimeUnit.SECONDS.sleep(2);
  15. } catch (InterruptedException e) {
  16. throw new RuntimeException(e);
  17. }
  18. log.debug("change A->C stamp " + stamp + str.compareAndSet(pre, "C", stamp, stamp+1));
  19. });
  20. t1.start();
  21. t1.join();
  22. }
  23. static void other() {
  24. new Thread(()->{
  25. int stamp = str.getStamp();
  26. log.debug("A->B stamp " + stamp + str.compareAndSet("A","B",stamp,stamp+1));
  27. }).start();
  28. try {
  29. Thread.sleep(500);
  30. } catch (InterruptedException e) {
  31. throw new RuntimeException(e);
  32. }
  33. new Thread(()->{
  34. int stamp = str.getStamp();
  35. log.debug("B->A stamp " + stamp + str.compareAndSet("B","A",stamp,stamp+1));
  36. }).start();
  37. }
  38. }

3.AtomicMarkableReference

  1. package panw.JUC;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.atomic.AtomicMarkableReference;
  4. @Slf4j
  5. public class AtomicMarkableReferenceTest {
  6. static AtomicMarkableReference<String> str = new AtomicMarkableReference<>("A",false);
  7. public static void main(String[] args) {
  8. new Thread(()->{
  9. String pre = str.getReference();
  10. other();
  11. try {
  12. Thread.sleep(1000);
  13. } catch (InterruptedException e) {
  14. throw new RuntimeException(e);
  15. }
  16. log.debug("A->C " + str.compareAndSet("A","C",false,true));
  17. }).start();
  18. }
  19. static void other() {
  20. new Thread(()->{
  21. log.debug("A->A " + str.compareAndSet("A","A",false,true));
  22. }).start();
  23. }
  24. }

两种区别

  • AtomicStampedReference 需要我们传入整型变量作为版本号,来判定是否被更改过
  • AtomicMarkableReference需要我们传入布尔变量作为标记,来判断是否被更改过

    六、原子数组

  • AtomicIntegerArray

  • AtomicLongArray
  • AtomicReferenceArray

具体的使用其实和上面相同,只是操作时候带上数据下标即可。

七、原子更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdate

原子更新器用于帮助我们改变某个对象中的某个属性

  1. package panw.JUC;
  2. import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  3. public class AtomicReferenceFieldUpdaterTest {
  4. public static void main(String[] args) {
  5. Person person = new Person();
  6. AtomicReferenceFieldUpdater<Person, String> updater =AtomicReferenceFieldUpdater.newUpdater(Person.class,String.class,"name");
  7. System.out.println("更新前"+person);
  8. updater.compareAndSet(person,null,"panw");
  9. System.out.println("更新后"+person);
  10. }
  11. }
  12. class Person{
  13. volatile String name;
  14. @Override
  15. public String toString() {
  16. return "Person{" +
  17. "name='" + name + '\'' +
  18. '}';
  19. }
  20. }

从上面的例子可以看出,原子更新器是通过newUpdater来获取实例的。其中传入了三个参数

  • 拥有属性的类的Class
  • 属性的Class
  • 属性的名称

大概可以猜出来,初始化过程用到了反射,下面来看看源码:

内部实现类

image.png
AtomicReferenceFieldUpdater为抽象类,该类内部有一个自己的实现类**AtomicReferenceFieldUpdaterImpl**

  1. private static final class AtomicReferenceFieldUpdaterImpl<T,V>
  2. extends AtomicReferenceFieldUpdater<T,V>

newUpdater方法:

  1. @CallerSensitive
  2. public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater(Class<U> tclass,
  3. Class<W> vclass,
  4. String fieldName) {
  5. return new AtomicReferenceFieldUpdaterImpl<U,W>
  6. (tclass, vclass, fieldName, Reflection.getCallerClass());
  7. }

**AtomicReferenceFieldUpdaterImpl**构造方法:

  1. AtomicReferenceFieldUpdaterImpl(final Class<T> tclass,
  2. final Class<V> vclass,
  3. final String fieldName,
  4. final Class<?> caller) {
  5. final Field field;
  6. final Class<?> fieldClass;
  7. final int modifiers;
  8. try {
  9. field = AccessController.doPrivileged(
  10. new PrivilegedExceptionAction<Field>() {
  11. public Field run() throws NoSuchFieldException {
  12. return tclass.getDeclaredField(fieldName);
  13. }
  14. });
  15. modifiers = field.getModifiers();
  16. sun.reflect.misc.ReflectUtil.ensureMemberAccess(
  17. caller, tclass, null, modifiers);
  18. ClassLoader cl = tclass.getClassLoader();
  19. ClassLoader ccl = caller.getClassLoader();
  20. if ((ccl != null) && (ccl != cl) &&
  21. ((cl == null) || !isAncestor(cl, ccl))) {
  22. sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
  23. }
  24. fieldClass = field.getType();
  25. } catch (PrivilegedActionException pae) {
  26. throw new RuntimeException(pae.getException());
  27. } catch (Exception ex) {
  28. throw new RuntimeException(ex);
  29. }
  30. if (vclass != fieldClass)
  31. throw new ClassCastException();
  32. if (vclass.isPrimitive())
  33. throw new IllegalArgumentException("Must be reference type");
  34. if (!Modifier.isVolatile(modifiers))
  35. throw new IllegalArgumentException("Must be volatile type");
  36. this.cclass = (Modifier.isProtected(modifiers) &&
  37. tclass.isAssignableFrom(caller) &&
  38. !isSamePackage(tclass, caller))
  39. ? caller : tclass;
  40. this.tclass = tclass;
  41. this.vclass = vclass;
  42. this.offset = U.objectFieldOffset(field);
  43. }

从上面代码可以看到确实是通过反射来实现更新

八、LongAdder

LongAdder是JDK8新增的原子操作类,它提供了一种新的思路,AtomicLong的性能瓶颈是由于大量线程同时更新一个变量造成的,那么能不能把这个变量拆分出来,变成多个变量,然后让线程去竞争这些变量,最后合并即可?LongAdder的设计精髓就在这里,通过将变量拆分成多个元素,降低该变量的并发度,最后进行合并元素,变相的减少了CAS的失败次数。
共享模型之无锁 - 图2

九、Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

  1. package panw.JUC;
  2. import sun.misc.Unsafe;
  3. import java.lang.reflect.Constructor;
  4. import java.lang.reflect.Field;
  5. import java.lang.reflect.InvocationTargetException;
  6. public class UnsafeGetTest {
  7. public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
  8. Class<Unsafe> unsafeClass = Unsafe.class;
  9. Constructor<Unsafe> constructor = unsafeClass.getDeclaredConstructor();
  10. constructor.setAccessible(true);
  11. Unsafe unsafe = constructor.newInstance();
  12. Field field = Teacher.class.getDeclaredField("name");
  13. long offset = unsafe.objectFieldOffset(field);
  14. System.out.println(offset);
  15. Person person = new Person();
  16. unsafe.compareAndSwapObject(person,offset,null,"张三");
  17. System.out.println(person);
  18. }
  19. }
  20. class Teacher{
  21. volatile String name;
  22. @Override
  23. public String toString() {
  24. return "Teacher{" +
  25. "name='" + name + '\'' +
  26. '}';
  27. }
  28. }