一. JUC中的locks包
1.1 locks包介绍
java.util.concurrent.locks: JUC中对锁支持的工具包
二. JUC的锁机制
2.1 AQS
2.1.1 AQS介绍
AQS全名AbstractQueuedSynchronizer,是并发容器JUC(java.lang.concurrent)下locks包内的一个类。
它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向链表
Sync queue:同步队列,是一个双向链表。包括head节点和tail节点。head节点主要用作后续的调度
2.1.2 工作原理
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是使用用队列实现的锁,即将暂时获取不到锁的线程加入到队列中。
AQS使用一个int state成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改,当state大于0的时候表示锁被占用,如果state等于0时表示没有占用锁
2.2 锁机制介绍
·JUC中的锁底层使用的就是AQS
1. ReentrantLock 可重入锁, Lock接口的实现类
2. ReentrantReadWriteLock:ReadWriteLock接口的实现类。类中包含两个静态内部类,ReadLock读锁(共享锁)、WriteLock写锁(独占锁)。线程在读时,不允许被写
3. StampedLock:Java 8 新推出的。相比ReentrantReadWriteLock好在,读的时候也允许被写
4. Condition:是一个接口,都是通过Lock.newCondition()实例化。属于wait和notify的替代品。提供了await()、signal()、singnalAll()与之对应
5. LockSupport:暂停当前的线程,恢复线程 不会涉及到锁的操作
2.3 锁机制详解
2.3.1 ReentrantLock 重入锁
ReentrantLock是JUC中对重入锁的标准实现。作用相当于synchronized
加锁和解锁过程都需要由程序员手动控制,使用很灵活
提供了2种类型的构造方法。
ReentrantLock(): 创建非公平锁的重入锁。
ReentrantLock(boolean):创建可重入公平锁。取值为true表示公平锁,取值为false表示非公平锁。
公平锁:多线程操作共一个资源时, 严格按照顺序执行
非公平锁:多线程在等待时,可以竞争,谁竞争成功,谁获取锁
非公平锁的效率要高于公平锁。ReentrantLock默认就是非公平锁
创建: ReentrantLock rk = new ReentrantLock();
加锁: 无返回值rk.lock() | 有返回值 rk.tryLock()
注意:多线程抢夺锁时容易出现的问题
如果使用tryLock(),放回false,不会等待了,当前这次枪锁失败,不在继续抢锁(只加锁一次)
rk.tryLock(1, TimeUnit.MINUTES), 在指定的时间内,尝试重新加锁(尝试多次枷锁,直到加锁成功)
解锁: rk.unLock()
注意:
1. ReentrantLock出现异常时,不会自动解锁
2. 多线程的情况下, 一个线程出现异常, 并没有释放锁, 其他线程也获取不到锁, 容易出现死锁
3. 建议把解锁方法finally{}代码块中
4. synchronized加锁与释放锁不需要手动的设置, 遇到异常时, 会自动的解锁
1. 重入锁演示
public class MyReentrantLock { _ _public static void main(String[] args) { __ //获取重入锁 _ReentrantLock rk = new ReentrantLock**(); new Thread(){ @Override public void run() { **//加锁 .lock() | .tryLock() **try { if(rk.tryLock()){ System._out.println(“第一次获取锁成功”); //加锁 .lock() | .tryLock() try { __ _if(rk.tryLock()){ System._out.println(“第二次获取锁成功”); _} }catch (Exception e){ e.printStackTrace(); }finally { **//释放锁 rk.unlock**(); } } }catch (Exception e){ e.printStackTrace(); }finally { **//释放锁 rk.unlock**(); } } }.start(); } _}** |
---|
2. 死锁演示
public class MyReentrantLock2 { _ _static int a=0; public static void main(String[] args) { __ //获取重入锁 _ReentrantLock rk = new ReentrantLock**(); new Thread(){ @Override public void run() { **//加锁 .lock() | .tryLock() rk.lock**(); for (int i = 0; i < 100000; i++) { **_a++; **_} int **i =1/0;//出现异常不会释放锁 rk.unlock**(); } }.start(); new Thread(){ @Override public void run() { **//加锁 .lock() | .tryLock() rk.lock**()**; //获取不到锁出现死锁 **for (int i = 0; i < 100000; i++) { **_a++; } __ rk.unlock(); } __ }.start(); try { __ Thread.sleep(2000); System.out.println(a); } _catch (_InterruptedException e) { __ e.printStackTrace(); } } } |
---|
2.3.2 Condition 等待|唤醒
如果说wait和notify是针对synchronized关键而研制的
Condition就是为了Lock而生的
创建: _ReentrantLock rk = new ReentrantLock**()**; Condition condition = rk. newCondition**();
线程等待: condition.await();
唤醒一个线程 | 唤醒所有线程:
condition.signal()**; //唤醒一个线程
condition.signalAll**()**; //唤醒所有线程_
1. 代码演示
public class MyCondition { _ _public static void main(String[] args) { __ //创建重入锁 _ReentrantLock rk = new ReentrantLock**()**; //等待 | 唤醒 Condition condition = rk.newCondition**()**; //线程1 new Thread**(){ @Override public void run() { try { rk.lock(); System._out.println(“线程1开始等待线程2执行”); condition.await(); //线程等待 _Thread._sleep(1000); System.out.println(“线程1结束”); }_catch (Exception e){ e.printStackTrace(); }finally { rk.unlock(); } } }.start()**; //线程2 new Thread**(){ @Override public void run() { try { rk.lock(); System._out.println(“线程2开始执行”); System.out.println(“线程2执行结束”); Thread.sleep(1000); condition.signal(); //唤醒一个线程 }_catch (Exception e){ e.printStackTrace(); }finally { rk.unlock(); } } }.start(); } _}** |
---|
2.3.3 ReadWriteLock 读写锁
ReadWriteLock为接口, 实现类为ReentrantReadWriteLock
ReadLock 读锁,又称为共享锁。允许多个线程同时获取
WriteLock 写锁,又称为独占锁。同时只有一个线程能获取,其他线程等待, 避免死锁
注意: 读写锁, 实际含义为是否能有多个线程同时获取
读锁转写锁称为锁升级。ReadWriteLock不支持锁升级。也就是说加了读锁后,不允许在添加写锁。
写锁转读锁称为锁降级。ReadWriteLock支持锁降级。也就是说加了写锁以后还可以添加读锁。
创建: ReentrantReadWriteLock rk = new ReentrantReadWriteLock();
读锁:
获取读锁 rk.readLock()
设置读锁
加锁 rk.readLock().lock(); | rk.readLock().tryLock();
解锁 rk.readLock().unlock();
写锁:
获取写锁: rk.writeLock();
设置写锁
加锁 rk.writeLock().lock(); | rk.writeLock().tryLock();
解锁 rk.writeLock().unlock();
1. 读锁(共享锁)演示
可以多个线程同时获取
public class MyReadWriteLock { _ _public static void main(String[] args) { __ //创建读写锁 _ReentrantReadWriteLock rk = new ReentrantReadWriteLock**(); new Thread(){ @Override public void run() { try { **//添加读锁(共享锁) rk.readLock**().lock(); System._out.println(“线程1: 进行了操作”); Thread.sleep(3000); } _catch (InterruptedException e) { e.printStackTrace(); }finally { **//解锁(读锁) rk.readLock**().unlock(); } } }.start(); new Thread(){ @Override public void run() { try { **//添加读锁(共享锁) rk.readLock**().lock(); System._out.println(“线程2: 进行了操作”); } _catch (Exception e) { e.printStackTrace(); }finally { **//解锁(读锁) rk.readLock**().unlock(); } } }.start(); } _}** |
---|
2. 写锁(独占锁)演示
只能有一个线程获取
public class MyReadWriteLock1 { _ _public static void main(String[] args) { __ //创建读写锁 _ReentrantReadWriteLock rk = new ReentrantReadWriteLock**(); new Thread(){ @Override public void run() { try { **//添加写锁(独占锁) rk.writeLock**().lock(); System._out.println(“线程1: 进行了操作”); Thread.sleep(3000); } _catch (InterruptedException e) { e.printStackTrace(); }finally { **//解锁(写锁) rk.writeLock**().unlock(); } } }.start(); new Thread(){ @Override public void run() { try { **//添加写锁(独占锁) rk.writeLock**().lock(); System._out.println(“线程2: 进行了操作”); } _catch (Exception e) { e.printStackTrace(); }finally { **//解锁(写锁) rk.writeLock**().unlock(); } } }.start(); } _} ** |
---|
2.3.4 LockSupport 暂停|恢复
LockSupport是Lock中专门实现线程暂停和线程恢复。
suspend和resume是专门实现synchronized暂停和恢复一样。
注意: 暂停和恢复没有锁操作, 暂停恢复不会释放锁, 避免死锁问题
暂停:
LockSupport.park()
恢复:
LockSupport.unpark(t1);
1. 代码演示
public class MyLockSupport { _ _public static void main(String[] args) { __ Thread t1 = new Thread() { __ @Override public void run() { __ System.out.println(“要开车啦, 快上车”); LockSupport.park(); //线程暂停 _System.**_out.println(“已经开车了”); } __ }; try { __ t1.start(); Thread.sleep(5000); LockSupport.unpark(t1); //线程恢复 } _catch (InterruptedException e) { e.printStackTrace(); } } _}** |
---|
2.4 synchronized和lock的区别(面试题)
1. 类型不同
synchronized是关键字。Lock是接口
2. 加锁和解锁机制同步
synchronized是自动加锁和解锁,程序员不需要控制。Lock必须由程序员控制加锁和解锁过程
3. 异常机制
synchronized碰到没有处理的异常,会自动解锁,不会出现死锁。Lock碰到异常不会自动解锁,可能出现死锁。所以写Lock锁时都是把解锁放入到finally中。4. Lock功能更强大
Lock里面提供了tryLock()/isLocked()方法,进行判断是否上锁成功。synchronized因为是关键字,所以无法判断。
5.Lock性能更优
如果多线程竞争锁特别激烈时,Lock的性能更优。如果竞争不激烈,性能相差不大。
6.线程通信方式不同
synchronized 使用wait和notify线程通信。Lock使用Condition的awake和signal通信。
7. 暂停和恢复方式不同
synchronized 使用suspend和resume暂停和恢复,这俩方法过时了。Lock使用LockSupport中park和unpark暂停和恢复,这俩方法没有过时。
三. JUC中的Tools
3.1 Tools介绍
Tools也是JUC中的工具类, 其中包含了CountDownLatch、CyclicBarrier、Semaphore
3.2 CountDownLatch 计数器
在开发中经常遇到在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。之前是使用join()来实现的,但是不够灵活,某些场合和还无法实现,所以开发了CountDownLatch这个类。底层基于AQS。
CountDown是计数递减的意思,Latch是门闩的意思。内部维持一个递减的计数器。可以理解为初始有n个Latch,等Latch数量递减到0的时候,就结束阻塞执行后续操作。
创建:
CountDownLatch cdl= new CountDownLatch(2);
参数为: 计数器的初始值
线程等待:
cdl.await():导致当前线程等待,直到到Latch计数到零,或者被interrupt
计数器递减:
cdl.countDown( ):减少Latch的计数,如果计数达到零,释放所有等待的线程
1. 代码演示
public class MyCountDownLatch { _ _public static void main(String[] args) { __ //创建计数器类, 并赋予初始值 _CountDownLatch cdl = new CountDownLatch**(2); new Thread(){ @Override public void run() { try { System._out.println(“线程1执行完毕”); Thread.sleep(3000); cdl.countDown(); //计数器-1 } _catch (InterruptedException e) { e.printStackTrace(); } } }.start(); new Thread(){ @Override public void run() { try { System._out.println(“线程2执行完毕”); Thread.sleep(3000); cdl.countDown(); //计数器-1 } _catch (InterruptedException e) { e.printStackTrace(); } } }.start(); try { **//主线程阻塞, 直到计数器变为0, 或者执行了interrupt(结束当前线程) cdl.await**(); System._out.println(“主线程执行”); } _catch (InterruptedException e) { e.printStackTrace(); } } }_** |
---|
3.3 CyclicBarrier回环屏障
CountDownLatch优化了join()在解决多个线程同步时的能力,但CountDownLatch的计数器是一次性的。计数递减为0之后,再调用countDown()、await()将不起作用。为了满足计数器可以重置的目的,JDK推出了CyclicBarrier类。
await()方法表示当前线程执行时计数器值不为0则等待。如果计数器为0则继续执行。每次await()之后计算器会减少一次。当减少到0下次await从初始值重新递减。
假设多个任务都有三个阶段组成,多个线程分别指向一个任务,必须保证每个任务的一个阶段结束后,才进入下一个阶段。此时使用CyclicBarrier正合适
1. 代码演示
/ 1.裁判: 比赛开始 2.球员: 开始行动 3.球员: 球员犯规 4.裁判: 吹哨 5.裁判: 出示红牌 6.球员: 接受红牌 */ public class MyCyclicBarrier { _ _public static void main(String[] args) { __ //创建循环屏障 _CyclicBarrier cb = new CyclicBarrier**(2)**; //线程一: new Thread**(){ @Override public void run() { try { System._out.println(“裁判: 比赛开始”); cb.await(); // 1.cb-1=1, 不为0, 阻塞 _Thread._sleep(500); System.out.println(“裁判: 吹哨”); cb.await(); _//4. cb-1=0, cd为0, cb重新赋值为2, 继续执行 _System.out.println(“裁判: 出示红牌”);//5.执行 } _catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }.start()**; //线程二 new Thread**(){ @Override public void run() { try { System._out.println(“球员: 开始行动”); cb.await(); _//2.cb-1=0, cd为0, cb重新赋值为2, 继续执行 _System.out.println(“球员: 球员犯规!”); cb.await(); //3.cb-1=1, 不为0, 阻塞 _Thread._sleep(500); System.out.println(“球员: 接受红牌”); //6.执行 } _catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }.start(); } _}** |
---|
3.4 Semaphore 信号量
CountDownLatch和CyclicBarrier的计数器递减的,而Semaphore的计数器是可加可减的,并可指定计数器的初始值,并且不需要事先确定同步线程的个数,等到需要同步的地方指定个数即可。且Semaphore也具有回环重置的功能,这一点和CyclicBarrier很像。底层也是基于AQS。
创建:
Semaphore sp= new Semaphore(10);
参数为初始值
获取信号量的值:
sp.availablePermits()
增加信号量:
sp.release(); 信号量+1
sp.release(n); 信号量+n
减少信号量:
sp.acquire(); 信号量-1,无返回值 | sp.tryAcquire(); 信号量-1, 有返回值
sp.acquire(n); 信号量-n,无返回值 | sp.tryAcquire(n); 信号量-n, 有返回值
1. 代码演示
public class MySemaphore { _ _public static void main(String[] args) { __ _try { _ Semaphore sp = new Semaphore(10); System.out.println(sp.availablePermits()); //当前信号量的值 10 _sp.release**()**; //信号量+1 System.**_out.println(sp.availablePermits()); _//当前信号量的值 11 _sp.release(10); _//信号量+10 _System.out.println(sp.availablePermits()); _//当前信号量的值 21 _sp.acquire(); _//信号量-1 _System.out.println(sp.availablePermits()); //当前信号量的值 20 boolean b = sp.tryAcquire(15); System.out.println(b+“-“+sp.availablePermits()); _//当前信号量的值 true-5 / acquire(n): 信号量不足 <0, 阻塞等待 * sp.tryAcquire(n): 信号量不足 <0, 返回false, 继续执行 * */ _sp.acquire(5); } _catch (InterruptedException e) { e.printStackTrace(); } } _}** |
---|
四. 并发集合类
4.1 介绍
并发集合类:主要是提供线程安全的集合, 比如:
ArrayList对应的并发类是CopyOnWriteArrayList
HashSet对应的并发类是 CopyOnWriteArraySet
HashMap对应的并发类是ConcurrentHashMap
这些类的方法API和之前学习的ArrayList、HashSet、HashMap的API是相同的,所以重在实现原理上, 而不是API的使用上
4.2 CopyOnWriteArrayList
4.2.1 ArrayList
ArrayList是最常用的集合之一,大小不固定,可以随着元素的增多可以自动扩容。储存的数据为有序, 可重复. 底层实现是基于数组, 线程不安全
4.2.1 CopyOnWriteArrayList
使用方式和ArrayList相同, CopyOnWriteArrayList线程为安全的
写时复制
通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后往新的容器里添加元素
添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
对于读操作远远多于写操作的应用非常适合,特别在并发情况下,可以提供高性能的并发读取。
CopyOnWrite容器只能保证数据的最终一致性,不能保证数据实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
4.3.2 CopyOnWriteArrayList重点源码
public class CopyOnWriteArrayList implements List __ //声明了一个不可改变的对象 final transient Object lock = new Object(); //volatile修饰的Object类型的数组, 保证了数组的可见性, 有序性 private transient volatile Object_[] _array; //获取当前的数组 final Object[] getArray() { _ _return array; } __ //设置数组 final void setArray(Object[] a) { _ _array = a; } //添加元素, 同步执行, 每次只能一个线程添加数据 public boolean add(E e) { __ //加锁 synchronized (_lock) { _ //获取当前的数组 _Object**[] es = getArray()**; //获取数组的长度 int len = es.length; //复制旧数组, 长度+1, 创建一个新数组 es = Arrays._copyOf(es, len + 1); //根据下标, 将添加的元素放入 _es**[len] **= e; //将新数组设置为当前的数组 setArray**(es); return true; } } **//获取元素, 根据下标获取, 支持多线程查询 public E get**(int index) { return elementAt(getArray(), index); } _}** |
---|
4.3 CopyOnWriteArraySet源码分析
4.3.1 HashSet
HashSet为有序(插入无序, 读取有序), 不可重复的集合, 线程不安. 底层实现为(HashMap)
4.3.2 CopyOnWriteArraySet
它是线程安全的HashSet, CopyOnWriteArraySet则是通过”动态数组(CopyOnWriteArrayList)”实现的,并不是散列表
CopyOnWriteArraySet在CopyOnWriteArrayList 的基础上使用了Java的装饰模式,所以底层是相同的。而CopyOnWriteArrayList本质是个动态数组队列,所以CopyOnWriteArraySet相当于通过通过动态数组实现的Set!
CopyOnWriteArrayList中允许有重复的元素;但CopyOnWriteArraySet是一个Set集合,所以它不能有重复数据。因此,CopyOnWriteArrayList额外提供了addIfAbsent()和addAllAbsent()这两个添加元素的API,通过这些API来添加元素时,只有当元素不存在时才执行添加操作!
4.3.3 CopyOnWriteArraySet重点源码
public class CopyOnWriteArraySet implements java.io.Serializable { //底层由CopyOnWriteArrayList实现 private final CopyOnWriteArrayList public boolean add(E e) { __ //如果不存在, 添加(true:添加成功, false:不添加) return al.addIfAbsent(e); } } |
---|
public class CopyOnWriteArrayList implements List _ _private transient volatile Object_[] _array; //获取当前的数组 final Object[] getArray() { _ _return array; } _ _public boolean addIfAbsent(E e) { __ Object[] snapshot = getArray(); / &&逻辑与, 只要第一个返回false, 直接返回false 第一个判断, 当前添加的元素是否存在: 存在返回这个元素的下标 不存在返回-1 第二个判断, 如果不存在添加元素 添加成功, true 添加失败, false */ return indexOfRange(e, snapshot, 0, snapshot.length) < 0 && addIfAbsent(e, snapshot); } __ / 参数一: 要添加的元素 参数二: 当前的数组 参数三: 0 参数四: 数组的长度 */ private static int indexOfRange(Object o, Object[] es, int from, int to) { __ //添加的元素是否为null if (o == null) { __ //循环遍历, 判断null是否存在 for _(_int i = from; i < to; i++) __ _if (_es[i] == null) __ //返回null的下标 return i; } _else { _ //循环判断当前的元素是否存在 for _(_int i = from; i < to; i++) __ _if (_o.equals(es[i])) __ //存在返回该元素的下标 return i; } __ //如果不存在返回-1 return -1; } _ _private boolean addIfAbsent(E e, Object[] snapshot) { __ _synchronized (_lock) { __ Object[] current = getArray(); int len = current.length; if (snapshot != current) { __ // Optimize for lost race to another addXXX operation int common = Math.min(snapshot.length, len); for _(_int i = 0; i < common; i++) __ _if (_current[i] != snapshot[i] __ && Objects.equals(e, current[i])) _ _return false; if (indexOfRange(e, current, common, len) >= 0) _ _return false; } __ Object[] newElements = Arrays.copyOf(current, len + 1); newElements[len] = e; setArray(newElements); return true; } } } |
4.4 ConcurrentHashMap
4.4.1 HashMap
HashMap也是使用非常多的Collection,线程不安全,以key-value的形式存在。在HashMap中,底层实现为哈希表,系统会根据hash算法来来计算key的存储位置,我们可以通过key快速地存、取value, 允许一个key-value为null
HashMap JDk1.7以及1.7之前
HashMap 底层是基于 数组+链表
组成的
HashMap JDk1.8以及1.8之后
HashMap 底层是基于 数组+链``表``+``红黑树
组成的
当 Hash 冲突严重时,在桶上形成的链表会变的越来越长,这样在查询时的效率就会越来越低, 达到一定的条件, 就会由链表转换为红黑树, 提高查询的效率
4.4.2 HashMap1.8及以后重点源码
final V putVal_(_int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { __ //初始化桶大小,因为底层是数组,所以这是数组默认的大小 static final int DEFAULTINITIAL_CAPACITY = 1 << 4; // aka 16 //桶最大值 static final int MAXIMUM_CAPACITY = 1 << 30; //默认的负载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //变为红黑树的阈值 static final int TREEIFY_THRESHOLD = 8; //红黑树变回链表的阈值 static final int UNTREEIFY_THRESHOLD = 6; //最小的数容量, 只有table数组的长度大于64,才会发生转化 static final int MIN_TREEIFY_CAPACITY = 64; HashMap.Node**<_**K,V**_>[] tab; HashMap.Node<_**K,V**_> p; int **n, i; //判断当前桶是否为空,空的就需要初始化 **if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize())**.length; / 根据当前 key 的 hashcode 定位到具体的桶中并判断是否为空, 为空表明没有 Hash 冲突就直接在当前位置创建一个新桶即可 (数组最大下标与当前key的hash值按位与计算, 如果数组最大下标为15,那么不管Hash(key)是多少都不会大于15) / **if ((p = tab[i = (n - 1) & hash]) _*== null_) tab[i] = newNode(hash, key, value, null); else { **//出现hash碰撞 HashMap.Node**<_**K,V**_> **e; K k; / 桶中key的hashcode和写入key的hashcode相等, 写入的key不为空并且桶中key和写入的key值相等, 写入的key覆盖赋值给e, 后续完成统一复制的操作 / **if (p.hash == hash && ((k = p.key) == key || (key != null **&& key.equals(k)))) e = p; //如果当前桶为红黑树,那就要按照红黑树的方式写入数据 else if (p instanceof TreeNode) e = ((HashMap.TreeNode else { for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); //接着判断当前链表的大小是否大于预设的阈值,大于时就要转换为红黑树 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } //如果在遍历过程中找到 key 相同时直接退出遍历 if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } //如果 e != null 就相当于存在相同的 key,那就需要将值覆盖 if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; //最后判断是否需要进行扩容 _if (++size > threshold) resize(); afterNodeInsertion(evict); return null; } |
---|
4.4.3 HashTable
HashTable和HashMap的实现原理几乎一样,差别无非是
1. HashTable不允许key和value为null;
2. HashTable是线程安全的。
但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差
4.4.4 ConcurrentHashMap 1.7及之前
ConcurrentHashMap采用了非常精妙的”分段锁”策略
Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,使用多个锁来控制对hash表的不同部分(段segment)进行的修改,如果多个修改操作发生在不同的段上,他们就可以并发进行,从而提高了效率
4.4.5 ConcurrentHashMap 1.8及之后
ConcurrentHashMap在JDK8中进行了巨大改动。它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现, 利用synchronized + CAS, 如果没有出现hash冲突, 使用CAS直接添加数据, 只有出现hash冲突的时候才会使用同步锁添加数据, 又提升了效率, 它底层由”数组”+链表+红黑树的方式思想(JDK8中HashMap的实现), 为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类
Node节点: 默认桶上的结点就是Node结点。Node只有一个next指针,是一个单链表,提供find方法实现链表查询 当出现hash冲突时,Node结点会首先以链表的形式链接到table上,当结点数量超过一定数目时,链表会转化为红黑树。
TreeNode节点: TreeNode就是红黑树的结点,TreeNode不会直接链接到table[i]——桶上面,而是由TreeBin链接,TreeBin会指向红黑树的根结点。
TreeBin节点: TreeBin会直接链接到table[i]——桶上面,该结点提供了一系列红黑树相关的操作,以及加锁、解锁操作。
ForwardingNode: ForwardingNode 在table扩容时使用,内部记录了扩容后的table,即nexttable
ReservationNode: 在并发场景下、在从 Key不存在 到 插入 的 时间间隔内,为了防止哈希槽被其他线程抢占,当前线程会使用一个reservationNode节点放到槽中并加锁,从而保证线程安全
4.4.6 ConcurrentHashMap 1.8及之后重点源码
final V putVal(K key, V value, boolean onlyIfAbsent) { __ //key-value都为空, 抛出异常 if (key == null || value == null_) _throw new NullPointerException(); //计算keu的hash值 int hash = spread(key.hashCode()); / 使用链表保存时,binCount记录table[i]这个桶中所保存的结点数; 使用红黑树保存时,binCount == 2,保证put后更改计数值时能够进行扩容检查,同时不触发红黑树化操作 / int binCount = 0; //循环遍历桶 for (ConcurrentHashMap.Node<K,V>[] tab = table;;) { __ ConcurrentHashMap.Node<K,V> f; int n, i, fh; K fk; V fv; //判断当前桶是否为空,空的就需要初始化 if (tab == null || (n = tab.length) == 0) __ tab = initTable(); //计算 key 的 hash 值,通过(n - 1) & hash计算key存放的位置, 存储的位置为空,使用cas直接插入 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { __ _if (_casTabAt(tab, i, null, new ConcurrentHashMap.Node<K,V>(hash, key, value))) _ _break; // no lock when adding to empty bin } __ //发现是ForwardingNode结点,说明此时table正在扩容,则尝试协助数据迁移 else if ((fh = f.hash) == MOVED) __ tab = helpTransfer(tab, f); else if (onlyIfAbsent // check first node without acquiring lock && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) __ && (fv = f.val) != null) _ _return fv; //出现hash冲突,也就是table[i]桶中已经曾经添加了Node节点, 加锁, 添加数据 else { __ V oldVal = null; synchronized (f) { __ _if (_tabAt(tab, i) == f) { __ _if (_fh >= 0) { __ binCount = 1; for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) { __ K ek; // 出现hash冲突, 就会找到“相等”的结点,判断是否需要更新value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { __ oldVal = e.val; if (!onlyIfAbsent) __ e.val = value; break; } __ //插入数据 _ConcurrentHashMap.Node**<_**K,V**_> pred = e; if ((e = e.next) == null) { pred.next = new ConcurrentHashMap.Node<_**K,V**_>(hash, key, value); break; } } } **//如果当前桶为红黑树,那就要按照红黑树的方式写入数据 **else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.Node<_**K,V**_> p; binCount = 2; if ((p = ((ConcurrentHashMap.TreeBin<_**K,V**_>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ConcurrentHashMap.ReservationNode) throw new IllegalStateException(“Recursive update”); } } **// 如果链表中节点个数达到阈值,链表转化为红黑树 **if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } **// 计数值加1 addCount**(1L, binCount); return null; }_** |
---|
4.5 综合对比
JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
JDK1.8使用内置锁synchronized来代替重入锁ReentrantLock因为在少量锁竞争时sychronized的性能和ReentrantLock相差不大