散列表是由数组演化而来的,利用了数组按照下标随机访问时的时间复杂度是 O(1) 的特性。我们通过散列函数把元素的键值映射为下标,然后将数据存储在数组中对应下标的位置。当我们按照键值查询元素时,我们用同样的散列函数将键值转化数组下标,从对应的数组下标的位置取数据。
散列函数
散列函数在散列表中起着非常关键的作用。散列函数,顾名思义,它是一个函数。我们可以把它定义成 hash(key),其中 key 表示元素的键值,hash(key) 的值表示经过散列函数计算得到的散列值。那么我们应该如何构造散列函数呢?下面总结了三点散列函数设计的基本要求:
- 散列函数计算得到的散列值是一个非负整数
- 如果 key1 = key2,那 hash(key1) == hash(key2)
- 如果 key1 ≠ key2,那 hash(key1) ≠ hash(key2)
其中,第一点理解起来应该没有问题。因为数组下标是从 0 开始的,所以散列函数生成的散列值也要是一个非负整数。第二点也很好理解,相同的 key 经过散列函数得到的散列值也应该是相同的。针对第三点,在真实的情况下,要想找到一个不同的 key 对应的散列值都不一样的散列函数,几乎是不可能的。即便像业界著名的 MD5、SHA、CRC 等哈希算法,也无法完全避免这种散列冲突。而且因为数组的存储空间有限,也会加大散列冲突的概率。所以我们几乎无法找到一个完美的无冲突的散列函数,即便能找到,付出的时间成本、计算成本也是很大的,所以针对散列冲突问题,我们需要通过其他途径来解决。
散列冲突
再好的散列函数也无法避免散列冲突,当遇到散列冲突问题时,我们常用的散列冲突解决方法有两类,开放寻址法(open addressing)和链表法(chaining)。
1. 开放寻址法
开放寻址法的核心思想是:如果出现了散列冲突,我们就重新探测一个空闲位置,将其插入。那如何重新探测新的位置呢?方法有如下三种:
1)线性探测
当我们往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,我们就从当前位置开始依次往后查找,看是否有空闲位置,直到找到为止。如下图所示,图中黄色的色块表示空闲位置,橙色的色块表示已经存储了数据。
从图中可以看出,散列表的大小为 10,在元素 x 插入散列表之前,已经 6 个元素插入到散列表中。x 经过 Hash 算法之后,被散列到位置下标为 7 的位置,但是这个位置已经有数据了,所以就产生了冲突。于是我们就顺序地往后一个一个找,看有没有空闲的位置,遍历到尾部都没有找到空闲的位置,于是我们再从表头开始找,直到找到空闲位置 2,于是将其插入到这个位置。
在散列表中查找元素的过程有点儿类似插入过程。我们通过散列函数求出要查找元素的键值对应的散列值,然后比较数组中下标为散列值的元素和要查找的元素。如果相等,则说明就是我们要找的元素;否则就顺序往后依次查找。如果遍历到数组中的空闲位置,还没有找到,就说明要查找的元素并没有在散列表中。
散列表跟数组一样,不仅支持插入、查找操作,还支持删除操作。对于使用线性探测法解决冲突的散列表,删除操作稍微有些特别。我们不能单纯地把要删除的元素设置为空。因为在查找时,一旦我们通过线性探测方法找到一个空闲位置,我们就可以认定散列表中不存在这个数据。但如果这个空闲位置是我们后来删除的,就会导致原来的查找算法失效。本来存在的数据,会被认定为不存在。这个问题如何解决呢?我们可以将删除的元素特殊标记为 deleted。当线性探测查找时,遇到标记为 deleted 的空间,并不是停下来,而是继续往下探测。
线性探测法其实存在很大问题。当散列表中插入的数据越来越多时,散列冲突发生的可能性就会越来越大,空闲位置会越来越少,线性探测的时间就会越来越久。极端情况可能需要探测整个散列表,所以最坏情况下的时间复杂度为 O(n)。同理在删除和查找时,也有可能会线性探测整张散列表,才能找到要查找或者删除的数据。
2)二次探测
所谓二次探测,跟线性探测很像,线性探测每次探测的步长是 1,那它探测的下标序列就是 hash(key)+0,hash(key)+1,hash(key)+2……而二次探测探测的步长就变成了原来的“二次方”,也就是说,它探测的下标序列就是 hash(key)+0,hash(key)+12,hash(key)+22……
3)双重散列
双重散列的意思就是不仅要使用一个散列函数,而是使用一组散列函数 hash1(key),hash2(key),hash3(key) …。我们先用第一个散列函数,如果计算得到的存储位置已被占用,再用第二个散列函数,依次类推,直到找到空闲的存储位置。
不管采用哪种探测方法,当散列表中空闲位置不多的时候,散列冲突的概率就会大大提高。为了尽可能保证散列表的操作效率,一般情况下,我们会尽可能保证散列表中有一定比例的空闲槽位。我们用装载因子(Load Factor)来表示空位的多少。装载因子越大,说明空闲位置越少,冲突越多,散列表的性能会下降。
2. 链表法
链表法是一种更加常用的散列冲突解决办法,相比开放寻址法要简单很多。如下图所示,在散列表中,每个桶(bucket)或槽(slot)会对应一条链表,所有散列值相同的元素我们都放到相同槽位对应的链表中。
当插入的时候,我们只需要通过散列函数计算出对应的散列槽位,将其插入到对应链表中即可,所以插入的时间复杂度是 O(1)。当查找、删除一个元素时,我们同样通过散列函数计算出对应的槽,然后遍历链表查找或者删除即可。这两个操作的时间复杂度跟链表的长度 k 成正比,也就是 O(k)。对于散列比较均匀的散列函数来说,理论上讲,k=n/m,其中 n 表示散列中数据的个数,m 表示散列表中“槽”的个数。
3. 使用场景
开放寻址法
开放寻址法不像链表法,需要拉很多链表,数据都存储在数组中,可以有效地利用 CPU 缓存加快查询速度。而且这种方法实现的散列表,序列化起来比较简单。链表法包含指针,序列化起来就没那么容易。
但用开放寻址法解决冲突的散列表,删除数据时需要特殊标记已经删除掉的数据。而且所有的数据都存储在一个数组中,相比链表法冲突的代价更高,比链表法更浪费内存空间。因此当数据量比较小、装载因子比较小的时候,适合采用开放寻址法。这也是 ThreadLocalMap 使用开放寻址法解决散列冲突的原因。
链表法
首先,链表法对内存的利用率比开放寻址法要高,因为链表结点可以在需要的时候再创建。其次,链表法比开放寻址法对大装载因子的容忍度更高。只要散列函数的值随机均匀,即便装载因子变成 10,也就是链表的长度变长了而已,虽然查找效率有所下降,但比顺序查找还是快很多。但由于链表中的结点是零散分布在内存中的,不是连续的,所以对 CPU 缓存是不友好的,这方面对于执行效率也有一定的影响。
实际上,我们对链表法稍加改造,可以实现一个更加高效的散列表。那就是,我们将链表法中的链表改造为其他高效的动态数据结构,比如跳表、红黑树。这样,即便出现散列冲突,极端情况下,所有的数据都散列到同一个桶内,那最终退化成的散列表的查找时间也只不过是 O(logn)。这样也就有效避免了前面讲到的散列碰撞攻击。
因此基于链表的散列冲突处理方法比较适合存储大对象、大数据量的散列表,而且,比起开放寻址法,它更加灵活,支持更多的优化策略,比如用红黑树代替链表。
散列表设计原则
我们知道,散列表的查询效率并不能笼统地说成是 O(1)。它跟散列函数、装载因子、散列冲突等都有关系。如果散列函数设计得不好,或者装载因子过高,都可能导致散列冲突发生的概率升高,查询效率下降。
1. 散列函数的设计
散列函数设计的好坏,决定了散列表冲突的概率大小,也直接决定了散列表的性能。那什么才算是一个好的散列函数呢?其设计原则有如下几点:
- 散列函数的设计不能太复杂。过于复杂的散列函数,势必会消耗很多计算时间,也就间接地影响到散列表的性能。
- 散列函数生成的值要尽可能随机且均匀分布,这样才能避免或者最小化散列冲突,而且即便出现冲突,散列到每个槽里的数据也会比较平均,不会出现某个槽内数据特别多的情况。
2. 装载因子
装载因子越大,说明散列表中的元素越多,空闲位置越少,散列冲突的概率就越大。不仅插入数据的过程要多次寻址或者拉很长的链,查找的过程也会因此变得很慢。而对于一个动态散列表来说,数据集合是频繁变动的,我们事先无法预估将要加入的数据个数,所以我们也无法事先申请一个足够大的散列表。随着数据慢慢加入,装载因子就会慢慢变大。当装载因子大到一定程度之后,散列冲突就会变得不可接受。
针对散列表,当装载因子过大时,我们可以进行动态扩容,重新申请一个更大的散列表,将数据搬移到这个新散列表中。假设每次扩容我们都申请一个原来散列表大小两倍的空间。如果原来散列表的装载因子是 0.8,那经过扩容之后,新散列表的装载因子就下降为原来的一半,变成了 0.4。
针对数组的扩容,数据搬移操作比较简单。但针对散列表的扩容,数据搬移操作要复杂很多。因为散列表的大小变了,数据的存储位置也变了,所以我们需要通过散列函数重新计算每个数据的存储位置。如下图所示,在原来的散列表中,21 这个元素原来存储在下标为 0 的位置,搬移到新的散列表中,存储在下标为 7 的位置。
对于支持动态扩容的散列表,插入操作的时间复杂度是多少呢?插入一个数据,最好情况下,不需要扩容,所以最好时间复杂度是 O(1)。最坏情况下,散列表装载因子过高,启动扩容,我们需要重新申请内存空间,重新计算哈希位置,并且搬移数据,所以时间复杂度是 O(n)。用摊还分析法,均摊情况下,时间复杂度接近最好情况,就是 O(1)。
实际上,对于动态散列表,随着数据的删除,散列表中的数据会越来越少,空闲空间会越来越多。如果我们对空间消耗非常敏感,我们可以在装载因子小于某个值之后,启动动态缩容。当然,如果我们更加在意执行效率,能够容忍多消耗一点内存空间,那就可以不用费劲来缩容了。
装载因子阈值需要选择得当。如果太大,会导致冲突过多;如果太小,会导致内存浪费严重。装载因子阈值的设置要权衡时间、空间复杂度。如果内存空间不紧张,对执行效率要求很高,可以降低负载因子的阈值;相反,如果内存空间紧张,对执行效率要求又不高,可以增加负载因子的值,甚至可以大于 1。
3. 避免低效扩容
大部分情况下,动态扩容的散列表插入一个数据都很快,但当装载因子已经到达阈值,需要先进行扩容,再插入数据,这时插入数据就会变得很慢。如果我们的业务代码直接服务于用户,尽管大部分情况下,插入一个数据的操作都很快,但极个别非常慢的插入操作,也会让用户崩溃。这时,“一次性”扩容的机制就不合适了。
为了解决一次性扩容耗时过多的情况,我们可以将扩容操作穿插在插入操作的过程中,分批完成。当装载因子触达阈值之后,我们只申请新空间,但并不将老的数据搬移到新散列表中。当有新数据要插入时,我们将新数据插入新散列表中,并从老的散列表中拿出一个数据放入到新散列表。每次插入一个数据到散列表,然后重复上述过程。经过多次插入操作后,老的散列表中的数据就一点一点全部搬移到新散列表中了。这样没有了集中的一次性数据搬移,插入操作就都变得很快了。
这期间的查询操作,我们先从新散列表中查找,如果没找到,再去老的散列表中查找。通过这样均摊的方法,将一次性扩容的代价,均摊到多次插入操作中,就避免了一次性扩容耗时过多的情况。这种实现方式,在任何情况下,插入一个数据的时间复杂度都是 O(1)。Redis 中的哈希表扩容就是利用了这个思想。
代码实现
public class HashTable<K, V> {/** 散列表默认长度 */private static final int DEFAULT_INIT_CAPACITY = 8;/** 装载因子 */private static final float LOAD_FACTOR = 0.75f;/** 初始化散列表数组 */private Entry<K, V>[] table = (Entry<K, V>[])new Entry[DEFAULT_INIT_CAPACITY];/** 实际元素数量 */private int size = 0;/** 散列表索引数量 */private int use = 0;/*** 新增*/public void put(K key, V value) {int index = hash(key);// 位置未被引用,创建哨兵节点if (table[index] == null) {table[index] = new Entry<>(null, null, null);}Entry<K, V> tmp = table[index];if (tmp.next == null) {tmp.next = new Entry<>(key, value, null);size++;use++;// 动态扩容if (use >= table.length * LOAD_FACTOR) {resize();}} else {// 解决散列冲突,使用链表法do {tmp = tmp.next;// key相同,覆盖旧的数据if (tmp.key == key) {tmp.value = value;return;}} while (tmp.next != null);Entry<K, V> temp = table[index].next;table[index].next = new Entry<>(key, value, temp);size++;}}/*** 散列函数,参考hashmap散列函数*/private int hash(Object key) {int h;return (key == null) ? 0 : ((h = key.hashCode()) ^ (h >>> 16)) % this.table.length;}/*** 扩容*/private void resize() {Entry<K, V>[] oldTable = table;table = (Entry<K, V>[]) new Entry[table.length * 2];use = 0;for (Entry<K, V> kvEntry : oldTable) {if (kvEntry == null || kvEntry.next == null) {continue;}Entry<K, V> e = kvEntry;while (e.next != null) {e = e.next;int index = hash(e.key);if (table[index] == null) {use++;// 创建哨兵节点table[index] = new Entry<>(null, null, null);}table[index].next = new Entry<>(e.key, e.value, table[index].next);}}}/*** 删除*/public void remove(K key) {int index = hash(key);Entry<K, V> e = table[index];if (e == null || e.next == null) {return;}Entry<K, V> pre;Entry<K, V> headNode = table[index];do {pre = e;e = e.next;if (key == e.key) {pre.next = e.next;size--;if (headNode.next == null) {use--;}return;}} while (e.next != null);}/*** 获取*/public V get(K key) {int index = hash(key);Entry<K, V> e = table[index];if (e == null || e.next == null) {return null;}while (e.next != null) {e = e.next;if (key == e.key) {return e.value;}}return null;}private static class Entry<K, V> {K key;V value;Entry<K, V> next;Entry(K key, V value, Entry<K, V> next) {this.key = key;this.value = value;this.next = next;}}}
