// newg.sched.pc 表示当 newg 被调度起来运行时从这个地址开始执行指令newg.sched.pc =funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
设置 g 字段为 newg 的地址。插一句,sched 是 g 结构体的一个字段,它本身也是一个结构体,保存调度信息。复习一下:
typegobufstruct {// 存储 rsp 寄存器的值 sp uintptr// 存储 rip 寄存器的值 pc uintptr// 指向 goroutine g guintptr ctxt unsafe.Pointer// this has to be a pointer so that gc scans it// 保存系统调用的返回值 ret sys.Uintreg lr uintptr bp uintptr// for GOEXPERIMENT=framepointer}
接下来的这个函数非常重要,可以解释之前为什么要那样设置 pc 字段的值。调用 gostartcallfn:
上图中,newg 新增了 sched.pc 指向 runtime.main 函数,当它被调度起来执行时,就从这里开始;新增了 sched.sp 指向了 newg 栈顶位置,同时,newg 栈顶位置的内容是一个跳转地址,指向 runtime.goexit 的第二条指令,当 goroutine 退出时,这条地址会载入 CPU 的 PC 寄存器,跳转到这里执行“扫尾”工作。
之后,将 newg 的状态改为 runnable,设置 goroutine 的 id:
// 设置 g 的状态为 _Grunnable,可以运行了casgstatus(newg, _Gdead, _Grunnable)newg.goid =int64(_p_.goidcache)
每个 P 每次会批量(16个)申请 id,每次调用 newproc 函数,新创建一个 goroutine,id 加 1。因此 g0 的 id 是 0,而 main goroutine 的 id 就是 1。
newg 的状态变成可执行后(Runnable),就可以将它加入到 P 的本地运行队列里,等待调度。所以,goroutine 何时被执行,用户代码决定不了。来看源码:
// 将 G 放入 _p_ 的本地待运行队列runqput(_p_, newg, true)// runqput 尝试将 g 放到本地可执行队列里。// 如果 next 为假,runqput 将 g 添加到可运行队列的尾部// 如果 next 为真,runqput 将 g 添加到 p.runnext 字段// 如果 run queue 满了,runnext 将 g 放到全局队列里//// runnext 成员中的 goroutine 会被优先调度起来运行funcrunqput(_p_ *p, gp *g, next bool) {// ……………………if next { retryNext: oldnext := _p_.runnextif!_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {// 有其它线程在操作 runnext 成员,需要重试goto retryNext }// 老的 runnext 为 nil,不用管了if oldnext ==0 {return }// 把之前的 runnext 踢到正常的 runq 中// 原本存放在 runnext 的 gp 放入 runq 的尾部 gp = oldnext.ptr() }retry: h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers t := _p_.runqtail// 如果 P 的本地队列没有满,入队if t-h <uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp)// 原子写入 atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumptionreturn }// 可运行队列已经满了,放入全局队列了ifrunqputslow(_p_, gp, h, t) {return }// the queue is not full, now the put above must succeed// 没有成功放入全局队列,说明本地队列没满,重试一下goto retry}
runqput 函数的主要作用就是将新创建的 goroutine 加入到 P 的可运行队列,如果本地队列满了,则加入到全局可运行队列。前两个参数都好理解,最后一个参数 next 的作用是,当它为 true 时,会将 newg 加入到 P 的 runnext 字段,具有最高优先级,将先于普通队列中的 goroutine 得到执行。
先将 P 老的 runnext 成员取出,接着用一个原子操作 cas 来试图将 runnext 成员设置成 newg,目的是防止其他线程在同时修改 runnext 字段。
// 将 g 和 _p_ 本地队列的一半 goroutine 放入全局队列。// 因为要获取锁,所以会慢funcrunqputslow(_p_ *p, gp *g, h, t uint32) bool {var batch [len(_p_.runq)/2+1]*g// First, grab a batch from local queue. n := t - h n = n /2if n !=uint32(len(_p_.runq)/2) {throw("runqputslow: queue is not full") }for i :=uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() }// 如果 cas 操作失败,说明本地队列不满了,直接返回if!atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consumereturnfalse } batch[n] = gp// …………………………// Link the goroutines.// 全局运行队列是一个链表,这里首先把所有需要放入全局运行队列的 g 链接起来,// 减小锁粒度,从而降低锁冲突,提升性能for i :=uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) }// Now put the batch on global queue.lock(&sched.lock)globrunqputbatch(batch[0], batch[n], int32(n+1))unlock(&sched.lock)returntrue}
// Put a batch of runnable goroutines on the global runnable queue.// Sched must be locked.funcglobrunqputbatch(ghead *g, gtail *g, n int32) { gtail.schedlink =0if sched.runqtail !=0 { sched.runqtail.ptr().schedlink.set(ghead) } else { sched.runqhead.set(ghead) } sched.runqtail.set(gtail) sched.runqsize += n}