RWMutex 可以说是 Mutex 的改进版,在某些场景下可以发挥出更加强大、灵活的控制能力,比如读取数据的频率远大于写数据的频率的场景。如果在 1 次写然后 N 次读的场景使用 Mutex 的话,则整个过程是串行的,因为即使 N 次读操作互相之间并不影响,但也都需要先持有 Mutex 锁后才可以操作。如果使用 RWMutex 的话,那么 N 次读操作可以同时持有锁,互相不影响,读操作都可以并行,因此并发能力将大大的提升。
Mutex与RWMutex区别与应用场景
Mutex 是最基本的互斥锁,在同一时间只允许一个 Goroutine 访问被保护资源。它适用于那些只有一个 Goroutine 需要修改资源或者执行临界区代码的情况。
然而,在某些场景下,我们允许多个 Goroutine 并发地读取共享资源,但同时也希望确保写操作是独占式进行的。这种情况下就可以使用 RWMutex。RWMutex 分为两种状态:读模式和写模式。
-
在读模式下:
- 多个 Goroutine 可以同时获取锁,并且都能够安全地对共享资源进行只读操作。
- 如果已经有一个或多个 Goroutine 正在读取,则其它 Goroutine 仍然可以获取锁,但是不能执行写操作。
-
在写模式下:
- 只有一个 Goroutine 能够获取锁,并且能够对共享资源进行读取或写入操作。此时其它所有的读和写请求都会被阻塞。
实现 RWMutex 锁需要解决的问题
- 写锁需要阻塞写锁:即一个协程拥有写锁时,其它协程的写锁需要阻塞;
- 写锁需要阻塞读锁:即一个协程拥有写锁时,其它协程的读锁需要阻塞;
- 读锁需要阻塞写锁:即一个协程拥有读锁时,其它协程的写锁需要阻塞;
- 读锁不能阻塞读锁:即一个协程拥有读锁时,其它协程也可以拥有读锁。
RWMutex基本使用
可以通过sync包来使用RWMutex,首先创建一个新的RWMutex对象:
var rwMutex sync.RWMutex
然后就可以在代码中使用该对象了,要进入读模式(共享访问)或者写模式(独占访问),分别调用两个方法:
- 进入读模式:
rwMutex.RLock()
- 进入写模式:
rwMutex.Lock()
注意:在完成对共享资源的访问后,一定要释放锁。
- 退出读模式:
rwMutex.RUnlock()
- 退出写模式:
rwMutex.Unlock()
示例:
package main
import (
"fmt"
"sync"
)
// 共享资源
var counter = 0
// 创建新的RWMutex对象
var rwMutex sync.RWMutex
func main() {
// 使用 WaitGroup 等待所有 Goroutine 完成任务
wg := sync.WaitGroup{}
// 启动十个并发的 Goroutine 增加 counter 的值
for i := 0; i < 100; i++ {
wg.Add(1)
go incrementCounter(&wg)
}
// 等待所有 Goroutine 完成任务
wg.Wait()
// 输出最终结果
fmt.Println("Final Counter:", counter)
}
func incrementCounter(wg *sync.WaitGroup) {
// 进入写模式,获取锁
rwMutex.Lock()
// 在函数返回前释放锁,加锁和解锁一定要写在一起
defer rwMutex.Unlock()
// 对共享资源进行修改操作
counter++
// 通知 WaitGroup 当前 Goroutine 完成任务
wg.Done()
}
输出:
Final Counter: 100
RWMutex实现原理
源码数据结构:
type RWMutex struct {
w Mutex // 用于控制多个锁,获得写锁之前需要获取该锁
writerSem uint32 // 写阻塞等待的信号量,最后一个读者释放锁时会释放信号量
readerSem uint32 // 读阻塞的协程等待的信号量,持有写锁的协程释放锁后会释放信号量
readerCount int32 // 记录读协程的个数
readerWait int32 // 记录写阻塞时读协程的个数
}
读写锁内部有一个Mutex互斥锁,用于将多个写操作隔离开,其他的几个都用于隔离读操作和写操作。
RWMutex还实现了四个接口,四个接口和上面的成员配合起来完成读写互斥锁的实现。
接口定义:
RWMutex提供4个简单接口:
- RLock() :读锁定(记忆为 ReadLock)。
- RUnlock():解除读锁定(记忆为 ReadLock)。
- Lock() :写锁定,它与 Mutex 完全一致。
- UnLock():解除写锁定,与 Mutex 完全一致。
Lock()实现逻辑:
写锁定操作需要做两件事:
1.获取互斥锁Mutex
2.若此时有读操作的话,需要阻塞等待所有的读操作结束
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 首先,解决与其它写操作的竞争。
rw.w.Lock()
// 向读操作通知有一位待定的写操作。
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待活跃的读操作结束。
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
UnLock()实现逻辑
解除锁定需要做两件事:
1.唤醒因读锁定而被阻塞的协程(如果有的话)
2.解除互斥锁
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 向读协程宣布没有活跃的写协程。
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// 取消阻塞被阻塞的读协程(如果有的话)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 解除写锁,允许其它写协程继续写
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
RLock()实现逻辑
读锁定操作需要做两件事:
1.增加读操作的计数,即 readerCount++
2.若阻塞等待写操作结束
// Happens-before relationships are indicated to the race detector via:
// - Unlock -> Lock: readerSem
// - Unlock -> RLock: readerSem
// - RUnlock -> Lock: writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
RUnlock()实现逻辑
解除读锁定需要做两件事:
1.减少读操作计数,即 readerCount- -
2.唤醒等待写操作的协程,如果有的话。注意:这里只有最后一个解除读锁定的协程才会释放信号量将该协程唤醒,因为只有当所有的读操作协程释放锁后才可以唤醒写操作协程。
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
RWMutex关键实现
写操作如何阻止写操作?
读写锁包含一个互斥锁(Mutex),写锁定必须先获取该互斥锁,如果互斥锁已被协程 1 获取(或者协程 1 在阻塞等待读结束),则意味着协程 1 已获取了互斥锁,那么协程 2 只能阻塞等待该互斥锁。
所以写操作依赖互斥锁阻止其他的写操作。
写操作如何阻止读操作?
这是 RWMutex 实现中最最最精华的技巧。
我们知道 RWMutex.readerCount 是一个整型值,用于表示读协程数量。在不考虑写操作的情况下,每次读锁定将该值 +1,每次解除读锁定将该值 -1,所以 readerCount 的取值为 [0, N],N 为读操作的个数,实际上最大可支持 2的30次方个读操作。
当进行写锁定时,会将 readerCount 减去 2的30次方,从而让 readerCount 变成负值,此时再有读操作到来时检查 readerCount 为负值,便知道写操作在进行中,只好阻塞等待。
而真实的读操作个数却不会丢失,因为只需要将 readerCount + 2的30次方 即可恢复回来。
因此,写操作是将 readerCount 变成负值来阻止写操作的。
读操作如何阻止写操作?
读锁定会先将 RWMutex.readerCount + 1,此时写操作到来时发现读者数量不为 0,会阻塞等待所有读操作结束。
因此,读操作是通过 readerCount 来阻止写操作的。
为什么写锁定不会被 “饿死”?
我们知道写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有的读操作结束,则可能会被 “饿死”,RWMutex 是通过 RWMutex.readerWait 来完美解决这个问题的。
写操作到来时,会把RWMutex.readerCount 的值复制到 RWMutex.readerWait 中,用于标记排在写操作前面的读操作个数。
前面的读操作结束后,除了递减 RWMutex.readerCount 的值,还会递减 RWMutex.readerWait 的值,所以 RWMutex.readerWait 的值变为 0 的时候,可以唤醒写操作了(真巧妙)。
因此写操作相当于把一段连续的读操作划成了两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后续的读操作。
注意事项
RWMutex 是一种高级的并发控制机制,但相比于 Mutex 来说,它在处理读操作时会带来更小的开销。因此,在某些场景下可以使用 RWMutex 来提升程序的性能。
然而,在使用 RWMutex 时需要注意以下几点:
- 不要滥用 RWMutex:只有在确实需要多个 Goroutine 并发地读取共享资源,并且同时保证独占式写入时才使用 RWMutex。
- 避免持有锁太久:持有锁时间过长会影响整个系统的吞吐量。所以应该尽量缩小临界区大小。
- 注意解决死锁问题:当多个 Goroutine 相互等待对方释放锁导致无法继续执行时,就产生了死锁。合理地设计和使用锁可以避免死锁的发生。