Go语言RWMutex读写互斥锁

Thursday, September 14, 2023

RWMutex 可以说是 Mutex 的改进版,在某些场景下可以发挥出更加强大、灵活的控制能力,比如读取数据的频率远大于写数据的频率的场景。如果在 1 次写然后 N 次读的场景使用 Mutex 的话,则整个过程是串行的,因为即使 N 次读操作互相之间并不影响,但也都需要先持有 Mutex 锁后才可以操作。如果使用 RWMutex 的话,那么 N 次读操作可以同时持有锁,互相不影响,读操作都可以并行,因此并发能力将大大的提升。

Mutex与RWMutex区别与应用场景

Mutex 是最基本的互斥锁,在同一时间只允许一个 Goroutine 访问被保护资源。它适用于那些只有一个 Goroutine 需要修改资源或者执行临界区代码的情况。

然而,在某些场景下,我们允许多个 Goroutine 并发地读取共享资源,但同时也希望确保写操作是独占式进行的。这种情况下就可以使用 RWMutex。RWMutex 分为两种状态:读模式和写模式。

  • 在读模式下:

    • 多个 Goroutine 可以同时获取锁,并且都能够安全地对共享资源进行只读操作。
    • 如果已经有一个或多个 Goroutine 正在读取,则其它 Goroutine 仍然可以获取锁,但是不能执行写操作。
  • 在写模式下:

    • 只有一个 Goroutine 能够获取锁,并且能够对共享资源进行读取或写入操作。此时其它所有的读和写请求都会被阻塞。

实现 RWMutex 锁需要解决的问题

  • 写锁需要阻塞写锁:即一个协程拥有写锁时,其它协程的写锁需要阻塞;
  • 写锁需要阻塞读锁:即一个协程拥有写锁时,其它协程的读锁需要阻塞;
  • 读锁需要阻塞写锁:即一个协程拥有读锁时,其它协程的写锁需要阻塞;
  • 读锁不能阻塞读锁:即一个协程拥有读锁时,其它协程也可以拥有读锁。

RWMutex基本使用

可以通过sync包来使用RWMutex,首先创建一个新的RWMutex对象:

var rwMutex sync.RWMutex

然后就可以在代码中使用该对象了,要进入读模式(共享访问)或者写模式(独占访问),分别调用两个方法:

  • 进入读模式:rwMutex.RLock()
  • 进入写模式:rwMutex.Lock()

注意:在完成对共享资源的访问后,一定要释放锁。

  • 退出读模式:rwMutex.RUnlock()
  • 退出写模式:rwMutex.Unlock()

示例:

package main

import (
	"fmt"
	"sync"
)

// 共享资源
var counter = 0

// 创建新的RWMutex对象
var rwMutex sync.RWMutex

func main() {
	// 使用 WaitGroup 等待所有 Goroutine 完成任务
	wg := sync.WaitGroup{}

	// 启动十个并发的 Goroutine 增加 counter 的值
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go incrementCounter(&wg)
	}

	// 等待所有 Goroutine 完成任务
	wg.Wait()

	// 输出最终结果
	fmt.Println("Final Counter:", counter)
}

func incrementCounter(wg *sync.WaitGroup) {
	// 进入写模式,获取锁
	rwMutex.Lock()
	// 在函数返回前释放锁,加锁和解锁一定要写在一起
	defer rwMutex.Unlock()

	// 对共享资源进行修改操作
	counter++

	// 通知 WaitGroup 当前 Goroutine 完成任务
	wg.Done()
}

输出:

Final Counter: 100

RWMutex实现原理

源码数据结构:

type RWMutex struct {
    w           Mutex  // 用于控制多个锁,获得写锁之前需要获取该锁
    writerSem   uint32 // 写阻塞等待的信号量,最后一个读者释放锁时会释放信号量
    readerSem   uint32 // 读阻塞的协程等待的信号量,持有写锁的协程释放锁后会释放信号量
    readerCount int32  // 记录读协程的个数
    readerWait  int32  // 记录写阻塞时读协程的个数
}

读写锁内部有一个Mutex互斥锁,用于将多个写操作隔离开,其他的几个都用于隔离读操作和写操作。

RWMutex还实现了四个接口,四个接口和上面的成员配合起来完成读写互斥锁的实现。

接口定义:
RWMutex提供4个简单接口:

  • RLock() :读锁定(记忆为 ReadLock)。
  • RUnlock():解除读锁定(记忆为 ReadLock)。
  • Lock() :写锁定,它与 Mutex 完全一致。
  • UnLock():解除写锁定,与 Mutex 完全一致。

Lock()实现逻辑:

写锁定操作需要做两件事:
1.获取互斥锁Mutex
2.若此时有读操作的话,需要阻塞等待所有的读操作结束

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 首先,解决与其它写操作的竞争。
    rw.w.Lock()
    
    // 向读操作通知有一位待定的写操作。
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    
    // 等待活跃的读操作结束。
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

UnLock()实现逻辑

解除锁定需要做两件事:
1.唤醒因读锁定而被阻塞的协程(如果有的话)
2.解除互斥锁

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }

    // 向读协程宣布没有活跃的写协程。
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    
    // 取消阻塞被阻塞的读协程(如果有的话)
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    
    // 解除写锁,允许其它写协程继续写
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}

RLock()实现逻辑

读锁定操作需要做两件事:
1.增加读操作的计数,即 readerCount++
2.若阻塞等待写操作结束

// Happens-before relationships are indicated to the race detector via:
// - Unlock  -> Lock:  readerSem
// - Unlock  -> RLock: readerSem
// - RUnlock -> Lock:  writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

RUnlock()实现逻辑

解除读锁定需要做两件事:
1.减少读操作计数,即 readerCount- -
2.唤醒等待写操作的协程,如果有的话。注意:这里只有最后一个解除读锁定的协程才会释放信号量将该协程唤醒,因为只有当所有的读操作协程释放锁后才可以唤醒写操作协程。

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}

RWMutex关键实现

写操作如何阻止写操作?

读写锁包含一个互斥锁(Mutex),写锁定必须先获取该互斥锁,如果互斥锁已被协程 1 获取(或者协程 1 在阻塞等待读结束),则意味着协程 1 已获取了互斥锁,那么协程 2 只能阻塞等待该互斥锁。

所以写操作依赖互斥锁阻止其他的写操作。

写操作如何阻止读操作?

这是 RWMutex 实现中最最最精华的技巧。

我们知道 RWMutex.readerCount 是一个整型值,用于表示读协程数量。在不考虑写操作的情况下,每次读锁定将该值 +1,每次解除读锁定将该值 -1,所以 readerCount 的取值为 [0, N],N 为读操作的个数,实际上最大可支持 2的30次方个读操作。

当进行写锁定时,会将 readerCount 减去 2的30次方,从而让 readerCount 变成负值,此时再有读操作到来时检查 readerCount 为负值,便知道写操作在进行中,只好阻塞等待。

而真实的读操作个数却不会丢失,因为只需要将 readerCount + 2的30次方 即可恢复回来。

因此,写操作是将 readerCount 变成负值来阻止写操作的。

读操作如何阻止写操作?

读锁定会先将 RWMutex.readerCount + 1,此时写操作到来时发现读者数量不为 0,会阻塞等待所有读操作结束。

因此,读操作是通过 readerCount 来阻止写操作的。

为什么写锁定不会被 “饿死”?

我们知道写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有的读操作结束,则可能会被 “饿死”,RWMutex 是通过 RWMutex.readerWait 来完美解决这个问题的。

写操作到来时,会把RWMutex.readerCount 的值复制到 RWMutex.readerWait 中,用于标记排在写操作前面的读操作个数。

前面的读操作结束后,除了递减 RWMutex.readerCount 的值,还会递减 RWMutex.readerWait 的值,所以 RWMutex.readerWait 的值变为 0 的时候,可以唤醒写操作了(真巧妙)。

因此写操作相当于把一段连续的读操作划成了两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后续的读操作。

注意事项

RWMutex 是一种高级的并发控制机制,但相比于 Mutex 来说,它在处理读操作时会带来更小的开销。因此,在某些场景下可以使用 RWMutex 来提升程序的性能。

然而,在使用 RWMutex 时需要注意以下几点:

  • 不要滥用 RWMutex:只有在确实需要多个 Goroutine 并发地读取共享资源,并且同时保证独占式写入时才使用 RWMutex。
  • 避免持有锁太久:持有锁时间过长会影响整个系统的吞吐量。所以应该尽量缩小临界区大小。
  • 注意解决死锁问题:当多个 Goroutine 相互等待对方释放锁导致无法继续执行时,就产生了死锁。合理地设计和使用锁可以避免死锁的发生。
Golang修炼

Go匿名结构体提高搬砖效率

反射biubiubiu

comments powered by Disqus