在Java中,线程部分是一个重点,本篇文章说的JUC也是关于线程的。
JUC就是java.util .concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

JMM内存模型

Java内存模型,是一种规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
线程对变量的操作(读取赋值等)必须在工作内存中进行,首先将主内存的变量拷贝到自己的工作内存(私有数据区域)。
一个线程修改值后将变量写回给主内存(共享内存区域,所有线程都可以访问),并及时通知其他线程(可见性)(这种及时通知的机制就叫JMM)。
JMM的三大特性:可见性、原子性、有序性。(保证线程安全)

volatile关键字

是java虚拟机(JVM)提供的轻量级的同步机制。
保持可见性,不支持原子性,禁止指令重排(有序性)。

可见性

  1. class MyData{
  2. int number=0;
  3. //volatile int number=0;
  4. AtomicInteger atomicInteger=new AtomicInteger();
  5. public void setTo60(){
  6. this.number=60;
  7. }
  8. //此时number前面已经加了volatile,但是不保证原子性
  9. public void addPlusPlus(){
  10. number++;
  11. }
  12. public void addAtomic(){
  13. atomicInteger.getAndIncrement();//i++
  14. }
  15. }
  16. //volatile可以保证可见性,及时通知其它线程主物理内存的值已被修改
  17. private static void volatileVisibilityDemo() {
  18. System.out.println("可见性测试");
  19. MyData myData=new MyData();//资源类
  20. //启动一个线程操作共享数据
  21. new Thread(()->{
  22. System.out.println(Thread.currentThread().getName()+"\t come in");
  23. try {TimeUnit.SECONDS.sleep(3);myData.setTo60();
  24. System.out.println(Thread.currentThread().getName()+"\t update number value: "+myData.number);}catch (InterruptedException e){e.printStackTrace();}
  25. },"AAA").start();
  26. while (myData.number==0){
  27. //main线程持有共享数据的拷贝,一直为0
  28. }
  29. System.out.println(Thread.currentThread().getName()+"\t mission is over. main get number value: "+myData.number);
  30. }

原子性

volatile不保证原子性
i++(不是原子性操作);多线程会出现了写覆盖的现象。

如何解决原子性:

解决的方式就是:

  1. addPlusPlus()方法加锁。
  2. 使用java.util.concurrent.AtomicInteger类。

    1. private static void atomicDemo() {
    2. System.out.println("原子性测试");
    3. MyData myData=new MyData();
    4. for (int i = 1; i <= 20; i++) {
    5. new Thread(()->{
    6. for (int j = 0; j <1000 ; j++) {
    7. myData.addPlusPlus();
    8. myData.addAtomic();
    9. }
    10. },String.valueOf(i)).start();
    11. }
    12. while (Thread.activeCount()>2){
    13. Thread.yield();
    14. }
    15. System.out.println(Thread.currentThread().getName()+"\t int type finally number value: "+myData.number);
    16. System.out.println(Thread.currentThread().getName()+"\t AtomicInteger type finally number value: "+myData.atomicInteger);
    17. }
  3. synchronized(太重了,会导致并发性能下降),不建议用synchronized

  4. AtomicInteger(原子性类)

AtomicInteger保证原子性的原因:因为CAS(unsafe和自旋锁)

有序性(禁止指令重排)

volatile禁止指令重排(保证有序性)
.java—>.class—>>编译器执行字节码(程序最终执行)
计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重拍,一般分为以下3中:
JUC核心点 - 图1

  1. 单线程环境里确保程序最终执行结果和代码执行的结果一致
  2. 处理器在进行重排序时必须要考虑指令之间的数据依赖性
  3. 多线程环境中线程交替执行,由于编译器优化重排对的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测。 ```java public void mySort(){ int x=11; int y=12; x=x+5; y=x*x; } //多线程并发时指令执行顺序可能是: 1234 2134 1324

由于指令重排也需要数据的依赖性,不可能先执行4

  1. 以上例子,可能出现的执行顺序有123421341342,这三个都没有问题,最终结果都是x = 16y=256。但是如果是4开头,就有问题了,y=0。这个时候就**不需要**指令重排序。
  2. volatile底层是用CPU的**内存屏障**(Memory Barrier)指令来实现的,有两个作用,一个是保证特定操作的顺序性,二是保证变量的可见性。在指令之间插入一条Memory Barrier指令,告诉编译器和CPU,在Memory Barrier指令之间的指令不能被重排序。
  3. <a name="30806f80"></a>
  4. ## 哪些地方用到过volatile?
  5. <a name="17995a5f"></a>
  6. ### 单例模式的安全问题
  7. 常见的DCLDouble Check Lock)模式虽然加了同步,但是在多线程下依然会有线程安全问题。
  8. ```java
  9. public class SingletonDemo{
  10. private static volatile SingletonDemo instance=null;//双重检测加volatile关键字
  11. //构造器私有化
  12. private void SingletonDemo(){
  13. }
  14. //双重检测锁模式
  15. public static SingletonDemo getInstance(){
  16. if(instance==null){
  17. synchronized (SingletonDemo.class){
  18. if(instance==null){
  19. instance=new SingletonDemo();
  20. }
  21. }
  22. }
  23. return instance;
  24. }
  25. public static void main(String[] args) {
  26. for (int i = 0; i < 10; i++) {
  27. new Thread(()->{
  28. SingletonDemo.getInstance();
  29. },String.valueOf(i+1)).start();
  30. }
  31. }
  32. }

这个漏洞比较tricky,很难捕捉,但是是存在的。instance=new SingletonDemo();可以大致分为三步

  1. memory = allocate(); //1.分配内存
  2. instance(memory); //2.初始化对象
  3. instance = memory; //3.设置引用地址

其中2、3没有数据依赖关系,可能发生重排。如果发生,此时内存已经分配,那么instance=memory不为null。如果此时线程挂起,instance(memory)还未执行,对象还未初始化。由于instance!=null,所以两次判断都跳过,最后返回的instance没有任何内容,还没初始化。

解决的方法就是对singletondemo对象添加上volatile关键字,禁止指令重排。
AtomicInteger保证原子性的原因:因为CAS(unsafe和自旋锁)

CAS

CAS是指Compare And Swap比较并交换,是一种很重要的同步思想。如果主内存的值跟期望值一样,那么就进行修改,否则一直重试,直到一致为止。

  1. public class CASDemo {
  2. public static void main(String[] args) {
  3. AtomicInteger atomicInteger=new AtomicInteger(5);
  4. System.out.println(atomicInteger.compareAndSet(5, 2019)+"\t current data : "
  5. + atomicInteger.get());
  6. //修改失败
  7. System.out.println(atomicInteger.compareAndSet(5, 1024)+"\t current data : "
  8. + atomicInteger.get());
  9. }
  10. }

第一次修改,期望值为5,主内存也为5,修改成功,为2019。
第二次修改,期望值为5,主内存为2019,修改失败。

查看AtomicInteger.getAndIncrement()(实现变量+1)方法,发现其没有加synchronized也实现了同步。这是为什么?

CAS底层原理

AtomicInteger内部维护了volatile int valueprivate static final Unsafe unsafe两个比较重要的参数。

  1. public final int getAndIncrement(){
  2. return unsafe.getAndAddInt(this,valueOffset,1);
  3. }
  4. public final int getAndAddInt(Object var1, long var2, int var4) {
  5. int var5;
  6. do {
  7. var5 = this.getIntVolatile(var1, var2);
  8. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  9. return var5;
  10. }

AtomicInteger.getAndIncrement()调用了Unsafe.getAndAddInt()方法。Unsafe类的大部分方法都是native的,用来像C语言一样从底层操作内存。

这个方法的var1和var2,就是根据对象偏移量得到在主内存的快照值var5。然后compareAndSwapInt方法通过var1和var2得到当前主内存的实际值。如果这个实际值快照值相等,那么就更新主内存的值为var5+var4。如果不等,那么就一直循环,一直获取快照,一直对比,直到实际值和快照值相等为止。

比如有A、B两个线程,一开始都从主内存中拷贝了原值为3,A线程执行到var5=this.getIntVolatile,即var5=3。此时A线程挂起,B修改原值为4,B线程执行完毕,由于加了volatile,所以这个修改是立即可见的。A线程被唤醒,执行this.compareAndSwapInt()方法,发现这个时候主内存的值不等于快照值3,所以继续循环,重新从主内存获取。

CAS缺点

CAS实际上是一种自旋锁,

  1. 一直循环,开销比较大。
  2. 只能保证一个变量的原子操作,多个变量依然要加锁。
  3. 引出了ABA问题

    ABA问题

    所谓ABA问题,就是比较并交换的循环,存在一个时间差,而这个时间差可能带来意想不到的问题。比如线程T1将值从A改为B,然后又从B改为A。线程T2看到的就是A,但是却不知道这个A发生了更改。尽管线程T2 CAS操作成功,但不代表就没有问题。
    有的需求,比如CAS,只注重头和尾,只要首尾一致就接受。但是有的需求,还看重过程,中间不能发生任何修改,这就引出了AtomicReference原子引用。
    AtomicReference
    AtomicInteger对整数进行原子操作,如果是一个POJO呢?可以用AtomicReference来包装这个POJO,使其操作原子化。

    1. User user1 = new User("Jack",25);
    2. User user2 = new User("Lucy",21);
    3. AtomicReference<User> atomicReference = new AtomicReference<>();
    4. atomicReference.set(user1);
    5. System.out.println(atomicReference.compareAndSet(user1,user2)); // true
    6. System.out.println(atomicReference.compareAndSet(user1,user2)); //false

    AtomicStampedReference和ABA问题的解决

    使用AtomicStampedReference类可以解决ABA问题。这个类维护了一个“版本号”Stamp,在进行CAS操作的时候,不仅要比较当前值,还要比较版本号。只有两者都相等,才执行更新操作。

    1. AtomicStampedReference.compareAndSet(expectedReference,newReference,oldStamp,newStamp);

    项目中遇到:多人同时修改同一个工单问题
    其他对象解决ABA问题:

  4. 红绿灯法

  5. 加版本号

我们知道ArrayList是线程不安全的,请编码写一个不安全的案例并给出解决方案。

List

有序、可重复。

  1. public static void main(String[] args) {
  2. //List<String> list=new ArrayList<>();//单线程环境下没问题,多线程下ArrayList线程不安全
  3. //List<String> list=new Vector<>();
  4. //List<String> list=Collections.synchronizedList(new ArrayList<>());//Collections辅助工具类,变成线程安全的
  5. List<String> list=new CopyOnWriteArrayList<>();
  6. for (int i = 0; i < 30; i++) {
  7. new Thread(()->{
  8. list.add(UUID.randomUUID().toString().substring(0,8));
  9. System.out.println(list);
  10. },String.valueOf(i)).start();
  11. }
  12. //出现并发书写的异常
  13. //导致原因:并发争抢修改导致,参考花名册签名
  14. //出现错误异常 java.util.ConcurrentModificationException
  15. //解决问题:
  16. //使用vector(add时加了synchronized)并发性急剧下降,不考虑线程安全用new ArrayList<>();
  17. //1.1 new vector();
  18. //1.2 Collections.singletonList(new ArrayList<>());
  19. //1.3 new CopyOnWriteArrayList<>();
  20. }

CopyOnWriteArrayList中的add方法:写时复制(也有读写分离的思想)

  1. public boolean add(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. Object[] elements = getArray();
  6. int len = elements.length;
  7. Object[] newElements = Arrays.copyOf(elements, len + 1);
  8. newElements[len] = e;
  9. setArray(newElements);
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

Set

无序、不可重复。HashSet线程不安全的

  1. public static void main(String[] args) {
  2. Set<String> set=new HashSet<>();//线程不安全
  3. Set<String> set1= Collections.synchronizedSet(new HashSet<>());
  4. Set<String> set2=new CopyOnWriteArraySet<>();
  5. for (int i = 0; i < 30; i++) {
  6. new Thread(()->{
  7. set.add(UUID.randomUUID().toString().substring(0,8));
  8. System.out.println(set);
  9. },String.valueOf(i)).start();
  10. }
  11. //HashSet 底层是hashMap
  12. //为什么是hashMap?
  13. new HashSet<>().add("");
  14. //只关注key,value是个常量
  15. /*public boolean add(E e) {
  16. return map.put(e, PRESENT)==null;
  17. }*/
  18. }

Set:元素无序(存入和取出的顺序不一定一致),元素不可以重复。

HashSet:底层数据结构是哈希表。
HashSet如何保证 元素唯一性的呢?
是通过元素的两个方法,hashCode和equals来完成的。
如果元素的HashCode值相同,才会判断equals是否为true。
如果元素的HashCode值不同,不会调用equals。

注意,对于判断元素是否存在,以及删除等操作,依赖的方法是元素的HashCode和equals方法
先判断hashcode是否相同,如果相同再判断equals

Map

1.HashMap、Hashtable不是有序的;
2.TreeMap和LinkedHashMap是有序的(TreeMap默认 Key 升序,LinkedHashMap则记录了插入顺序)。
HashMap线程不安全的

  1. public static void main(String[] args) {
  2. //Map<String,String> map=new HashMap<>();//线程不安全
  3. Map<String,String> map=new ConcurrentHashMap<>();//线程安全
  4. for (int i = 0; i < 30; i++) {
  5. new Thread(()->{
  6. map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,8));
  7. System.out.println(map);
  8. },String.valueOf(i)).start();
  9. }
  10. }

HashMap和ConcurrentHashMap底层待补充

hashMap

默认initialCapacity(容量)默认为16,loadFactory(加载因子)默认为0.75
jdk1.7:数组+链表
jdk1.8:数组+链表+红黑树
当hash表的单一链表长度超过 8 个的时候,链表结构就会转为红黑树结构。
为什么要这样设计呢?好处就是避免在最极端的情况下链表变得很长很长,在查询的时候,效率会非常慢。
使用resize()方法进行扩容。
put:

  • 将k,v封装成Node对象
  • 调用k的hashcode方法得出hash值,通过哈希函数或哈希运算将hash值转换成数组下标
  • 下标位置如果没有任何元素,则将Node插入此位置;若这个下标位置对应的有链表,则和链表中每个节点的k进行equals,如果没有查到,插入到链表末尾,若查到,覆盖此k对应的value

get:

  • 调用k的hashcode方法获取hash值
  • 通过哈希运算计算出数组下标
  • 通过下标快速定位到所在位置,若这个位置什么都没有,则返回null;若此位置存在单向链表,则会对链表中的每个节点的k进行equals,若返回false,则返回null,若返回true,则返回k对应的value

    ConcurrentHashMap

    jdk7:
    数据结构:数组+链表,数组+Segment+分段锁的方式实现ReentrantLock+segment+HashEntey,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构;
    元素查询:二次hash,第一次Hash定位到segment,第二次Hash定位到元素所在的链表的头部
    锁:Segment分段锁,segment继承了ReentrantLock,锁定操作的segment,其他的segment不受影响,并发度为segment的个数,可以通过构造函数指定,数组扩容不会影响其他的segment
    get方法无需加锁,volatile保证。
    jdk8:
    数据结构:数组+链表+红黑树;synchronized+CAS+Node+红黑树,Node的val和next都用volatile修饰,保证可见性;
    查找、替换、赋值操作都使用CAS
    CAS操作:CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
    CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,如果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
    锁:锁链表的head节点,不影响其他元素的读写,锁粒度更细,效率更高,扩容时,阻塞所有读写操作,并发扩容
    读操作无锁:
    Node的val和next使用volatile修饰,读写线程对该变量互相可见
    数组用volatile修饰,保证扩容时被读线程感知。

    小结:

    1.数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。
    2.保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。
    3.锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。
    4.链表转化为红黑树: 定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。
    5.查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。

    锁相关

    公平锁/非公平锁:

    公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队,队列先来后到。
    非公平锁:多个线程申请锁的顺序并不是按照申请锁的顺序(允许加塞。提高性能)在高并发情况下,有可能会造成优先级反转或饥饿。
    并发包中的ReentrantLock可指定构造函数的Boolean类型来得到公平锁或非公平所,默认是非公平锁。
    两者区别:
  1. 就很公平,先来后到;
  2. 比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式,优点在于吞吐量比公平锁大。(synchronized,也是种非公平锁)

    可重入锁(又名递归锁)

    ReentrantLock/Synchronized典型的可重入锁。
    指同一个线程外层函数获取锁后,内层递归函数仍然能获取该锁的代码,
    在一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
    重点:也就是说线程可以进入任何一个它已经拥有的锁所同步的代码块。
    可重入锁最大的作用是避免死锁。

锁的配对
锁之间要配对,加了几把锁,最后就得解开几把锁,下面的代码编译和运行都没有任何问题。
但锁的数量不匹配会导致死循环。

  1. lock.lock();
  2. lock.lock();
  3. try{
  4. someAction();
  5. }finally{
  6. lock.unlock();//加两把锁,只解一把会造成死锁。
  7. }

自旋锁

所谓自旋锁,就是尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取。自己在那儿一直循环获取,就像“自旋”一样。这样的好处是减少线程切换的上下文开销,缺点是会消耗CPU。CAS底层的getAndAddInt就是自旋锁思想。(自旋的反义词就是阻塞)
AtomicInteger(原子整型)AtomicReference原子对象
手写自旋锁:Unsafe+CAS(自旋)

独占锁(写锁)/共享锁(读锁)/互斥锁

读锁共享的写锁独占的juc.ReentrantLocksynchronized都是独占锁,独占锁就是一个锁只能被一个线程所持有。有的时候,需要读写分离,那么就要引入读写锁,即juc.ReentrantReadWriteLock
独占锁:被一个线程独占。RenntrantLock和Synchronzied都是独占锁。
共享锁:指该锁可被多个线程共享。
业务场景:读写分离,即保证数据一致性有保证并发性。
手写ReadWriteLockDemo(读写锁)
总结:

  1. 读-读能共存
  2. 读-写不能共存
  3. 写-写不能共存 ```java class MyCache{//资源类 private volatile Map map=new HashMap<>(); private ReentrantReadWriteLock rwLock=new ReentrantReadWriteLock();

    public void put(String key,Object value){

    1. rwLock.writeLock().lock();
    2. try {
    3. System.out.println(Thread.currentThread().getName()+"\t 正在写入");
    4. map.put(key,value);
    5. System.out.println(Thread.currentThread().getName()+"\t 写入完成");
    6. } catch (Exception e) {
    7. e.printStackTrace();
    8. }finally {
    9. rwLock.writeLock().unlock();
    10. }

    }

    public void get(String key){

    1. rwLock.readLock().lock();
    2. try {
    3. System.out.println(Thread.currentThread().getName()+"\t 正在读取");
    4. Object strValue=map.get(key);
    5. System.out.println(Thread.currentThread().getName()+"读取完成"+strValue);
    6. } catch (Exception e) {
    7. e.printStackTrace();
    8. } finally {
    9. rwLock.readLock().unlock();
    10. }

    } }

//测试读写锁 public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache=new MyCache(); for (int i = 0; i < 5; i++) { final int ntemp=i; new Thread(()->{ myCache.put(ntemp+””,ntemp); },String.valueOf(i)).start(); }

  1. for (int i = 0; i < 5; i++) {
  2. final int ntemp=i;
  3. new Thread(()->{
  4. myCache.get(ntemp+"");
  5. },String.valueOf(i)).start();
  6. }
  7. }

}

  1. <a name="hDCuG"></a>
  2. ## 分布式锁
  3. <a name="eIv22"></a>
  4. ## 基于缓存实现分布式锁,以Redis为例
  5. 使用Jedis来和Redis通信。
  6. <a name="mvkqA"></a>
  7. ### 加锁
  8. ```java
  9. public class RedisTool {
  10. private static final String LOCK_SUCCESS = "OK";
  11. private static final String SET_IF_NOT_EXIST = "NX";
  12. private static final String SET_WITH_EXPIRE_TIME = "PX";
  13. /**
  14. * 加锁
  15. * @param jedis Redis客户端
  16. * @param lockKey 锁的key
  17. * @param requestId 竞争者id
  18. * @param expireTime 锁超时时间,超时之后锁自动释放
  19. * @return
  20. */
  21. public static boolean getDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
  22. String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
  23. return "OK".equals(result);
  24. }
  25. }

可以看到,我们加锁就一行代码:
jedis.set(String key, String value, String nxxx, String expx, int time);
这个set()方法一共五个形参:
第一个为key,我们使用key来当锁,因为key是唯一的。
第二个为value,这里写的是锁竞争者的id,在解锁时,我们需要判断当前解锁的竞争者id是否为锁持有者。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作。
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期时间的设置,具体时间由第五个参数决定;
第五个参数为time,与第四个参数相呼应,代表key的过期时间。
总的来说,执行上面的set()方法就只会导致两种结果:

  1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置一个有效期,同时value表示加锁的客户端。
  2. 已经有锁存在,不做任何操作。

上述解锁请求中,SET_IF_NOT_EXIST(不存在则执行)保证了加锁请求的排他性,缓存超时机制保证了即使一个竞争者加锁之后挂了,也不会产生死锁问题:超时之后其他竞争者依然可以获取锁。通过设置value为竞争者的id,保证了只有锁的持有者才能来解锁,否则任何竞争者都能解锁,那岂不是乱套了。

解锁

  1. public class RedisTool {
  2. private static final Long RELEASE_SUCCESS = 1L;
  3. /**
  4. * 释放分布式锁
  5. * @param jedis Redis客户端
  6. * @param lockKey 锁
  7. * @param requestId 锁持有者id
  8. * @return 是否释放成功
  9. */
  10. public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
  11. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  12. Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
  13. return RELEASE_SUCCESS.equals(result);
  14. }
  15. }

解锁的步骤:

  1. 判断当前解锁的竞争者id是否为锁的持有者,如果不是直接返回失败,如果是则进入第2步。
  2. 删除key,如果删除成功,返回解锁成功,否则解锁失败。

注意到这里解锁其实是分为2个步骤,涉及到解锁操作的一个原子性操作问题。这也是为什么我们解锁的时候用Lua脚本来实现,因为Lua脚本可以保证操作的原子性。那么这里为什么需要保证这两个步骤的操作是原子操作呢?
设想:假设当前锁的持有者是竞争者1,竞争者1来解锁,成功执行第1步,判断自己就是锁持有者,这是还未执行第2步。这是锁过期了,然后竞争者2对这个key进行了加锁。加锁完成后,竞争者1又来执行第2步,此时错误产生了:竞争者1解锁了不属于自己持有的锁。可能会有人问为什么竞争者1执行完第1步之后突然停止了呢?这个问题其实很好回答,例如竞争者1所在的JVM发生了GC停顿,导致竞争者1的线程停顿。这样的情况发生的概率很低,但是请记住即使只有万分之一的概率,在线上环境中完全可能发生。因此必须保证这两个步骤的操作是原子操作。

分析

是否可重入:以上实现的锁是不可重入的,如果需要实现可重入,在SET_IF_NOT_EXIST之后,再判断key对应的value是否为当前竞争者id,如果是返回加锁成功,否则失败。
锁释放时机:加锁时我们设置了key的超时,当超时后,如果还未解锁,则自动删除key达到解锁的目的。如果一个竞争者获取锁之后挂了,我们的锁服务最多也就在超时时间的这段时间之内不可用。
Redis单点问题:如果需要保证锁服务的高可用,可以对Redis做高可用方案:Redis集群+主从切换。目前都有比较成熟的解决方案。

线程通讯:生产者和消费者普通版机制

//一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮
//线程 操作 资源类

  1. class SharerData{//资源类
  2. private int number=0;
  3. private Lock lock=new ReentrantLock();
  4. private Condition condition=lock.newCondition();
  5. public void increment()throws Exception{
  6. lock.lock();
  7. try{
  8. //判断
  9. while (number!=0){
  10. //等待 不能生产
  11. condition.await();
  12. }
  13. number++;
  14. System.out.println(Thread.currentThread().getName()+"\t "+number);
  15. //通知唤醒
  16. condition.signalAll();
  17. }catch (Exception e){
  18. e.printStackTrace();
  19. }finally {
  20. lock.unlock();
  21. }
  22. }
  23. public void decrement()throws Exception{
  24. lock.lock();
  25. try{
  26. //判断
  27. while (number==0){
  28. //等待 不能生产
  29. condition.await();
  30. }
  31. number--;
  32. System.out.println(Thread.currentThread().getName()+"\t "+number);
  33. //通知唤醒
  34. condition.signalAll();
  35. }catch (Exception e){
  36. e.printStackTrace();
  37. }finally {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42. //线程 操作 资源类
  43. //判断 干活 通知
  44. //防止虚假唤醒机制判断用while
  45. public class ProdConsumer_TraditionDemo {
  46. public static void main(String[] args) {
  47. SharerData sharerData=new SharerData();
  48. new Thread(()->{
  49. for (int i = 0; i < 5; i++) {
  50. try {
  51. sharerData.increment();
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. },"A").start();
  57. new Thread(()->{
  58. for (int i = 0; i < 5; i++) {
  59. try {
  60. sharerData.decrement();
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. },"B").start();
  66. }
  67. }

Synchronized和Lock区别

synchronized关键字和java.util.concurrent.locks.Lock都能加锁,两者有什么区别呢?
一个是关键字一个是具体类。

  1. 原始构成:sync是JVM层面的,底层通过monitorenter和monitorexit来实现的。Lock是JDK API层面的。(sync一个enter会有两个exit,一个是正常退出,一个是异常退出)
  2. 使用方法:sync不需要手动释放锁,而Lock需要手动释放。
  3. 是否可中断:sync不可中断,除非抛出异常或者正常运行完成。Lock是可中断的,通过调用interrupt()方法。
  4. 是否为公平锁:sync只能是非公平锁,而Lock既能是公平锁,又能是非公平锁。(true是公平锁,false默认非公平锁)
  5. 绑定多个条件:sync不能,只能随机唤醒。而Lock可以通过Condition来绑定多个条件,精确唤醒。

    锁绑定多个条件Condition

    ```java private Lock lock=new ReentrantLock(); private Condition condition=lock.newCondition();//备用钥匙 private Condition condition1=lock.newCondition(); private Condition condition2=lock.newCondition();

//精确唤醒 condition1.signal();

  1. <a name="kpa2U"></a>
  2. ## 线程池相关及应用:
  3. 高并发处理逻辑 比如同步数据
  4. <a name="e0fcf4d2"></a>
  5. ### 自定义线程池参数选择
  6. 对于CPU密集型任务,最大线程数是CPU线程数+1。<br />对于IO密集型任务,尽量多配点,可以是CPU线程数*2,或者CPU线程数/(1-阻塞系数)。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/12643453/1616218162663-bdbccc5b-c5d9-453b-8b5c-d57d4ba8ad43.png#height=238&id=olKKL&margin=%5Bobject%20Object%5D&name=image.png&originHeight=476&originWidth=1429&originalType=binary&ratio=1&size=371174&status=done&style=none&width=714.5)
  7. ```java
  8. public ThreadPoolExecutor(int corePoolSize,
  9. int maximumPoolSize,
  10. long keepAliveTime,
  11. TimeUnit unit,
  12. BlockingQueue<Runnable> workQueue,
  13. ThreadFactory threadFactory,
  14. RejectedExecutionHandler handler) {
  15. if (corePoolSize < 0 ||
  16. maximumPoolSize <= 0 ||
  17. maximumPoolSize < corePoolSize ||
  18. keepAliveTime < 0)
  19. throw new IllegalArgumentException();
  20. if (workQueue == null || threadFactory == null || handler == null)
  21. throw new NullPointerException();
  22. this.acc = System.getSecurityManager() == null ?
  23. null :
  24. AccessController.getContext();
  25. this.corePoolSize = corePoolSize;
  26. this.maximumPoolSize = maximumPoolSize;
  27. this.workQueue = workQueue;
  28. this.keepAliveTime = unit.toNanos(keepAliveTime);
  29. this.threadFactory = threadFactory;
  30. this.handler = handler;
  31. }
  32. //自定义创建一个线程池
  33. ExecutorService myThreadPool=new ThreadPoolExecutor(
  34. 1,//核心线程数
  35. 5,//最大线程数
  36. 10L,//超过队列数+最大线程数时线程池最大保持时间
  37. TimeUnit.SECONDS,//时间单位
  38. new ArrayBlockingQueue(5),//超过核心线程数时,阻塞队列数
  39. Executors.defaultThreadFactory(),//线程创建工程
  40. new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略

/*
workqueue
new SynchronousQueue<>() 直接提交
new LinkedBlockingDeque<>(int capacity) 无界队列 无参默认是Integer.MAX_VALUE
* new ArrayBlockingQueue<>(int capacity) 有界队列 无参默认是Integer.MAX_VALUE

new PriorityBlockingQueue<>() 默认11个
/

jdk中提供了四种工作队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
/*
ThreadFactory
Executors.defaultThreadFactory() XXX
Executors.privilegedThreadFactory() YYY
*/

四大拒绝策略:
1.abortpolicy :默认的拒绝策略,会抛出异常
2.discordpolicy :会默认丢弃任务,不会抛出异常
3.discordoldestpolicy :丢弃队列最前面的任务,然后重新提交被拒绝的任务。
4.callerrunspolicy :由调用线程处理该任务