golang中读写锁的原理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Go 标准库中的 RWMutex 设计是 Write-preferring(写优先) 方案。
如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader获取到锁,所以优先保障 writer。
当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放锁之后才能获取。
所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。

Go RwMutex使用了readerCount记录目前读请求的总数量
将readerCount进行取反操作 这也是此字段除了标记reader数量的第二个功能,进行写锁标记
此时将取反的r值交给readerWait代表仍需要等待释放锁的reader的数量
如果该数量为0 那么代表不需要等待则直接获取写锁即可,否则就将writer挂起阻塞直至readerWait中的所有读请求全部释放掉,然后RUlock唤醒该写请求

写锁释放时,会将readerCount再加回来,所以负的代表有写请求在,正的代表只有读请求

如果你遇到可以明确区分 reader 和 writer goroutine 的场景,且有大量的并发读、少量的并发写,并且有强烈的性能需求,你就可以考虑使用读写锁 RWMutex 替换 Mutex。

Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。

Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader获取到锁,所以优先保障 writer。当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放 锁之后才能获取。所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。

不指定优先级:这种设计比较简单,不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能

Go 标准库中的 RWMutex 设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁。

数据结构

1
2
3
4
5
6
7
8
type RWMutex struct { 
    w Mutex // 互斥锁解决多个writer的竞争
    writerSem uint32  // writer信号量
    readerSem uint32  // reader信号量
    readerCount int32 // reader的数量
    readerWait int32  // writer等待完成的reader的数量
}
const rwmutexMaxReaders = 1 << 30

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (rw *RWMutex) RLock() { 
    if atomic.AddInt32(&rw.readerCount, 1) < 0 { 
        // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先
        // 此时需要进行阻塞挂起,等待写锁的解锁
        runtime_SemacquireMutex(&rw.readerSem, false, 0) 
    }
}

func (rw *RWMutex) RUnlock() { 
    // 将已经加锁的读锁数量-1,如果此时-1后小于0时,则代表
	// 1:有可能反复解锁,此时需要抛出panic
	// 2:有writer正在等待获取写锁
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { 
        rw.rUnlockSlow(r) // 有等待的writer 
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) { 
    // 此时有一个writer正在等待获取写锁,
	// 如果当前解锁的reader是最后一个需要等待的读锁
	// 则唤醒等待读锁释放完的writer进行写锁的获取
    if atomic.AddInt32(&rw.readerWait, -1) == 0 { 
        // 最后一个reader了,writer终于有机会获得锁了
        runtime_Semrelease(&rw.writerSem, false, 1) 
    } 
}

func (rw *RWMutex) Lock() { 
	// 先将Mutex字段进行加锁,以免有其他写锁操作或者其他操作破坏数据
	rw.w.Lock()
	// 将readerCount进行取反操作 这也是此字段除了标记reader数量的第二个功能,进行写锁标记
	// 即标记有writer需要竞争
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 此时将取反的r值交给readerWait代表仍需要等待释放锁的reader的数量
	// 如果该数量为0 那么代表不需要等待则直接获取写锁即可
	// 否则就将writer挂起阻塞直至RUlock唤醒
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
 }

 func (rw *RWMutex) Unlock() {
	// 写锁进行解锁时首先将加锁时取反的readerCount再次取反
	// 也就是解除当前有写锁正在竞争的标记
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	// 如果取反后这个值大于rwmutexMaxReaders 就代表重复解锁
	// 抛出panic
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 解锁完毕后需要根据等待的readerCount的数量去依次唤醒这些reader 
	// 这些reader是在Lock后再次请求获取读锁的reader的数量
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 把写锁的互斥锁解锁,以便于其他writer进行写操作的竞争
	rw.w.Unlock()
}

读锁加锁 –> readerCount + 1 < 0 (有写锁)–> 等待 (无写锁)–> 无需等待