工作线程 M 费尽心机也要找到一个可运行的 goroutine,这是它的工作和职责,不达目的,绝不罢体,这种锲而不舍的精神值得每个人学习。
共经历三个过程:先从本地队列找,定期会从全局队列找,最后实在没办法,就去别的 P 偷。如下图所示:
M 找工作的过程
先看第一个:从 P 本地队列找。源码如下:
// 从本地可运行队列里找到一个 g
// 如果 inheritTime 为真,gp 应该继承这个时间片,否则,新开启一个时间片
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
// 如果 runnext 不为空,则 runnext 是下一个待运行的 G
for {
next := _p_.runnext
if next == 0 {
// 为空,则直接跳出循环
break
}
// 再次比较 next 是否没有变化
if _p_.runnext.cas(next, 0) {
// 如果没有变化,则返回 next 所指向的 g。且需要继承时间片
return next.ptr(), true
}
}
for {
// 获取队列头
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
// 获取队列尾
t := _p_.runqtail
if t == h {
// 头和尾相等,说明本地队列为空,找不到 g
return nil, false
}
// 获取队列头的 g
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
// 原子操作,防止这中间被其他线程因为偷工作而修改
if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
整个源码结构比较简单,主要是两个 for 循环。
第一个 for 循环尝试返回 P 的 runnext 成员,因为 runnext 具有最高的运行优先级,因此要首先尝试获取 runnext。当发现 runnext 为空时,直接跳出循环,进入第二个。否则,用原子操作获取 runnext,并将其值修改为 0,也就是空。这里用到原子操作的原因是防止在这个过程中,有其他线程过来“偷工作”,导致并发修改 runnext 成员。
第二个 for 循环则是在尝试获取 runnext 成员失败后,尝试从本地队列中返回队列头的 goroutine。同样,先用原子操作获取队列头,使用原子操作的原因同样是防止其他线程“偷工作”时并发对队列头的并发写操作。之后,直接获取队列尾,因为不担心其他线程同时更改,所以直接获取。注意,“偷工作”时只会修改队列头。
比较队列头和队列尾,如果两者相等,说明 P 本地队列没有可运行的 goroutine,直接返回空。否则,算出队列头指向的 goroutine,再用一个 CAS 原子操作来尝试修改队列头,使用原子操作的原因同上。
// 从本地运行队列和全局运行队列都没有找到需要运行的 goroutine,
// 调用 findrunnable 函数从其它工作线程的运行队列中偷取,如果偷不到,则当前工作线程进入睡眠
// 直到获取到 runnable goroutine 之后 findrunnable 函数才会返回。
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
这是整个找工作过程最复杂的部分:
// 从其他地方找 goroutine 来执行
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// ……………………
// local runq
// 从本地队列获取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 从全局队列获取
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// ……………………
// Steal work from other P's.
// 如果其他的 P 都处于空闲状态,那肯定没有其他工作要做
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
goto stop
}
// 如果有很多工作线程在找工作,那我就停下休息。避免消耗太多 CPU
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
// 设置自旋状态为 true
_g_.m.spinning = true
// 自旋状态数加 1
atomic.Xadd(&sched.nmspinning, 1)
}
// 从其它 p 的本地运行队列盗取 goroutine
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
// ……………………
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// ……………………
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
// 当前工作线程解除与 p 之间的绑定,准备去休眠
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
// 把 p 放入空闲队列
pidleput(_p_)
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
// m 即将睡眠,不再处于自旋
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
// 休眠之前再检查一下所有的 p,看一下是否有工作要做
for i := 0; i < int(gomaxprocs); i++ {
_p_ := allp[i]
if _p_ != nil && !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// ……………………
// 休眠
stopm()
goto top
}
这部分也是最能说明 M 找工作的锲而不舍精神:尽力去各个运行队列中寻找 goroutine,如果实在找不到则进入睡眠状态,等待有工作时,被其他 M 唤醒。
接着往下看,返回的 n 表示偷到的工作数量。先将 n 自减 1,目的是把第 n 个工作(也就是 g)直接返回,如果这时候 n 变成 0 了,说明就只偷到了一个 g,那就直接返回。否则,将队尾往后移动 n,把偷来的工作合法化,简直完美!
我们接着往下看 runqgrab 函数的实现:
// 从 _p_ 批量获取可运行 goroutine,放到 batch 数组里
// batch 是一个环,起始于 batchHead
// 返回偷的数量,返回的 goroutine 可被任何 P 执行
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 队列头
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
// 队列尾
t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
// g 的数量
n := t - h
// 取一半
n = n - n/2
if n == 0 {
if stealRunNextG {
// 连 runnext 都要偷,没有人性
// Try to steal from _p_.runnext.
if next := _p_.runnext; next != 0 {
// 这里是为了防止 _p_ 执行当前 g,并且马上就要阻塞,所以会马上执行 runnext,
// 这个时候偷就没必要了,因为让 g 在 P 之间"游走"不太划算,
// 就不偷了,给他们一个机会。
// channel 一次同步的的接收发送需要 50ns 左右,因此 3us 差不多给了他们 50 次机会了,做得还是不错的
if GOOS != "windows" {
usleep(3)
} else {
osyield()
}
if !_p_.runnext.cas(next, 0) {
continue
}
// 真的偷走了 next
batch[batchHead%uint32(len(batch))] = next
// 返回偷的数量,只有 1 个
return 1
}
}
// 没偷到
return 0
}
// 如果 n 这时变得太大了,重新来一遍了,不能偷的太多,做得太过分了
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
}
// 将 g 放置到 bacth 中
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 工作被偷走了,更新一下队列头指针
if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
外层直接就是一个无限循环,先用原子操作取出 p 的队列头和队列尾,算出一半的 g 的数量,如果 n == 0,说明地主家也没有余粮,这时看 stealRunNextG 的值。如果为假,说明不偷 runnext,那就直接返回 0,啥也没偷到;如果为真,则要尝试偷一下 runnext。
先判断 runnext 不为空,那就真的准备偷了。不过在这之前,要先休眠 3 us。这是为了防止 p 正在执行当前的 g,马上就要阻塞(可能是向一个非缓冲的 channel 发送数据,没有接收者),之后会马上执行 runnext。这个时候偷就没必要了,因为 runnext 马上就要执行了,偷走它还不是要去执行,那何必要偷呢?大家的愿望就是提高效率,这样让 g 在 P 之间"游走"不太划算,索性先不偷了,给他们一个机会。channel 一次同步的的接收或发送需要 50ns 左右,因此休眠 3us 差不多给了他们 50 次机会了,做得还是挺厚道的。
继续看,再次判断 n 是否小于等于 p.runq 长度的一半,因为这个时候很可能 p 也被其他线程偷了,它的 p.runq 就没那么多工作了,这个时候就不能偷这么多了,要重新再走一次循环。
最后一个 for 循环,将 p.runq 里的 g 放到 batch 数组里。使用原子操作更新 p 的队列头指针,往后移动 n 个位置,这些都是被偷走的,伤心!
比较简单,获取链表最后一个,再更新 sched.pidle,使其指向前一个 P。调用 acquirep(_p_) 绑定获取到的 p 和 m,主要的动作就是设置 p 的 m 字段,更改 p 的工作状态为 _Prunning,并且设置 m 的 p 字段。做完这些之后,再次进入 top 代码段,再走一遍之前找工作的过程。