Go进阶—并发编程 Mutex 2024-02-04 默认分类 暂无评论 520 次阅读 [转自:【Go进阶—并发编程】Mutex https://segmentfault.com/a/1190000041467918](https://segmentfault.com/a/1190000041467918 "【Go进阶—并发编程】Mutex") 总结来说,GO语言中mutex的两种模式(正常模式和饥饿模式)的主要区别在于对等待锁的goroutine的优先级处理上: 正常模式:等待锁的goroutine会按照先后顺序依次获取锁,即先到先得。即使等待时间较长,也不会主动放弃获取锁的机会。 饥饿模式:等待锁的goroutine可能会主动放弃获取锁的机会,从而让出锁的使用权给等待时间更长的goroutine。这样可以确保等待时间较长的goroutine更容易获取到锁,提高了公平性和避免长时间的等待。 因此,正常模式注重等待锁goroutine的先后顺序,而饥饿模式注重等待时间较长的goroutine能更容易获取到锁,提高了公平性。在使用mutex时,根据实际的并发场景和性能需求,可以选择合适的模式来平衡并发性能和公平性。 互斥锁是并发程序中对临界资源进行访问控制的最基本手段,Mutex 即为 Go 语言原生的互斥锁实现。 **数据结构** 源码包 `src/sync/mutex.go` 中定义了 Mutex 的数据结构: ```go type Mutex struct { state int32 sema uint32 } ``` ![](https://i-cooltea.top/usr/uploads/2024/02/1294005778.png) - Locked: 表示该互斥锁是否已被锁定; - Woken: 表示是否从正常模式被唤醒; - Starving:表示该Mutex是否处于饥饿状态; - Waiter: 表示互斥锁上阻塞等待的协程个数。 sema 字段表示信号量,加锁失败的协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。 **正常模式和饥饿模式** Mutex 有两种模式——正常模式和饥饿模式,饥饿模式是 1.9 版本中引入的优化,目的是保证互斥锁的公平性,防止协程饿死。默认情况下,Mutex 的模式为正常模式。 在正常模式下,协程如果加锁不成功不会立即转入等待队列,而是判断是否满足自旋的条件,如果满足则会自旋。 **当持有锁的协程释放锁的时候,会释放一个信号量来唤醒等待队列中的一个协程,但如果有协程正处于自旋过程中,锁往往会被该自旋协程获取到。被唤醒的协程只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过 1ms 的话,会将 Mutex 标记为饥饿模式。** 在饥饿模式下,新加锁的协程不会进入自旋状态,它们只会在队列的末尾等待,互斥锁被释放后会直接交给等待队列最前面的协程。如果一个协程获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么互斥锁就会切换回正常模式。 **方法** 互斥锁 Mutex 就提供两个方法 Lock 和 Unlock:进入临界区之前调用 Lock 方法,退出临界区的时候调用 Unlock 方法。 **Lock** ```go func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() } ``` 1. 判断当前 Goroutine 能否进入自旋; 2. 通过自旋等待互斥锁的释放; 3. 计算互斥锁的最新状态; 4. 更新互斥锁的状态并获取锁。 **判断当前 Goroutine 能否进入自旋** ```go func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } ``` - 互斥锁只有在普通模式才能进入自旋; - runtime.sync\_runtime\_canSpin 需要返回 true: - 运行在多 CPU 的机器上; - 当前 Goroutine 为了获取该锁进入自旋的次数小于 4 次; - 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空。 **通过自旋等待互斥锁的释放** 一旦当前 Goroutine 能够进入自旋就会调用 runtime.sync\_runtime\_doSpin 和 runtime.procyield 执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间: ```go func sync_runtime_doSpin() { procyield(active_spin_cnt) } TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET ``` 处理了自旋相关的逻辑后,会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift: ```go new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } ``` 计算了新的互斥锁状态之后,会通过 CAS 函数更新状态。如果没有获得锁,会调用 runtime.sync\_runtime\_SemacquireMutex 通过信号量保证资源不会被两个 Goroutine 获取。runtime.sync\_runtime\_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,继续执行剩余代码。 - 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环; - 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出。 ```go if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } } ``` ```go func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<**易错场景** 使用 Mutex 常见的错误场景有 4 类,分别是 Lock/Unlock 不是成对出现、Copy 已使用的 Mutex、重入和死锁。其他三种比较简单,这里重点介绍一下有关重入的问题。 **重入** 标准库 Mutex 不是可重入锁,也就是指在一个 goroutine 中不可以多次获取同一把锁。如果想在 Mutex 的基础上要实现一个可重入锁的话,可以有下面两个方案: - 通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口。 - 调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来就不满足 Locker 接口。 可重入锁解决了代码重入或者递归调用带来的死锁问题,同时它也带来了另一个好处,就是我们可以要求,只有持有锁的 goroutine 才能 unlock 这个锁。这也很容易实现,因为在上面这两个方案中,都已经记录了是哪一个 goroutine 持有这个锁。 **方案一:goroutine id** 这个方案的关键第一步是获取 goroutine id,方式有两种,分别是简单方式和 hacker 方式。 简单方式,就是通过 runtime.Stack 方法获取栈帧信息,栈帧信息里包含 goroutine id。runtime.Stack 方法可以获取当前的 goroutine 信息。 接下来我们来看 hacker 的方式,我们获取运行时的 g 指针,反解出对应的 g 的结构。每个运行的 goroutine 结构的 g 指针保存在当前 goroutine 的一个叫做 TLS 对象中。 1. 我们先获取到 TLS 对象; 2. 再从 TLS 中获取 goroutine 结构的 g 指针; 3. 再从 g 指针中取出 goroutine id。 我们没有必要重复发明轮子,直接使用第三方的库来获取 goroutine id 就可以了。现在已经有很多成熟的库了,比如 [petermattis/goid](https://link.segmentfault.com/?enc=v8XEU26YASM7mCuv%2BEXAlg%3D%3D.8zez98cfJ1xpRdqM73sq6PWtQNv%2BwW9jvftKAO5yoYv9%2Bf%2FS6h%2Bl%2FRExrohEI%2FY7)。接下来我们实现一个可以使用的可重入锁: ```go type RecursiveMutex struct { sync.Mutex owner int64 recursion int32 } func (m *RecursiveMutex) Lock() { gid := goid.Get() if atomic.LoadInt64(&m.owner) == gid { m.recursion++ return } m.Mutex.Lock() atomic.StoreInt64(&m.owner, gid) m.recursion = 1 } func (m *RecursiveMutex) Unlock() { gid := goid.Get() if atomic.LoadInt64(&m.owner) != gid { panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid)) } m.recursion-- if m.recursion != 0 { return } atomic.StoreInt64(&m.owner, -1) m.Mutex.Unlock() } ``` 方案一是用 goroutine id 做 goroutine 的标识,我们也可以让 goroutine 自己来提供标识。不管怎么说,Go 开发者不期望使用者利用 goroutine id 做一些不确定的东西,所以,他们没有暴露获取 goroutine id 的方法。 我们可以这么设计,调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入。通过用户传入的 token 替换方案一中 goroutine id,其它逻辑和方案一一致。 **拓展** **TryLock** 我们可以为 Mutex 添加一个 TryLock 的方法,也就是尝试获取锁。当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true。如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么就直接返回 false,不会阻塞在方法调用上。 ```go const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota ) type Mutex struct { sync.Mutex } func (m *Mutex) TryLock() bool { if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) { return true } old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex))) if old&(mutexLocked|mutexStarving|mutexWoken) != 0 { return false } new := old | mutexLocked return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new) } ``` 如果锁已经被其他 goroutine 所持有,或者被其他唤醒的 goroutine 准备持有,那么,就直接返回 false,不再请求,代码逻辑在第 23 行。 如果没有被持有,也没有其它唤醒的 goroutine 来竞争锁,锁也不处于饥饿状态,就尝试获取这把锁(第 29 行),不论是否成功都将结果返回。因为,这个时候,可能还有其他的 goroutine 也在竞争这把锁,所以,不能保证成功获取这把锁。 **获取等待者的数量等指标** Mutex 的数据结构包含两个字段:state 和 sema。前四个字节(int32)就是 state 字段。Mutex 结构中的 state 字段有很多个含义,通过 state 字段,可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是,state 这个字段并没有暴露出来,怎么获取未暴露的字段呢?很简单,我们可以通过 unsafe 的方式实现。 ```go const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota ) type Mutex struct { sync.Mutex } func (m *Mutex) Count() int { v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex))) v = v >> mutexWaiterShift v = v + (v & mutexLocked) return int(v) } ``` 文章目录 **数据结构** **正常模式和饥饿模式** **方法** **Lock** **判断当前 Goroutine 能否进入自旋** **通过自旋等待互斥锁的释放** **易错场景** **重入** **方案一:goroutine id** **拓展** **TryLock** **获取等待者的数量等指标** 标签: golang, 并发编程 转载请注明文章来源 本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
评论已关闭