散列表map
map 几乎是每门编程语言里都有的数据结构,然而底层的原理却不尽相同。线上环境和 map 相关的常见问题就是并发读写导致的 panic,这背后的考虑和代码实现都在本节进行探讨。
# map 是什么
维基百科里这样定义 map:在计算机科学里,被称为相关数组、map、符号表或者字典,它是 由一组 <key, value>
对组成的抽象数据结构,并且同一个 key 只会出现一次。
这里有两个关键点:map 是由 <key-value>
对组成的;key 只会出现一次。
和 map 相关的操作主要是:
1)增加一个 k-v 对 —— Add or Insert。
2)删除一个 k-v 对 —— Remove or Delete。
3)修改某个 k 对应的 v —— Reassign。
4)查询某个 k 对应的 v —— Lookup。
也就是最基本的增、删、查、改。
map 的设计也被称为 “The dictionary problem”,它的任务是设计一种数据结构用来维护一个集合的数据,并且可以同时对集合进行增删查改的操作。 最主要的数据结构有两种:哈希查找表 (Hash table)、搜索树(Search tree)。
哈希查找表用一个哈希函数将 key 分配到不同的 bucket(桶, 类似于数组中的不同索引)。于是,开销主要在哈希函数的计算以及数组的常数访问时间。在很多场景下,哈希查找表的性能很高。
哈希查找表一般会存在 “碰撞” 的问题,就是说不同的 key 被哈希到了同一个 bucket。一般有两种应对方法:链表法和开放地址法。 链表法将一个 bucket 实现成一个链表, 落在同一个 bucket 中的 key 都会插入这个链表;开放地址法则在碰撞发生后,根据一定的规律,在 bucket 的后面挑选 “空位”,用来放置新的 key。
搜索树一般采用自平衡搜索树,包括:AVL 树、红黑树等。
自平衡搜索树法的最差搜索效率是 O (logN),而哈希查找表是 O (N)。当然,哈希查找表的平 均查找效率是 O (1),如果哈希函数设计得很好,最坏的情况基本不会出现。还有一点,遍历自平衡 搜索树,返回的 key 序列,一般会按照从小到大的顺序;而哈希查找表则是乱序的。
该处我理解一下,实现 map 有两种方式:hash table 和 search tree。实现之后解决 hash 碰撞的问题也有两种方式:链表法和开放地址法。
# map 的底层实现原理是什么
前面说了 map 实现的几种方案,Go 语言采用的是哈希查找表,并且使用链表法解决哈希冲突。接下来的内容将探索 map 的核心原理,一窥它的内部结构。
# 1. map 内存模型
在源码中,表示 map 的结构体是 hmap,它是 hashmap 的缩写:
// src/runtime/map.go
type hmap struct {
// 元素个数,调用 len(map) 时,直接返回此值
count int
flags uint8
// buckets 的对数 log_2
B uint8
// overflow 的 bucket 近似数
noverflow uint16
// 计算 key 的哈希的时候会传入哈希函数
hash0 uint32
// 指向 buckets 数组,大小为 2^B
// 如果元素个数为 0,就为 nil
buckets unsafe.Pointer
// 扩容的时候,buckets 长度会是 oldbuckets 的两倍
oldbuckets unsafe.Pointer
// 指示扩容进度,小于此地址的 buckets 完成迁移
nevacuate uintptr
extra *mapextra
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
B 是 buckets 数组的长度的对数,即 buckets 数组的长度为 2^B,bucket 里面存储了 key 和 value,buckets 是一个指针,指向的是一个结构体:
// src/runtime/map.go
type bmap struct {
tophash [bucketCnt]uint8
}
2
3
4
5
上面代码中的 bucketCnt ,在
src/runtime/map.go
定义的是 8,由此可以判断一个 bucket 中会有 8 个元素。
但这只是 “表面” 的结构,编译器会给它 “加料”,动态地创建一个新的结构:
type bmap struct {
topbits[8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}
2
3
4
5
6
7
bmap 就是人们常说的 “桶”, 桶里面会最多装 8 个 <key,value>
对。 这些 key 之所以会落入同一个桶, 是因为它们经过哈希计算后, 得到的结果是 “一类” 的, 注意, 哈希值并不是完全相等。在桶内, 又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个槽位。
Hashmap 整体如图 3-6 所示。
当 map 的 key 和 value 都不是指针,并且 size 都小于 128 字节的情况下,会把 bmap 标记为不含指针, 这样可以避免 GC 时扫描整个 hmap, 提升效率。 但是, bmap 其实有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。当 key/value 都不含指针的情况下,启用 overflow 和 oldoverflow 字段。
// src/runtime/map.go
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow 包含空闲的 overflow bucket,这是预分配的 bucket
nextOverflow *bmap
}
2
3
4
5
6
7
8
9
bmap 是存放 k-v 的地方,bmap 的内存模型如图 3-7 所示。
HOB Hash 指的就是 top hash。注意到 key 和 value 是各自放在一起的,并不是 key/value/key/value/... 这样的 形式。这样做的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。
例如,有如下类型的 map:
map[int64]int8
如果按照 key/value/key/value/... 这样的模式存储,那在每一个 key/value 对之后都要额外 padding 7 个字节 (注意,这里为了防止伪共享,需要 “凑齐” 8 字节);而将所有的 key, value 分别放到一起, 使用这种形式 key/key/.../value/value/...,则只需要在最后添加 padding。
padding 7 个字节本人猜测是为了内存对齐?
每个 bucket 设计成最多只能放 8 个 key-value 对, 如果有第 9 个 key-value 落入当前的 bucket,则需要再构建一个 bucket ,并且通过 overflow 指针连接起来。这就是所谓的 “链表法”。
# 2. 创建 map
从语法层面上来说,创建 map 非常简单:
ageMp := make(map[string]int)
// 指定 map 长度
ageMp := make(map[string]int, 8)
// ageMp 为 nil,不能向其添加元素,会直接 panic
var ageMp map[string]int
2
3
4
5
6
7
通过汇编分析可以得知,创建 map 底层调用的是 makemap 函数,主要做的工作是初始化 hmap 结构体的各种字段,例如计算 B 的大小,设置哈希种子 hash0 等。
// src/runtime/map.go
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 检查 bucket 个数乘以 bucket 大小,看是否超出了内存申请的限制
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
// 初始化 hamp
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()
// 找到一个 B,使得 map 的装载因子在正常范围内
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 初始化 hash table
// 如果 B 等于 0,那么 buckets 就会在赋值的时候再分配(懒汉式)
// 如果长度比较大,清理内存花费的时间会长一点
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
【思考一下】slice 和 map 分别作为函数参数时有什么区别?
Makemap 函数返回的结果 *hmap,是一个指针, 而之前讲过的 makeslice 函数返回的则是 slice 结构体:
func makeslice(et *_type, len, cap int) slice
// 回顾一下 slice 的结构体定义:
// runtime/slice.go
type slice struct {
array unsafe.Pointer // 元素指针
len int // 长度
cap int // 容量
}
2
3
4
5
6
7
8
9
10
11
结构体内部包含底层的数据指针。
Makemap 和 makeslice 返回值的区别,使得当 map 和 slice 作为函数参数时,在函数内部对 map 的操作会影响 map 结构体;而对 slice 操作却不会(注意,这里的不变指的是 slice 结构体自身,slice 底层数组的元素可能会被改变)。
主要原因:前者是指针(*hmap),后者是结构体(slice)。Go 语言中的函数传参都是值传递, 在函数内部,参数会被复制到本地。*hmap 指针复制完之后,仍然指向同一个 map,因此函数内部对 map 的操作会影响实参。而 slice 被复制后,成为一个新的 slice,对它进行的操作不会影响到实参。
# 3. 哈希函数
map 的一个关键点在于哈希函数的选择。在程序启动时,Go 会检测 CPU 是否支持 aes,如果支持, 则使用 aes hash,否则使用 memhash。这在函数 alginit () 中完成,源码位于路径 src/runtime/alg.go 下。
对于 hash 函数,有加密型和非加密型。加密型的一般用于加密数据、数字摘要等,典型代表就是 md5、sha1、sha256、aes256 这类;非加密型的一般就是查找,如 MurmurHash 等。
在 map 的应用场景中,hash 函数用于查找功能。选择 hash 函数主要考察两点:性能、碰撞概率。
表示类型的结构体:
// src/runtime/type.go
type _type struct {
size uintptr
ptrdata uintptr
hash uint32
tflag tflag
align uint8
fieldalign uint8
kind uint8
equal func(unsafe.Pointer, unsafe.Pointer) bool
gcdata *byte
str nameOff
ptrToThis typeOff
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
其中 equal 字段就和哈希相关,equal 函数用于计算两个类型是否 “哈希相等”。
例如,对于 string 类型,它的 equal 函数如下:
// src/runtime/alg.go
func strequal(p, q unsafe.Pointer) bool {
return *(*string)(p) == *(*string)(q)
}
2
3
4
5
根据 key 的类型,_type 结构体的 equal 字段会被设置对应的 equal 函数。
# 4. key 定位过程
Key 经过哈希计算后得到哈希值,共 64 个 bit 位(针对 64 位机,32 位机就不讨论了,现在主流都是 64 位机),但计算它到底要落在哪个 bucket 时,只会用到最后 B 个 bit 位。还记得前面提到过的 B 吗?回顾一下,如果 B = 5,那么桶的数量,也就是 buckets 数组的长度是 2^5 = 32。
例如,现在有一个 key 经过哈希函数计算后,得到的哈希结果是:
10010111 | 000011110110110010001111001010100010010110010101010 | 00110
用最后的 5 个 bit 位,即 00110,转换为十进制,值为 6,也就是 6 号桶。 再取哈希值的高 8 位,找到此 key 在 bucket 中的槽位。最开始因为桶内还没有 key,在遍历完 bucket 中的所有槽位,包括 overflow 的槽位后,找不到相同的 key,因此会被放到第一个槽位。
因为根据后 B 个 bit 位决定 key 落入的 bucket 编号,也就是桶编号,因此肯定会存在冲突。当两个不同的 key 落在同一个桶中,也就是发生了哈希冲突。冲突的解决手段是用链表法:在 bucket 中,从前往后找到第一个空位,放入新加入的有冲突的 key。之后,在查找某个 key 时, 先找到对应的桶,再去遍历 bucket 中所有的 key。
具体的 key 定位过程的示意图如图 3-8 所示。
图 3-8 中,假定 B = 5,所以 bucket 总数就是 2^5 = 32。首先计算出待查找 key 的哈希,使用低 5 位 00110,找到对应的 bucket,也就是 6 号 bucket。使用哈希值的高 8 位 10010111,对 应十进制 151,在 6 号 bucket 中寻找 tophash 值(HOB hash)为 151 的 key,找到了 2 号槽位,这样整个查找过程就结束了。
如果在 bucket 中没找到,并且 overflow 不为空,还要继续去 overflow bucket 中寻找,直到找到或者所有的槽位都找遍了,仍然没有找到。
最后来看下源码,通过汇编分析可知查找某个 key 的底层函数是 mapacess 系列函数,这些函数的作用类似,直接看 mapacess1 函数:
// src/runtime/map.go
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ……
// 如果 hmap 为空或元素个数为零,返回零值
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
// 写和读冲突
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
// 计算哈希值,并且加入 hash0 引入随机性
hash := t.hasher(key, uintptr(h.hash0))
// 比如 B=5,那 m 就是 31,二进制低 5 位是全 1
// 求 bucket num 时,将 hash 与 m 相与
// 达到 bucket num 由 hash 的低 5 位决定的效果
// 即 m := uintptr(1)<<h.B - 1
m := bucketMask(h.B)
// b 就是 bucket 的地址
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// oldbuckets 不为 nil,说明发生了扩容
if c := h.oldbuckets; c != nil {
// 如果不是同 size 扩容(参考后面扩容的内容)
if !h.sameSizeGrow() {
// 新 bucket 数量是老的 2 倍
m >>= 1
}
// 求出 key 在老的 map 中的 bucket 位置
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果 oldb 没有搬迁到新的 bucket
// 那就在老的 bucket 中寻找
if !evacuated(oldb) {
b = oldb
}
}
// 计算出高 8 位的 hash
// 相当于右移 56 位,只取高 8 位
// top := uint8(hash >> (sys.PtrSize*8 - 8))
// 增加一个 minTopHash
// if top < minTopHash {
//
top += minTopHash
// }
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) { // bucket 找完(还没找到),继续到 overflow bucket 里找
// 遍历 8 个槽位
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop // tophash 不匹配,且后面也没有槽位有数据,直接 break
}
continue // tophash 不匹配,继续
}
// tophash 匹配,定位到 key 的位置
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() { // key 是指针
k = *((*unsafe.Pointer)(k)) // 解引用
}
if t.key.equal(key, k) { // 如果 key 相等
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 定位到 value的位置
if t.indirectelem() {
e = *((*unsafe.Pointer)(e)) // value 解引用
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0]) // 没找到,返回零值
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
代码整体比较明晰, 没什么难懂的地方, 对照注释比较容易理解。 函数返回 key 对应的 value,如果 hmap 中没有此 key,则返回一个 key 相应类型的零值。
重点看定位 key 和 value 的方法以及整个循环的写法:
// key 定位公式
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// value 定位公式
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
2
3
4
5
定位公式中,b 是 bmap 的地址,bmap 即源码里定义的结构体,只包含一个 tophash 数组, 经过编译器扩充之后的结构体会包含 key,value,overflow 这些字段。另外,dataOffset 是 key 相对于 bmap 起始地址的偏移:
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
2
3
4
因此 map 里 key 的起始地址就是 unsafe.Pointer (b)+dataOffset。第 i 个 key 的地址就要在此基础上跨过 i 个 key 的大小;并且,value 放在了所有 key 之后,因此第 i 个 value 的地址还需要加上所有 key 的偏移。
再看整个大循环的写法,共有两层循环。最外层是一个循环,通过:
b = b.overflow(t)
遍历所有的 bucket,相当于是一个 bucket 链表。里层循环则是当定位到一个具体的 bucket 时,遍历这个 bucket 里所有的 cell,或者说所有的槽位,当前的实现是 bucketCnt=8 个槽位。整个循环过程如图 3-9 所示。
再看判断一个 bucket 是否搬迁的函数:
// src/runtime/map.go
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
2
3
4
5
6
主要判断的依据就是 tophash:当第一个 cell 的 tophash 值(tophash [0])小于 minTopHash 且大于 emptyone 时,表明这个 bucket 里所有的槽位均已被搬迁到新地址。因为这个状态值是放在 tophash 数组里,为了和正常的哈希值区分开,会将所有的 key 计算出来的哈希值加上一个增量:minTopHash。这样就能区分正常的 top hash 值和表示搬迁状态的哈希值。
下面的这几种状态就表征了一个 bucket 里所有的 key/value 的情况:
// src/runtime/map.go
// 空的 cell,也是初始时 bucket 的状态
// 并且此 bucket 的后续 cell 后也都是空的,包括 overflow bucket
emptyRest = 0
// 空的 cell
emptyOne = 1
// key,value 已经搬迁完毕,但是 key 都在新 bucket 前半部分
evacuatedX = 2
// 同上,key 在后半部分
evacuatedY = 3
// 空的 cell,cell 已经被迁移到新的 bucket
evacuatedEmpty = 4
// tophash 的最小正常值
minTopHash = 5
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
对比上面的常量,当 tophash [0] 是 evacuatedEmpty、evacuatedX、evacuatedY 这三个值之一 时,说明此 bucket 中的 key 全部被搬迁到了新 bucket。
# 5. map 的赋值过程是怎样的
通过汇编分析可以得知,向 map 插入或者修改 key,调用的是 mapassign 函数。
实际上插入或修改 key 的语法是一样的,区别是前者操作的 key 在 map 中不存在,而后者操作的 key 存在于 map 中。
实际上,mapassign 有一系列的函数,根据 key 类型的不同,编译器会将其优化为相应的 “快速函数”,见表 3-2。
逻辑都是大同小异,只需要研究最一般的赋值函数 mapassign。 整体来看,流程非常简单:对 key 计算 hash 值,根据 hash 值按照之前的流程,找到要赋值的位置(可能是插入新 key,也可能是更新老 key),在相应位置进行赋值操作。
源码大体和前面 key 定位的过程类似, 核心仍然是一个双层循环:外层遍历 bucket 和 overflow bucket,内层遍历单个 bucket 的所有槽位。限于篇幅,这部分代码的注释就不展开了。
整个赋值过程有几点比较重要的,说明如下:
mapassign 函数首先会检查 map 的标志位 flags。如果 flags 的写标志位被置成 1 了,说明 有其他协程正在执行 “写” 操作,由于 assign 本身也是写操作,因此产生了并发写,直接使程序 panic。这也就说明 map 不是协程安全的。
map 的扩容是渐进式的。如果 map 处在扩容的过程中,那么当定位 key 到了某个 bucket 后,需要 确保这个 bucket 对应的老 bucket 已经完成了迁移过程。即老 bucket 里的 key 都要迁移到新 bucket 中来 (老 bucket 中的 key 会被分散到 2 个新 bucket),才能在新的 bucket 中进行插入或者更新的操作。
只有在完成了迁移操作后,才能安全地在新 bucket 里定位 key 要安置的地址,再进行之后的赋值操作。
现在到了定位 key 应该放置的位置了:准备两个指针,一个(inserti)指向 key 的 hash 值在 tophash 数组所处的位置,另一个(insertk)指向 cell 的位置(也就是 key 最终放置的地址)。当 然,对应 value 的位置就很容易就能计算出来:在 tophash 数组中的索引位置决定了 key 在整个 bucket 中的位置(共 8 个 key),而 value 的位置需要 “跨过” 8 个 key 的长度。
在循环的过程中,inserti 和 insertk 分别指向第一个空的 tophash、第一个空闲的 cell。如果之后在 map 没有找到 key 的存在,也就是说 map 中没有此 key,这意味着插入新 key,而不是更新原有的 key。那最终 key 的安置地址就是第一次发现的空闲的 cell。
如果这个 bucket 的 8 个 key 都已经放置满了,在跳出循环后,会发现 inserti 和 insertk 都为空, 这时需要在 bucket 后面挂上 overflow bucket。当然,也有可能是在 overflow bucket 后面再挂上一个 overflow bucket。这就说明,有太多 key 被哈希到了此 bucket。在这种情况下,正式放置 key 之前, 还要检查 map 的状态,看它是否需要进行扩容,如果满足扩容的条件,就主动触发一次扩容操作。
扩容完成之后,之前的查找定位 key 的过程,还得再重新走一次。因为扩容之后,key 的分布发生了变化。
最后,会更新 map 相关的值,如果是插入新 key,map 的元素数量字段 count 值会加 1;并 且会将 hashWriting 写标志位清零。
另外,前面说的找到 key 的位置,进行赋值操作,实际上并不准确。观察 mapassign 函数的原型就知道,函数并没有传入 value 值,所以赋值操作是什么时候执行的呢?
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
答案还得从汇编分析中寻找。实际上,mapassign 函数返回的指针就是指向的 key 所对应的 value 值的位置,有了地址,就很好操作赋值了。
# 6. map 的删除过程是怎样的
删除操作底层的执行函数是 mapdelete:
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer)
根据 key 类型的不同,删除操作会被优化成更具体的函数,见表 3-3。
当然,只需关心 mapdelete 函数。它首先会检查 h.flags 标志,如果发现写标志位是 1,直接 panic,因为这表明有其他协程同时在进行写操作。大致的逻辑如下:
1)检测是否存在并发写操作。
2)计算 key 的哈希,找到落入的 bucket。
3)设置写标志位。
4)检查此 map 是否正在扩容的过程中,如果是则直接触发一次搬迁操作。
5)两层循环,核心是找到 key 的具体位置。寻找过程都是类似的,在 bucket 中挨个 cell 寻找。
6)找到对应位置后,对 key 或者 value 进行 “清零” 操作。
7)将 count 值减 1,将对应位置的 tophash 值置成 emptyOne。
8)最后,检测此槽位后面是否都是空,若是将 tophash 改成 emptyRest。
9)若前一步成功,则继续向前扩大战果:将此 cell 之前的 tophash 值为 emptyOne 的槽位都置成 emptyRest。
# 7. map 的扩容过程是怎样的
使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多, key 发生哈希碰撞的概率也越来越大。bucket 中的 8 个 cell 会被逐渐塞满,查找、插入、删除 key 的效率也会越来越低。最理想的情况是一个 bucket 只装一个 key,这样,就能达到 O (1) 的效 率,但这样空间消耗太大,用空间换时间的代价太高。
Go 语言中一个 bucket 装载 8 个 key,所以在定位到某个 bucket 后,还需要再定位到具体的槽位,这实际上又用了时间换空间。
当然,这样做,要有一个度,不然所有的 key 都落在了同一个 bucket 里,直接退化成了链表,各种操作的效率直接降为 O (n),也是不行的。
因此,需要有一个指标来衡量前面描述的情况,这就是装载因子。Go 源码里这样定义装载因子:
loadFactor := count / (2^B)
公式中的 count 就是 map 里的元素个数,2^B 表示总的 bucket 数量。
在向 map 插入新 key 时,会进行条件检测,符合下面这两个条件,就会触发扩容:
1)装载因子超过阈值(源码里定义的阈值是 6.5)。
2)overflow 的 bucket 数量过多:当 B<15, 也就是 bucket 总数 2^B 小于 2^15 时, overflow 的 bucket 数量超过 2^B;当 B>= 15, 也就是 bucket 总数 2^B 大于等于 2^15, overflow 的 bucket 数量超过 2^15。
通过汇编分析可以找到赋值操作对应源码中的函数是 mapassign,对应扩容条件的源码如下:
// src/runtime/map.go
// 触发扩容时机
if !h.growing() && (overLoadFactor(int64(h.count+1), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
}
// 装载因子超过 6.5
func overLoadFactor(count int64, B uint8) bool {
return count >= bucketCnt && float32(count) >= loadFactor*float32((uint64(1)<<B))
}
// overflow buckets 太多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
具体解释一下触发扩容的两个条件:
第 1 点:每个 bucket 有 8 个空位,在没有溢出,且所有的桶都装满了的情况下,装载因子算出来的结果是 8。因此当装载因子超过 6.5 时,表明很多 bucket 都快要装满了,查找、插入、删除效率都变低了,在这个时候进行扩容是有必要的。
第 2 点:是对第 1 点的补充。就是说在装载因子比较小的情况下,这时候 map 的操作效率也很低,而第 1 点却识别不出来。表面现象就是计算装载因子的分子比较小,即 map 里元素总数少,但是 bucket 数量多(真实分配的 bucket 数量多,包括大量的 overflow bucket)。
造成这种情况的原因是:不停地插入、删除元素。先插入很多元素,导致创建了很多 bucket, 但是装载因子达不到第 1 点的临界值,没有触发扩容来缓解这种情况。之后,不断删除元素减小元素总数量,再插入很多元素,导致创建了很多的 overflow bucket,但就是不会触犯第 1 点的规定,因为 overflow bucket 数量太多,使得 key 会很分散,查找插入效率低得吓人,因此用第 2 点规定进行缓解。这就像是一座空城,房子很多,但是住户很少,住得很分散,找起人来很困难。
当满足了条件 1、2 的限制时,就会发生扩容。但是二者扩容的策略并不相同,毕竟两个条件应对的场景不同。
对于条件 1,元素太多,而 bucket 数量太少,策略很简单:将 B 加 1,bucket 总数(2^B) 直接变成原来的 2 倍。于是,就有了新老 bucket 。注意,这时候元素都在老 bucket 里,还没迁移到新的 bucket 来。而且,新 bucket 只是最大数量变为原来最大数量(2^B)的 2 倍(2^B * 2)。
对于条件 2,其实元素并没那么多,但是 overflow bucket 数特别多,说明很多 bucket 都没装满。解决办法就是开辟一个新的 bucket 空间,将老 bucket 中的元素移动到新 bucket,使得同一个 bucket 中的 key 排列地更紧密。 这样,原来,在 overflow bucket 中的 key 可以移动到新 bucket 中来。结果是节省空间,提高 bucket 利用率,map 的操作效率自然就会提升。
对于条件 2 的解决方案,还有一个极端的情况:如果插入 map 的 key 的哈希值都一样,就会落到同一个 bucket 里,超过 8 个就会产生 overflow bucket,结果也会造成 overflow bucket 数过多。 移动元素其实解决不了问题,因为这时整个哈希表已经退化成了一个链表,操作效率变成了 O (n)。
再来看一下扩容具体是怎么做的。由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果有大量的 key/value 需要搬迁,会非常影响性能。因此 Go map 的扩容采取了一种称为 “渐进式” 地方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。
实际上,hashGrow () 函数并没有真正地进行 “搬迁”,它只是分配好了新的 buckets,并将老的 buckets 加载到 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork () 函数中,而调用 growWork () 函数的动作是在 mapassign 和 mapdelete 函数中。也就是在插入、修改、删除 key 的时候,都会先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil,再尝试进行搬迁 buckets 的工作。
先看 hashGrow () 函数所做的工作,再来看具体的搬迁 buckets 是如何进行的:
// src/runtime/map.go
func hashGrow(t *maptype, h *hmap) {
// B+1 相当于是原来 2 倍的空间
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) { // 进行等量的内存扩容,所以 B 不变
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets // 将老 buckets 挂到 oldbuckets 上
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 申请新的 buckets 空间
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交 grow 的动作
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0 // 搬迁进度为 0
h.noverflow = 0 // overflow buckets 数为 0
// ……
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
可以看到,hashGrow 函数主要的工作是申请到了新的 buckets 空间,把相关的标志位都进行了处理:如果是同等 size 扩容,设置 h.flags 为 sameSizeGrow;标志 nevacuate 被置为 0,表示当前搬迁进度为 0。
值得一说的是对 h.flags 的处理:
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
2
3
4
这里得先说下运算符:&^,称为按位置 0 运算符。例如:
x = 01010011
y = 01010100
z = x &^ y = 00000011
2
3
如果 y 的 bit 位为 1,那么结果 z 对应 bit 位就为 0;否则 z 对应 bit 位就和 x 对应 bit 位的值相同。
所以上面那段对 h.flags 的操作的意思是:先把 h.flags 中 iterator 和 oldIterator 对应位清 0, 然后如果发现 iterator 位为 1,那就把它转接到 oldIterator 位,使得 oldIterator 对应位变成 1。潜台词就是:buckets 现在挂到了 oldBuckets 名下了,对应的标志位也转接过去。
几个标志位如下:
// src/runtime/map.go
// 可能有迭代器使用 buckets
iterator = 1
// 可能有迭代器使用 oldbuckets
oldIterator = 2
// 有协程正在向 map 中写入 key
hashWriting = 4
// 等量扩容(对应条件 2)
sameSizeGrow = 8
2
3
4
5
6
7
8
9
10
11
12
13
可能前面这段仍然不好理解, iterator 实际上就是 01, 而 oldIterator 就是 10, (iterator | oldIterator) 的结果就是 11。所以:
flags := h.flags &^ (iterator | oldIterator)
相当于将 h.flags 的低两位清零,表示没有迭代器正在使用 buckets,接着 :
if h.flags&iterator != 0 {
flags |= oldIterator
}
2
3
如果 h.flags 的最低位为 1,表示现在有迭代器正在使用 buckets。因为 map 马上要进行搬迁了,之后 buckets 就挂到 oldBuckets 名下了,因此需要将 flags 的倒数第 2 位置为 1。表明有迭代器正在使用 oldBuckets。
上面一段看的云里雾里,但是不影响看下面的逻辑。
再来看看真正执行搬迁工作的 growWork () 函数。
// src/runtime/map.go
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 确认搬迁老的 bucket 对应正在使用的 bucket
evacuate(t, h, bucket&h.oldbucketmask())
// 再搬迁一个 bucket,以加快搬迁进程
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
2
3
4
5
6
7
8
9
10
函数 h.growing () 非常简单:
// src/runtime/map.go
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
2
3
4
5
如果 oldbuckets 不为空,说明还没有搬迁完毕,还得继续搬。
表达式 bucket&h.oldbucketmask () 的作用,如源码里注释的,是为了确认搬迁的 bucket 是正在使用的 bucket。oldbucketmask () 函数返回扩容前的 map 的 bucketmask。
所谓的 bucketmask,作用是将它与哈希值相与,得到的结果就是 key 应该落入的桶编号。比如 B = 5,那么 bucketmask 的低 5 位是 11111,其余位都是 0,hash 值与其相与的意思是,只用 hash 值的低 5 位决策 key 到底落入哪个 bucket。
接下来,关注搬迁的关键函数 evacuate。源码如下:
// src/runtime/map.go
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 定位老的 bucket 地址
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets() // 结果是 2^B,假如 B = 5,结果为 32
if !evacuated(b) { // 如果 b 没有被搬迁过
var xy [2]evacDst // 表示 bucket 移动的目标地址
x := &xy[0]
// 默认是等 size 扩容,前后 bucket 序号不变。使用 x 来进行搬迁
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
// 如果不等 size 扩容,前后 bucket 序号有变
// 使用 y 来进行搬迁
if !h.sameSizeGrow() {
y := &xy[1]
// y 代表的 bucket 序号增加了 2^B
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 遍历所有的 bucket,包括 overflow buckets
// b 是老的 bucket 地址
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历 bucket 中的所有 cell
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i] // 当前 cell 的 top hash 值
if isEmpty(top) { // 如果 cell 为空,即没有保存 key
b.tophash[i] = evacuatedEmpty // 那就标志它被"搬迁"过
continue // 继续下个 cell
}
// 正常不会出现这种情况
// 未被搬迁的 cell 只可能是 empty 或是
// 正常的 top hash(大于 minTopHash)
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2)) // 如果 key 是指针,则解除引用
}
var useY uint8
if !h.sameSizeGrow() { // 如果不是等量扩容
// 计算 hash 值,和 key 第一次写入时一样
hash := t.hasher(k2, uintptr(h.hash0))
// 如果有协程正在遍历 map
// 并且如果出现相同的 key 值,算出来的 hash 值不同
// 只有在 float 变量的 NaN() 情况下会出现
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1 // 由 tophash 最低位决定
top = tophash(hash) // 取高 8 位作为 top hash 值
} else { // 一般情况下
if hash&newbit != 0 { // 取决于新哈希值的 oldB+1 位是 0 还是 1
useY = 1 // 为 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY]
// evacuation destination
if dst.i == bucketCnt { // 如果 dst.i 等于 8,说明要溢出了
dst.b = h.newoverflow(t, dst.b) // 新建一个 bucket
dst.i = 0 // 从 0 开始计数
dst.k = add(unsafe.Pointer(dst.b), dataOffset) // 表示 key 要移动到的位置
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) // 表示 value 要移动到的位置
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // 设置 top hash 值
if t.indirectkey() { // key 是指针
*(*unsafe.Pointer)(dst.k) = k2 // 将原 key(是指针)复制到新位置
} else {
typedmemmove(t.key, dst.k, k) // 将原 key(是值)复制到新位置
}
if t.indirectelem() { // value 是指针,操作同 key
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++ // 定位到下一个 cell
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// 如果没有协程在使用老的 buckets,就把老 buckets 清除掉,帮助 GC
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// 只清除 bucket 的 key,value 部分,保留 top hash 部分,指示搬迁状态
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// 更新搬迁进度
if oldbucket == h.nevacuate { // 如果此次搬迁的 bucket 等于当前进度
advanceEvacuationMark(h, t, newbit)
}
}
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++ // 进度加 1
stop := h.nevacuate + 1024 // 尝试往后看 1024 个 bucket
if stop > newbit {
stop = newbit
}
// 寻找没有搬迁的 bucket
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 现在 h.nevacuate 之前的 bucket 都被搬迁完毕
// 所有的 buckets 搬迁完毕
if h.nevacuate == newbit {
h.oldbuckets = nil // 清除老的 buckets
// 清除老的 overflow bucket
if h.extra != nil {
h.extra.oldoverflow = nil
}
h.flags &^= sameSizeGrow // 清除正在扩容的标志位
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
函数 evacuate () 的代码注释非常清晰,对着代码和注释很容易看懂整个的搬迁过程。
搬迁的目的是将老的 buckets 搬迁到新的 buckets。而通过前面的说明已经知道,应对条件 1,新的 buckets 数量是之前的一倍;应对条件 2,新的 buckets 数量和之前相等。
对于条件 2,从老的 buckets 搬迁到新的 buckets,由于 bucktes 数量不变,因此可以按序号来搬,比如 key 原来在 0 号 bucktes,到新的地方后,仍然放在 0 号 buckets。
对于条件 1,就没这么简单了。要重新计算 key 的哈希,才能决定它到底落在哪个 bucket。 例如,原来 B = 5,计算出 key 的哈希后,只用看它的低 5 位,就能决定它落在哪个 bucket。扩容后, B 变成了 6, 因此需要多看一位, 哈希值的低 6 位决定 key 落在哪个 bucket。 这称为 map rehash,如图 3-10 所示。
因此, 某个 key 在搬迁前后落入的 bucket 序号可能和原来相等, 也可能是相比原来加上 2^B,取决于 hash 值的第 (B+1) bit 位是 0 还是 1。
如果扩容后,B 增加了 1,意味着 buckets 总数是原来的 2 倍,原来一个桶将 “裂变” 到两个桶。
例如, 原始 B = 2, 1 号 bucket 中有 2 个 key 的哈希值低 3 位分别为:010,110。由于原来 B = 2,所以低 2 位都是 10 决定它们落在 2 号桶, 现在 B 变成 3, 所以 010、110 分别落入 2、6 号桶,如图 3-11 所示。
先理解这个过程,后面讲 map 迭代的时候会用到。
再来讲搬迁函数中的几个关键点:
函数 evacuate () 每次只完成一个 bucket 的搬迁工作,需要遍历完此 bucket 的所有的 cell,将有值的 cell 复制到新的地方。并且 bucket 还会连接 overflow bucket,它们同样需要搬迁。因此需要两层循环,外层遍历 bucket 和 overflow bucket,内层遍历 bucket 的所有 cell。类似的循环在 map 的源码里到处都是,要理解透彻。
源码里有一个 bool 型变量,useY,其实代表的是落入 X, Y part 的哪个 part。前面说过,如果是扩容后 B 增加了 1,那么桶的数量是原来的 2 倍。前一半桶被称为 X part,后一半桶被称为 Y part。一个 bucket 中的 key 可能会分裂到两个桶,一个位于 X part,一个位于 Y part。所以在搬迁一个 cell 之前,需要知道这个 cell 中的每个 key 具体落到哪个 part。当然也很简单,重新计算 cell 中 key 的 hash,并向前 “多看” 一位,再决定落入哪个 part,这个前面也说得很详细了。
有一个特殊情况是:有一种 key,每次对它计算 hash,得到的结果都不一样。这个 key 就是 math.NaN () 的结果,它的含义是 not a number,类型是 float64。当它作为 map 的 key,在搬迁的时候,会遇到一个问题:再次计算它的哈希值和它当初插入 map 时的计算出来的哈希值不一样。
读者可能想到了,这样带来的一个后果是,这个 key 是永远不会被 Get 操作获取到的。当使用 m [math.NaN ()] 语句的时候,是查不出来结果的。这个 key 只有在遍历整个 map 的时候,才有机会 “现身”。所以,可以向一个 map 插入任意数量的 math.NaN () 作为 key。
当搬迁碰到 math.NaN () 的 key 时,只通过 tophash 的最低位决定分配到 X part 还是 Y part(如果扩容后是原来 buckets 数量的 2 倍)。如果 tophash 的最低位是 0,分配到 X part;如果是 1, 则分配到 Y part。
其实这样的 key 随便搬迁到哪个 bucket 都行,当然,还是要搬迁到裂变后对应的两个 bucket 中去。当然现在这样做是有好处的,在后面讲 map 迭代的时候会再详细解释,暂时只需要知道是这样分配的就行。
确定了要搬迁到的目标 bucket 后,搬迁操作就比较好进行了。
1)将源 key/value 值复制到目的地相应的位置。
2)设置 key 在原始 buckets 的 tophash 为 evacuatedX 或是 evacuatedY,表示已经搬迁到了新 map 的 x part 或是 y part。
3)新 map 的 tophash 取 key 哈希值的高 8 位。
下面来宏观地看一下扩容前后的变化,不妨假设扩容前 B 等于 2。
扩容前,B = 2,共有 4 个 buckets,lowbits 表示 hash 值的低位。假设不关注其他 buckets 情况,只专注在 2 号 bucket。并且假设 overflow bucket 太多,触发了等量扩容(对应于前面的条件 2),如图 3-12 所示。
扩容完成后,overflow bucket 消失了,key 都集中到了一个 bucket 中,更紧凑了,提高了查找的效率,如图 3-13 所示。
假设触发了 2 倍的扩容,那么扩容完成后,老 buckets 中的 key 就分裂出了 2 个新的 bucket。 一个在 X part,一个在 Y part。依据是 hash 的 lowbits。新 map 中 0-3 bucket 称为 X part,4-7 bucket 称为 Y part,如图 3-14 所示。
注意,上面的两张图忽略了其他 buckets 的搬迁情况,并且只展示了所有的 bucket 都搬迁完毕后的情形。实际上,搬迁是一个 “渐进” 的过程,并不会一次就全部搬迁完毕。所以在搬迁过程中,oldbuckets 指针还会指向原来老的 [] bmap,并且已经搬迁完毕的 key 的 tophash 值会是一个状态值,表示 key 的搬迁去向。
# 8. map 的遍历过程是怎样的
本来 map 的遍历过程比较简单:遍历所有的 bucket 以及它后面挂的 overflow bucket,同时挨个遍历 bucket 中的所有 cell。每个 bucket 中包含 8 个 cell,从 cell 中取出 key 和 value,遍历过程就完成了。
但是,现实并没有这么简单。还记得前面讲过的扩容过程吗?扩容过程不是一个原子的操作, 它每次最多只搬运 2 个 bucket,所以如果触发了扩容操作,那么在很长时间里,map 的状态都是 处于一个中间态:有些 bucket 已经搬迁到 “新家”,而有些 bucket 还待在 “老家”。
因此,遍历如果发生在扩容的过程中,就会涉及遍历新老 bucket 的过程,这是难点所在。
先看一个简单的代码样例,假装不知道遍历过程具体调用的是什么函数:
package main
func main() {
ageMp := make(map[string]int)
ageMp["qcrao"] = 18
for name, age := range ageMp {
fmt.Println(name, age)
}
}
2
3
4
5
6
7
8
执行命令:
go tool compile -S main.go
得到汇编指令,关键的几行代码如下:
// ......
0x0124 00292 (test16.go:9) CALL runtime.mapiterinit(SB)
// ......
0x01fb 00507 (test16.go:9) CALL runtime.mapiternext(SB)
0x0200 00512 (test16.go:9) MOVQ ""..autotmp_4+160(SP), AX
0x0208 00520 (test16.go:9) TESTQ AX, AX
0x020b 00523 (test16.go:9) JNE 302
// ......
2
3
4
5
6
7
8
9
10
关于 map 如何迭代,底层的函数调用关系就非常清楚了。先是调用 mapiterinit 函数初始化迭代器, 然后循环调用 mapiternext 函数进行 map 遍历,如图 3-15 所示。
迭代器的结构体定义:
// src/runtime/map.go
type hiter struct {
key unsafe.Pointer // key 指针
elem unsafe.Pointer // value 指针
t *maptype // map 类型,包含如 key size 大小等
h *hmap // map header
buckets unsafe.Pointer // 初始化时指向的 bucket
bptr *bmap // 当前遍历到的 bmap
overflow *[]*bmap // keeps overflow buckets of hmap.buckets alive
oldoverflow *[]*bmap // keeps overflow buckets of hmap.oldbuckets alive
startBucket uintptr // 起始遍历的 bucet 编号
offset uint8 // 遍历开始时 cell 的编号(每个 bucket 中有 8 个 cell)
wrapped bool // 是否从头遍历了
B uint8 // B 的大小
i uint8 // 指示当前 cell 序号
bucket uintptr // 指向当前的 bucket
checkBucket uintptr // 因为扩容,需要检查的 bucket
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
函数 mapiterinit () 就是对 hiter 结构体里的字段进行初始化赋值操作。
新手很可能会掉的一个坑是认为 Go map 的遍历顺序是有序的,实际上即使是对一个写死的 map 进行遍历,每次出来的结果也是无序的。下面就可以 “近距离地” 观察这个特性。
// src/runtime/map.go
// 生成随机数 r
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
// 从哪个 bucket 开始遍历
it.startBucket = r & (uintptr(1)<<h.B - 1)
// 从 bucket 的哪个 cell 开始遍历
it.offset = uint8(r >> h.B & (bucketCnt - 1))
2
3
4
5
6
7
8
9
10
11
例如,假设 B = 2,那 uintptr(1)<<h.B - 1
的结果就是 3,低 8 位为 0000 0011,将 r 与之相与,就可以得到一个 0~3 的起始 bucket 序号;同理,bucketCnt - 1 等于 7, 低 8 位为 0000 0111,将 r 右移 2 位后,与 7 相与,就可以得到一个 0~7 号的起始 cell 序号。
0000 0011 与任何数字相与,因为前 6 位始终为 0,只有后两位变化,所以只能是 0~3 变化,同理,和 0000 0111 相与,只有后三位变化,也只能是 0~7
于是,在 mapiternext 函数中就会从 it.startBucket 的 it.offset 号的 cell 开始遍历,取出其中 的 key 和 value,直到又回到起点 bucket,完成遍历过程。
源码部分比较好看懂,尤其是理解了前面注释的几段代码后,再看这部分代码就没什么压力了。所以,接下来将通过图形化的方式讲解整个遍历过程。
假设有如下图所示的一个 map,起始时 B = 1,有两个 bucket,后来触发了扩容(这里不用深究是否符合扩容条件, 只是一个设定), B 变成 2。 并且, 1 号 bucket 中的内容搬迁到了新的 bucket,1 号裂变成 1 号和 3 号;0 号 bucket 暂未搬迁。老的 bucket 挂在 *oldbuckets 指针上面,新的 bucket 则挂在 *buckets 指针上面,如图 3-16 所示。
这时,对此 map 进行遍历。假设经过初始化后,startBucket = 3,offset = 2。于是,遍历的起点就是 3 号 bucket 的 2 号 cell,图 3-17 就是开始遍历时的状态:
标红的方框表示起始位置,bucket 遍历顺序为:3 -> 0 -> 1 -> 2。
因为 3 号 bucket 对应老的 1 号 bucket,因此先检查老 1 号 bucket 是否已经被搬迁过。判断方法就是:
// src/runtime/map.go
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
2
3
4
5
6
如果 b.tophash [0] 的值在标志值范围内,即在 [2,4] 区间里,说明已经被搬迁过了。
在本例中,老 1 号 bucket 已经被搬迁过了。所以它的 tophash [0] 值在 [2,4] 范围内,因此只需遍历新的 3 号 bucket。
依次遍历 3 号 bucket 的 cell, 这时候会找到第一个非空的 key:元素 e。 到这里, mapiternext 函数返回,这时遍历结果仅有一个元素,如图 3-18 所示。
由于返回的 key 不为空,所以会继续调用 mapiternext 函数。继续从上次遍历到的地方往后遍历,从新 3 号 overflow bucket 中找到了元素 f 和元素 g。遍历结果集也因此扩大,如图 3-19 所示。
新 3 号 bucket 遍历完之后,回到了新 0 号 bucket。0 号 bucket 对应老的 0 号 bucket,经检查,老 0 号 bucket 并未搬迁,因此对新 0 号 bucket 的遍历就改为遍历老 0 号 bucket。那是不是把老 0 号 bucket 中的所有 key 都取出来呢?
并没有这么简单,回忆一下,老 0 号 bucket 在搬迁后将裂变成 2 个 bucket:新 0 号、新 2 号。而此时正在遍历的只是新 0 号 bucket(注意,遍历都是遍历的 * bucket 指针,也就是所谓的新 buckets)。所以,只会取出老 0 号 bucket 中那些在裂变之后,分配到新 0 号 bucket 中的那些 key。
因此,lowbits 等于 00 的 key 将进入遍历结果集,如图 3-20 所示。
这里需要说明的是,每次都会从一个 bucket 的固定序号的 cell 开始遍历,因此加入结果集中的顺序是:c,b。
和之前的流程一样,继续遍历新 1 号 bucket,发现老 1 号 bucket 已经搬迁,只用遍历新 1 号 bucket 中现有的元素就可以了。结果集如图 3-21 所示。
继续遍历新 2 号 bucket,它来自老 0 号 bucket,因此需要在老 0 号 bucket 中那些会裂变到新 2 号 bucket 中的 key,也就是 lowbit 等于 10 的那些 key。
这样,遍历结果集如图 3-22 所示。
最后,继续遍历到新 3 号 bucket 时,发现所有的 bucket 都已经遍历完毕,整个遍历过程执行完毕。
顺便说一下,如果碰到 key 是 math.NaN () 这种的,处理方式是类似的。核心还是要看它被分裂后具体落入哪个 bucket。只不过只用看它 top hash 的最低位。如果 top hash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part。据此决定是否取出 key,放到遍历结果集里。
综上,map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中 找到将来要搬迁到新 bucket 来的 key。
# map 中的 key 为什么是无序的
随着 map 中 key 数量的增多,原有的空间无法保证高效地进行增删查改的操作时就会发生扩容。而 map 在扩容时,会发生 key 的搬迁,原来落在同一个 bucket 中的 key,搬迁后,有些 key 会搬迁到其他 bucket,bucket 序号加上了 2^B。
而遍历的过程,则是按顺序遍历 bucket,同时按顺序遍历 bucket 中的 cell。搬迁后,key 的位置发生了重大的变化, 有些 key 移到了其他的 bucket, 有些 key 则 “原地不动”(仅仅指 bucket 序号不变)。这样,再次遍历 map 的结果就不可能仍然按原来的顺序了。
如果整个程序在运行过程中就只有一个 hard code 的 map,即 map 里的 key 都是写死的,也不会进行插入、删除的操作,按理说每次遍历这样的 map 都会返回一个固定顺序的 key/value 序列吧?理论上可以这样,但是 Go 杜绝了这种做法,因为这样会给新手程序员带来误解,以为这是一定会发生的事情,在某些情况下,可能会酿成大错。
实际上,在 Go 的实现中,当遍历 map 时,并不是固定地从 0 号 bucket 开始遍历,而是每次都从一个随机序号的 bucket 开始,并且从这个 bucket 的一个随机序号的 cell 开始遍历。这 样,即使是一个写死的 map,仅仅只是遍历它,也不太可能会返回一个固定序列的 key/value 对。
另外,“遍历 map 的结果是无序的” 这个特性是从 Go 1.0 开始加入的。
两种情况导致了 key 是无序的,一、扩容导致的,二、遍历的时候随机数导致的。
# map 是线程安全的吗
从 map 的源码上看,map 不是线程安全的。
在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于 1),则直接 panic。 赋值和删除函数在检测完写标志是复位状态(等于 0)之后, 先将写标志位置位(置为 1),才会进行之后的操作。
检测写标志:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
2
3
设置写标志:
h.flags |= hashWriting
写标志其实就是低三位为 100:
hashWriting = 4 // a goroutine is writing to the map
其实就是位或运算
# float 类型可以作为 map 的 key 吗
从语法上看,是可以的:Go 语言中只要是可比较的类型都可以作为 key。除了 slice、map、 functions 这几种类型,其他类型都可以作为 map 的 key。具体包括:布尔值、数字、字符串、指 针、通道、接口类型、结构体、只包含上述类型的数组。这些类型的共同特征是支持 == 和!= 操 作符,当 k1 == k2 时,可认为 k1 和 k2 是同一个 key。如果是结构体,只有 hash 后的值相等以及字面值相等,才被认为是相同的 key。
顺便说一句,任何类型都可以作为 value,包括 map 类型。
来看个例子:
func main() {
m := make(map[float64]int)
m[1.4] = 1
m[2.4] = 2
m[math.NaN()] = 3
m[math.NaN()] = 3
for k, v := range m {
fmt.Printf("[%v, %d] ", k, v)
}
fmt.Printf("\nk: %v, v: %d\n", math.NaN(), m[math.NaN()])
fmt.Printf("k: %v, v: %d\n", 2.400000000001, m[2.400000000001])
fmt.Printf("k: %v, v: %d\n", 2.4000000000000000000000001, m[2.4000000000000000000000001])
fmt.Println(math.NaN() == math.NaN())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
程序的输出:
[2.4, 2] [NaN, 3] [NaN, 3] [1.4, 1]
k: NaN, v: 0
k: 2.400000000001, v: 0
k: 2.4, v: 2
false
2
3
4
5
例子中定义了一个 key 类型是 float64 的 map, 并向其中插入了 4 个 key:1.4, 2.4, NAN,NAN。
遍历 map 打印的时候也打印出了 4 个 key,读者可能感到疑惑,map 里的 key 怎么会有重复的呢?前面提到过 NAN != NAN,因为他们比较的结果不相等,自然在 map 看来就是两个不同的 key 了。
接着,查询了几个 key,发现 NAN 不存在,2.400000000001 也不存在,而 2.4000000000000000000000001 却存在。
有点诡异,不是吗?通过汇编分析发现了如下的事实:当用 float64 作为 key 的时候,先要将其转成 unit64 类型,再插入 key 中。
具体通过 Float64frombits 函数完成:
func Float64frombits(b uint64) float64 { return *(*float64)(unsafe.Pointer(&b)) }
也就是将浮点数表示成 IEEE 754 规定的格式。如赋值语句对应的汇编代码:
0x00bd 00189 (test18.go:9) LEAQ "".statictmp_0(SB), DX
0x00c4 00196 (test18.go:9) MOVQ DX, 16(SP)
0x00c9 00201 (test18.go:9) PCDATA $0, $2
0x00c9 00201 (test18.go:9) CALL runtime.mapassign(SB)
2
3
4
"".statictmp_0 (SB) 变量是这样的:
"".statictmp_0 SRODATA size=8
0x0000 33 33 33 33 33 33 03 40
"".statictmp_1 SRODATA size=8
0x0000 ff 3b 33 33 33 33 03 40
"".statictmp_2 SRODATA size=8
0x0000 33 33 33 33 33 33 03 40
2
3
4
5
6
将 float 64 转成十六进制:
package main
import (
"fmt"
"math"
)
func main() {
m := make(map[float64]int)
m[2.4] = 2
fmt.Println(math.Float64bits(2.4))
fmt.Println(math.Float64bits(2.400000000001))
fmt.Println(math.Float64bits(2.4000000000000000000000001))
}
2
3
4
5
6
7
8
9
10
11
12
13
14
程序输出:
4612586738352862003
4612586738352864255
4612586738352862003
2
3
转成十六进制为:
0x4003333333333333
0x4003333333333BFF
0x4003333333333333
2
3
和前面的 "".statictmp_0 比较一下,很清晰了吧。2.4 和 2.4000000000000000000000001 经过 math.Float64bits () 函数转换后的结果是一样的。因此,这二者在 map 看来,就是同一个 key 了。
再来看一下 NAN(not a number):
func NaN() float64 { return Float64frombits(uvnan) }
uvan 的定义为:
uvnan = 0x7FF8000000000001
NAN () 直接调用 Float64frombits,传入写死的 const 型变量 0x7FF8000000000001,得到 NAN 型值。既然 NAN 是从一个常量解析得来的,为什么插入 map 时,会被认为是不同的 key?
这其实是由类型的哈希函数决定的,例如,对于 64 位的浮点数,它的哈希函数如下:
// src/runtime/alg.go
func f64hash(p unsafe.Pointer, h uintptr) uintptr {
f := *(*float64)(p)
switch {
case f == 0:
return c1 * (c0 ^ h) // +0, -0
case f != f:
return c1 * (c0 ^ h ^ uintptr(fastrand()))
default:
return memhash(p, h, 8)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
第 2 个 case,f != f 就是针对 NAN,这里会再加一个随机数。
这样,所有的谜题都解开了。由于 NAN 的特性:
NAN != NAN
hash(NAN) != hash(NAN)
2
因此如果从 map 中查找的 key 为 NAN 时, 什么也查不到;如果向其中增加了 4 次 NAN,遍历会得到 4 个 NAN。
结论:float 型可以作为 key,但是由于精度的问题,会导致一些诡异的问题出现,故慎用之。
# map 如何实现两种 get 操作
Go 语言中读取 map 有两种语法:带 comma 和不带 comma。当要查询的 key 不在 map 里,带 comma 的用法会返回一个 bool 型变量提示 key 是否在 map 中;而不带 comma 的语句则只会返回一 个 key 类型的零值。如果 key 是 int 型就会返回 0,如果 key 是 string 类型,则会返回空字符串。
两种用法的示例如下:
package main
import "fmt"
func main() {
ageMap := make(map[string]int)
ageMap["qcrao"] = 18
// 不带 comma 用法
age1 := ageMap["stefno"]
fmt.Println(age1)
// 带 comma 用法
age2, ok := ageMap["stefno"]
fmt.Println(age2, ok)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
运行结果:
0
0 false
2
读者可能会感到奇怪,Go 语言并不支持重载,为什么同样的赋值语句会有两种不同类型的返回值呢?这其实是编译器在背后做的工作:编译器在分析完用户代码后,会将这两种语法分别对应到 runtime 两个不同的函数。
// src/runtime/map.go
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)
2
3
4
从上面两个函数的声明也可以看出差别,mapaccess2 函数返回值多了一个 bool 型变量,两者的代码也是完全一样的,只是 mapaccess2 函数在返回值后面多返回了一个 bool 型的值。
另外,根据 key 的类型,编译器还会将查找、插入、删除的函数用更具体的函数替换,以提 升效率,见表 3-4。
这些函数的参数类型直接是具体的 uint32、unt64、string,在函数内部由于提前知晓了 key 的类型,所以内存布局是很清楚的,因此能节省很多类型相关的操作,提高效率。
# 如何比较两个 map 是否相等
两个 map 深度相等的条件如下:
1)都为 nil。
2)非空、长度相等,指向同一个 map 实体对象。
3)相应的 key 指向的 value “深度” 相等。
三个条件是或的关系,满足任何一条即认为两个 map 深度相等。
直接使用 map1 == map2 是错误的,这种写法只能比较 map 是否为 nil。
package main
import "fmt"
func main() {
var m map[string]int
var n map[string]int
fmt.Println(m == nil)
fmt.Println(n == nil)
// 下面这条语句,不能通过编译
//fmt.Println(m == n)
}
2
3
4
5
6
7
8
9
10
11
12
输出结果:
true
true
2
# 可以对 map 的元素取地址吗
无法对 map 的 key 或 value 进行取址。以下代码不能通过编译:
package main
import "fmt"
func main() {
m := make(map[string]int)
fmt.Println(&m["qcrao"])
}
2
3
4
5
6
7
8
编译报错:
./main.go:8:14: cannot take the address of m["qcrao"]
如果通过其他 hack 的方式,例如 unsafe.Pointer 等获取到了 key 或 value 的地址,也不能长期持有,因为一旦发生扩容,key 和 value 的位置就会改变,之前保存的地址也就失效了。
# 可以边遍历边删除吗
前面讲过,map 并不是一个线程安全的数据结构。同时读写一个 map 是未定义的行为,如果被检测到,会直接 panic。
上面说的是发生在多个线程同时读写同一个 map 的情况下。如果在同一个协程内边遍历边删除,并不会检测到同时读写,理论上是可以这样做的。在 Go 官方文档中也有这样的例子支撑:
for key := range m {
if key.expired() {
delete(m, key)
}
}
2
3
4
5
能够这样做的原因可以追溯到语言规范,在 Go 的语言规范中并没有指定 map 的迭代顺序, 因此遍历一个 map 时不能保证每次迭代的顺序相同。如果在迭代过程中删除了尚未访问的 map 键值对,则不会产生相应的迭代值。同样的道理,如果在迭代过程中创建了新的 map 条目,该条目既可能在迭代过程中产生,也可能被跳过。对于每一个创建的条目,以及从一个迭代到下一个迭代,选择可能会有所不同。如果 map 为 nil,则迭代次数为 0。上面的例子中,只会删除当前正在迭代的 key。
实际上,sync.Map 是 Go 标准库提供的线程安全的 map,在存在多个读者和写者的情况下, 应该优先考虑使用。