// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));TEXT runtime·clone(SB),NOSPLIT,$0// 准备系统调用的参数 MOVL flags+0(FP), DI MOVQ stk+8(FP), SI MOVQ $0, DX MOVQ $0, R10// 将 mp,gp,fn 拷贝到寄存器,对子线程可见 MOVQ mp+16(FP), R8 MOVQ gp+24(FP), R9 MOVQ fn+32(FP), R12// 系统调用 clone MOVL $56, AX SYSCALL// In parent, return. CMPQ AX, $0 JEQ 3(PC)// 父线程,返回 MOVL AX, ret+40(FP) RET// In child, on new stack.// 在子线程中。设置 CPU 栈顶寄存器指向子线程的栈顶 MOVQ SI, SP// If g or m are nil, skip Go-related setup. CMPQ R8, $0// m JEQ nog CMPQ R9, $0// g JEQ nog// Initialize m->procid to Linux tid// 通过 gettid 系统调用获取线程 ID(tid) MOVL $186, AX // gettid SYSCALL// 设置 m.procid = tid MOVQ AX, m_procid(R8)// Set FS to point at m->tls.// 新线程刚刚创建出来,还未设置线程本地存储,即 m 结构体对象还未与工作线程关联起来,// 下面的指令负责设置新线程的 TLS,把 m 对象和工作线程关联起来 LEAQ m_tls(R8), DI CALL runtime·settls(SB)// In child, set up new stackget_tls(CX) MOVQ R8, g_m(R9) // g.m = m MOVQ R9, g(CX) // tls.g = &m.g0 CALL runtime·stackcheck(SB)nog:// Call fn// 调用 mstart 函数。永不返回 CALL R12// It shouldn't return. If it does, exit that thread. MOVL $111, DI MOVL $60, AX SYSCALL JMP -3(PC) // keep exiting
funcretake(now int64) uint32 { n :=0// 遍历所有的 pfor i :=int32(0); i < gomaxprocs; i++ { _p_ := allp[i]if _p_ ==nil {continue }// 用于 sysmon 线程记录被监控 p 的系统调用时间和运行时间 pd :=&_p_.sysmontick// p 的状态 s := _p_.statusif s == _Psyscall {// P 处于系统调用之中,需要检查是否需要抢占// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).// _p_.syscalltick 用于记录系统调用的次数,在完成系统调用之后加 1 t :=int64(_p_.syscalltick)ifint64(pd.syscalltick) != t {// pd.syscalltick != _p_.syscalltick,说明已经不是上次观察到的系统调用了,// 而是另外一次系统调用,所以需要重新记录 tick 和 when 值 pd.syscalltick =uint32(t) pd.syscallwhen = nowcontinue }// 只要满足下面三个条件中的任意一个,则抢占该 p,否则不抢占// 1. p 的运行队列里面有等待运行的 goroutine// 2. 没有无所事事的 p// 3. 从上一次监控线程观察到 p 对应的 m 处于系统调用之中到现在已经超过 10 毫秒ifrunqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) >0&& pd.syscallwhen+10*1000*1000> now {continue }incidlelocked(-1)if atomic.Cas(&_p_.status, s, _Pidle) {// …………………… n++ _p_.syscalltick++// 寻找一新的 m 接管 phandoffp(_p_) }incidlelocked(1) } elseif s == _Prunning {// P 处于运行状态,检查是否运行得太久了// Preempt G if it's running for too long.// 每发生一次调度,调度器 ++ 该值 t :=int64(_p_.schedtick)ifint64(pd.schedtick) != t { pd.schedtick =uint32(t) pd.schedwhen = nowcontinue }//pd.schedtick == t 说明(pd.schedwhen ~ now)这段时间未发生过调度// 这段时间是同一个goroutine一直在运行,检查是否连续运行超过了 10 毫秒if pd.schedwhen+forcePreemptNS > now {continue }// 连续运行超过 10 毫秒了,发起抢占请求preemptone(_p_) } }returnuint32(n)}
从代码来看,主要会对处于 _Psyscall 和 _Prunning 状态的 p 进行抢占。
抢占进行系统调用的 P
当 P 处于 _Psyscall 状态时,表明对应的 goroutine 正在进行系统调用。如果抢占 p,需要满足几个条件:
p 的本地运行队列里面有等待运行的 goroutine。这时 p 绑定的 g 正在进行系统调用,无法去执行其他的 g,因此需要接管 p 来执行其他的 g。
当 p 的本地运行队列或全局运行队列里面有待运行的 goroutine,说明还有很多工作要做,调用 startm(_p_, false) 启动一个 m 来结合 p,继续工作。
当除了当前的 p 外,其他所有的 p 都在运行 goroutine,说明天下太平,每个人都有自己的事做,唯独自己没有。为了全局更快地完成工作,需要启动一个 m,且要使得 m 处于自旋状态,和 p 结合之后,尽快找到工作。
最后,如果实在没有工作要处理,就将 p 放入全局空闲队列里。
我们接着来看 startm 函数都做了些什么:
// runtime/proc.go// // 调用 m 来绑定 p,如果没有 m,那就新建一个// 如果 p 为空,那就尝试获取一个处于空闲状态的 p,如果找到 p,那就什么都不做funcstartm(_p_ *p, spinning bool) {lock(&sched.lock)if _p_ ==nil {// 没有指定 p 则需要从全局空闲队列中获取一个 p _p_ =pidleget()if _p_ ==nil {unlock(&sched.lock)if spinning {// 如果找到 p,放弃。还原全局处于自旋状态的 m 的数量ifint32(atomic.Xadd(&sched.nmspinning, -1)) <0 {throw("startm: negative nmspinning") } }// 没有空闲的 p,直接返回return } }// 从 m 空闲队列中获取正处于睡眠之中的工作线程,// 所有处于睡眠状态的 m 都在此队列中 mp :=mget()unlock(&sched.lock)if mp ==nil {// 如果没有找到 mvar fn func()if spinning {// The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning }// 创建新的工作线程newm(fn, _p_)return }if mp.spinning {throw("startm: m is spinning") }if mp.nextp !=0 {throw("startm: m has p") }if spinning &&!runqempty(_p_) {throw("startm: p has runnable gs") }// The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning// 设置 m 马上要结合的 p mp.nextp.set(_p_)// 唤醒 mnotewakeup(&mp.park)}
首先处理 p 为空的情况,直接从全局空闲 p 队列里找,如果没找到,则直接返回。如果设置了 spinning 为 true 的话,还需要还原全局的处于自旋状态的 m 的数值:&sched.nmspinning 。
funcgoschedImpl(gp *g) { status :=readgstatus(gp)if status&^_Gscan != _Grunning {dumpgstatus(gp)throw("bad g status") }// 更改 gp 的状态casgstatus(gp, _Grunning, _Grunnable)// 解除 m 和 g 的关系dropg()lock(&sched.lock)// 将 gp 放入全局可运行队列globrunqput(gp)unlock(&sched.lock)// 进入新一轮的调度循环schedule()}
将 gp 的状态改为 _Grunnable,放入全局可运行队列,等待下次有 m 来全局队列找工作时才能继续运行,毕竟你已经运行这么长时间了,给别人一点机会嘛。