Golang数据类型之Map源码实现

golang 数据类型 map 的源码实现

Map 的数据结构

Golang中的 map是一个 hmap类型的指针

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 哈希表
type hmap struct {

	count     int // 哈希表中的 k-v数量
	flags     uint8 // 迭代 map 或者对 map进行写操作的时候,会记录标志位,用于一些并发场景的检测校验
	B         uint8  // 创建的 buckets 桶的数量为 2^B 个
	noverflow uint16 // 溢出的buckets数量
	hash0     uint32 // hash seed,为hash函数的结果引入随机性

	buckets    unsafe.Pointer // bucket数组指针,指向一个 []bmap 类型的数组,数组大小为 2^B, 我们将一个bmap 叫做一个桶, bucket 字段我们称之为正常桶,正常桶存满8个元素后,正常桶指向的下一个桶,我们将其称为溢出桶(拉链法)
	oldbuckets unsafe.Pointer // 当map扩容的时候,指向旧的 buckets 数组。
	nevacuate  uintptr        // 计数器,表示扩容时的迁移进度

	extra *mapextra // 用于GC,指向所有的溢出桶,正常桶里面的某个 bmap 存满了,会使用者里面的
}

// 溢出桶结构
type mapextra struct {
  // 如果 key 和 value 都不包含指针,并且可以被 inline (<= 128 字节)
  // 就是用 hmap 的 extra 字段来存储 overflow buckets,这样可以避免 GC 扫描整个 map。
  // 然而 bmap.overflow 也是个指针,这时候我们只能把这些 overflow 的指针
  // 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 字段中。
  // overflow 包含的是 hmap.buckets 的 overflow 的 buckers
  // oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow的 buckets
  // 

	overflow    *[]*bmap  // 指针数组,指向所有的溢出桶
	oldoverflow *[]*bmap  // 指针数组,发生扩容时,指向所有旧的溢出桶

	// nextOverflow holds a pointer to a free overflow bucket.
	nextOverflow *bmap // 指向所有溢出桶中,下一个可以使用的溢出桶
}

// 正常 桶结构
type bmap struct {

	tophash [bucketCnt]uint8	// 存储key hash 值的高8位,用于决定 kv 键值对存放在桶内的哪个位置

	// 以下属性,编译时动态生成
	keys [bucketCnt]keytype	// 存放 key 的数组
	values [bucketCnt]valuetype // 存放 value 的数组
	pad uintptr // 用于对齐内存
	overflow uintptr // 指向下一个桶,即溢出桶
}
  • map的内存模型中,最重要的三种结构: hmap, bmap, mapextra
  • hmap 表示整个map,bmap 表示hmap中的一个桶,map底层其实是由很多歌桶组成的
  • 当一个桶存满之后,指向的下一个桶,就叫做溢出桶,溢出桶是拉链法的具体体现
  • mapextra 表示所有的溢出桶,之所以还需要重新的指向,目的是为了用于GC,避免GC时扫描整个map,仅扫描所有溢出桶就够了
  • 桶结构的很多字段得在编译时才会动态生成,比如 keyvalues
  • 桶结构中,之所以所有的 key 放一起,所有的 value 放一起,而不是 key/value 一对对的一起存放,目的便是在某些情况下可以省去 pad 字段,节省内存空间
  • golang 中的 map 使用的内存是不会收缩的,只会越用越多

map 中比较重要的3个数据结构: hmap, mapextra, bmap

  • hmap 为 map 的主要结构,hmap.buckets 指向了一个桶数组,数组中的每一个元素就是一个 bmap 结构
  • mapextra 表示所有的溢出桶,之所以还要重新的指向,目的是为了用于GC,避免GC时扫描整个map,仅扫描所有溢出桶就够了
  • bmap 为bucket 桶结构,存储 key/value 值,v1.17之前是 k/v/k/v…交替存储

Map的设计原理

hash值的使用

通过hash函数,key 可以得到一个唯一值,map将这个唯一值,分成高8位和低8位,分别有不同的用途

  • 低8位:用于寻找当前key 属于哪个 bucket
  • 高8位: 用于寻找当前 key 在bucket中的位置,bucket 有 tophash 字段,便是存储的高8位的值,用来声明当前bucket中有哪些key,这样搜索查找时就不用遍历bucket中的每个key,只要先看看 tophash 数组值即可,提高搜索查找效率

map 其使用的hash算法会根据硬件选择,比如如果cpu是否支持 aes,那么采用 aes hash,并且将hash值映射到bucket时,会采用位运算来规避mod的开销

桶的细节设计

bmap结构,即桶,是map中最重要的底层实现之一,其设计要点如下:

  • 桶是map中最小的挂载粒度: map中不是每个key都申请一个结构通过链表串联,而是每8个kv键值对存放在一个bucket中,然后桶再以链表的形式串联起来,这样做的原因就是减少对象的数量,减轻GC的负担。
  • 桶串联实现拉链法:当某个桶数量满了,会申请一个新桶,挂在这个桶后面形成链表,新桶优先使用预分配的桶。
  • 哈希高8位优化桶查找key:将key 哈希值的高8位存储在桶的 tophash数组中,这样查找时不用比较完整的 key 就能过滤掉不符合要求的 key, tophash中的值相等,再去比较 key 值
  • 桶中key/value分开存放:同种所有的key存一起,所有的value村一起,目的是为了方便内存对齐
  • 根据k/v大小存储不同值:当 k 或 v 大于 128字节时,其存储的字段为指针,指向k或v的实际内容,小于等于128字节,其存储的字段为原值
  • 桶的搬迁状态:可以根据 tophash 字段的值,是否小于minTopHash,来表示桶是否处于搬迁状态

map 的扩容和搬迁策略

map的底层扩容策略如下:

  • map 的扩容策略是新分配一个更大的数组,然后在插入和删除key的时候,将对应桶中的数据迁移到新分配的桶中去

map的搬迁策略如下:

  • 由于map扩容需要将原有的kv键值对搬迁到新的内存地址,直接一下子全部搬完会非常的影响性能
  • 采用渐进式的搬迁策略,将搬迁的O(N)开销均摊到O(1)的赋值和删除操作上

以下两种情况时,会进行扩容:

  • 当装载因子超过6.5时,扩容一倍,属于增量扩容
  • 当使用的溢出桶过多时间,重新分配一样大的内存空间,属于等量扩容,实际上没有扩容,主要是为了回收空闲的溢出桶

装载因子等于 map中元素的个数 / map的容量,即len(map) / 2^B

  • 载因子用来表示空闲位置的情况,装载因子越大,表明空闲位置越少,冲突也越多
  • 随着装载因子的增大,哈希表线性探测的平均用时就会增加,这会影响哈希表的性能,当装载因子大于70%,哈希表的性能就会急剧下降,当装载因子达到100%,整个哈希表就会完全失效,这个时候,查找和插入任意元素的复杂度都是O(N),因为需要遍历所有元素

为什么会出现以上两种情况?

  • 情况1:确实是数据量越来越多,撑不住了
  • 情况2:比较特殊,归根结底还是map删除的特性导致的,当我们不断向哈希表中插入数据,并且将他们又全部删除时,其内存占用并不会减少,因为删除只是将桶对应位置的tohash置nil而已,这种情况下,就会不断的积累溢出桶造成内存泄露。为了解决这种情况,采用了等量扩容的机制,一旦哈希表中出现了过多的溢出桶,她会创建新桶保存数据,gc会清理掉老的溢出桶,从而避免内存泄露。

如何定义溢出桶是否太多需要等量扩容呢?两种情况:

  • 当B小于15时,溢出桶的数量超过2^B,属于溢出桶数量太多,需要等量扩容
  • 当B大于等于15时,溢出桶数量超过2^B,属于溢出桶数量太多,需要等量扩容

Map 的源码实现

创建 map

创建 map,主要是创建 hmap 这个结构,以及对 hmap 的初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// 初始化 Hmap
	if h == nil {
		h = new(hmap)
	}

	// 获取一个随机种子
	h.hash0 = fastrand()

	// 确定B的大小
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	h.B = B


	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

默认会创建 2^B个bucket,如果b大于等于4,会预先创建一些溢出桶,b小于4的情况可能用不到溢出桶

map中赋值元素

  • 确认hash值
  • 根据hash值确认 key 所属的 bucket
  • 遍历所属桶和该桶串联的溢出桶,寻找key
  • 当前所属桶以及串联的溢出桶都已满,则创建一个新的溢出桶,串联的末尾
  • 根据 key 是否存在,选择插入还是更新 key/value
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if h == nil {
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapassign)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.Key, key, callerpc, pc)
	}
	if msanenabled {
		msanread(key, t.Key.Size_)
	}
	if asanenabled {
		asanread(key, t.Key.Size_)
	}
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}

	// 第一步,确认 hash值
	hash := t.Hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write.
	h.flags ^= hashWriting

	if h.buckets == nil {
		h.buckets = newobject(t.Bucket) // newarray(t.Bucket, 1)
	}

again:
	// 第二步,根据hash值确认所属的 bucket
	bucket := hash & bucketMask(h.B)
	if h.growing() {
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
	top := tophash(hash)

	var inserti *uint8
	var insertk unsafe.Pointer
	var elem unsafe.Pointer
bucketloop:
	// 第三步,遍历所属桶和此通串联的溢出桶,寻找key(通过桶的tophash字段和 key 值)
	for {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				}
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
			if !t.Key.Equal(key, k) {
				continue
			}
			// already have a mapping for key. Update it.
			if t.NeedKeyUpdate() {
				typedmemmove(t.Key, k, key)
			}
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			goto done
		}
		ovf := b.overflow(t)
		if ovf == nil {
			break
		}
		b = ovf
	}

	// Did not find mapping for key. Allocate new cell & add entry.

	// If we hit the max load factor or we have too many overflow buckets,
	// and we're not already in the middle of growing, start growing.
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h)
		goto again // Growing the table invalidates everything, so try again
	}

	// 第四步,当前链上所有桶都满了,创建一个新的溢出桶,串联的末尾,然后更新相关字段
	if inserti == nil {
		// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.KeySize))
	}

	// 第五步,在插入位置存储新的 key/elem
	if t.IndirectKey() {
		kmem := newobject(t.Key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.IndirectElem() {
		vmem := newobject(t.Elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	typedmemmove(t.Key, insertk, key)
	*inserti = top
	h.count++

done:
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags &^= hashWriting
	if t.IndirectElem() {
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem
}

删除元素

  • 写保护
  • 获取hash值
  • 根据hash值确定bucket位置,判断是否需要扩容
  • 遍历桶和桶串联的溢出桶
  • 找到key,将tophash中key的标志位置空
  • 删除 key 仅仅只是将其对应的 tophash 值置空,如果 kv 存储的是指针,那么会清理指针指向的内存,否则不会真正回收内存,内存占用并不会减少
  • 如果正在扩容,并且操作的 bucket 没有搬迁完,那么会搬迁bucket
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapdelete)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.Key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.Key.Size_)
	}
	if asanenabled && h != nil {
		asanread(key, t.Key.Size_)
	}
	if h == nil || h.count == 0 {
		if err := mapKeyError(t, key); err != nil {
			panic(err) // see issue 23734
		}
		return
	}
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}

	hash := t.Hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write (delete).
	h.flags ^= hashWriting

	bucket := hash & bucketMask(h.B)
	if h.growing() {
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
	bOrig := b
	top := tophash(hash)
search:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break search
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			k2 := k
			if t.IndirectKey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			if !t.Key.Equal(key, k2) {
				continue
			}
			// Only clear key if there are pointers in it.
			if t.IndirectKey() {
				*(*unsafe.Pointer)(k) = nil
			} else if t.Key.PtrBytes != 0 {
				memclrHasPointers(k, t.Key.Size_)
			}
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			if t.IndirectElem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.Elem.PtrBytes != 0 {
				memclrHasPointers(e, t.Elem.Size_)
			} else {
				memclrNoHeapPointers(e, t.Elem.Size_)
			}
			b.tophash[i] = emptyOne
			// If the bucket now ends in a bunch of emptyOne states,
			// change those to emptyRest states.
			// It would be nice to make this a separate function, but
			// for loops are not currently inlineable.
			if i == bucketCnt-1 {
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					goto notLast
				}
			} else {
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}
			for {
				b.tophash[i] = emptyRest
				if i == 0 {
					if b == bOrig {
						break // beginning of initial bucket, we're done.
					}
					// Find previous bucket, continue at its last entry.
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					i = bucketCnt - 1
				} else {
					i--
				}
				if b.tophash[i] != emptyOne {
					break
				}
			}
		notLast:
			h.count--
			// Reset the hash seed to make it more difficult for attackers to
			// repeatedly trigger hash collisions. See issue 25237.
			if h.count == 0 {
				h.hash0 = fastrand()
			}
			break search
		}
	}

	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags &^= hashWriting
}

查询元素

  • 计算hash值,并根据hash值找到桶
  • 遍历桶以及该桶串联的溢出桶,查找key
  • 如果根据hash值定位到桶正在搬迁,并且这个桶还没有搬迁到新的hash表中,那么就从老的hash表中查找
  • 在bucket中进行顺序查找,使用高8位进行快速过滤,高8位相等,再比较key是否相等,找到就返回 value,如果当前bucket未找到,就继续找溢出桶中的数据,都没有的话则返回零值

扩容和搬迁

  • 扩容触发条件: 当 map 中的键值对数量达到负载因子(load factor)乘以桶的数量时,就会触发扩容。负载因子是键值对数量与桶的数量的比值,它可以看作是哈希表的填充程度。扩容条件通常是负载因子达到阈值(通常为0.75),此时 map 会自动扩容。
  • 创建新的哈希表:在扩容时,Go 会创建一个新的哈希表,通常将当前哈希表的大小翻倍。新的哈希表的桶数量会是当前桶数量的两倍
  • 重新计算哈希值:对于当前哈希表中的每个键值对,会重新计算其哈希值。由于新哈希表的桶数量发生变化,哈希函数可能也会不同。
  • 搬迁键值对:将重新计算哈希值后的键值对搬迁到新的哈希表中。这个过程可能涉及到哈希冲突的处理,因为在新的哈希表中,同一个桶上可能会有多个键值对。链地址法被用来处理这种情况,即将相同桶上的键值对存储在链表中。
  • 替换旧的哈希表:一旦所有的键值对都搬迁完成,新的哈希表会取代旧的哈希表,成为 map 的底层数据结构。
  • 释放旧哈希表:最后,旧的哈希表会被释放,回收其占用的内存。

这个扩容和搬迁的过程保证了 map 在不断插入新的键值对时,能够动态地调整大小以保持较低的负载因子,从而提高性能。需要注意的是,由于搬迁的过程,map 的迭代顺序在扩容后可能会发生变化。这是因为新的哈希表的桶数量和哈希函数可能不同,导致键值对的分布发生改变。因此,如果迭代顺序对你的程序逻辑有影响,需要谨慎处理。

并发安全

map 并不是并发安全的,如果多个 goroutine 同时对同一个 map 进行读写操作,可能会导致数据竞争,进而导致程序行为不一致,甚至崩溃,这是因为 map 操作不是原子的,例如读取、写入等操作都可能被打断,从而导致并发访问问题

以下是一个简单的例子,演示了 map 的并发安全问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
	myMap := make(map[string]int)

	for i := 0; i < 10; i++ {
		go func (i int)  {
			myMap[fmt.Sprintf("key%d", i)] = i	
		}(i)
	}

	go func () {
		for k, v := range myMap {
			fmt.Println(k, v)
		}
	}()

	time.Sleep(time.Second * 2)
}

在上述例子中,多个 goroutine 同时向 myMap 写入数据,同时一个 goroutine 读取 myMap 的数据。由于 map 操作不是并发安全的,可能会发生数据竞争,导致输出结果不确定。

为了解决这个问题,可以采用以下两种方式之一:

  • 加锁保护:使用 sync.Mutex 或者其他互斥锁来保护对 map 的并发访问。在读写 map 之前,使用锁进行加锁,操作完成后解锁。这样可以确保在任何时刻只有一个 goroutine 能够访问 map。
  • 使用并发安全的数据结构:Go 提供了 sync.Map 类型,它是一种并发安全的映射。sync.Map 的操作是原子的,不需要额外的锁。使用 sync.Map 可以避免手动管理锁,但也需要注意它的一些特性,例如不能通过 range 直接迭代,需要使用 Load 和 Range 方法

map 的无序性

Go 中的 map 是无序的,这意味着在遍历 map 时,元素的顺序是不确定的。这是由于 map 的实现原理和设计选择所导致的。

  • 哈希表实现map 的底层实现是哈希表,它使用哈希函数将键映射到桶(buckets)中。在理想情况下,哈希函数将键均匀地分散到桶中,但由于哈希表的大小是有限的,而键的域可能是无限的,因此会出现哈希冲突。
  • 冲突解决方法:Go使用链地址发来解决哈希冲突,如果两个键映射到同一个bucket,他们将被链接成一个链表,当桶的链表较长时,可能会影响查找性能,但由于哈希表的大小是固定的,冲突是不可避免的
  • 动态扩容:为了保持较低的负载因子,map在键值对数量达到一定阈值(负载因子达到0.75)时会自动进行扩容,这就导致了哈希表的重新排列,而这个重新排列会使得元素的顺序发生变化
皖ICP备20014602号
Built with Hugo
Theme Stack designed by Jimmy