// newg.sched.pc 表示当 newg 被调度起来运行时从这个地址开始执行指令
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
设置 g 字段为 newg 的地址。插一句,sched 是 g 结构体的一个字段,它本身也是一个结构体,保存调度信息。复习一下:
type gobuf struct {
// 存储 rsp 寄存器的值
sp uintptr
// 存储 rip 寄存器的值
pc uintptr
// 指向 goroutine
g guintptr
ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
// 保存系统调用的返回值
ret sys.Uintreg
lr uintptr
bp uintptr // for GOEXPERIMENT=framepointer
}
接下来的这个函数非常重要,可以解释之前为什么要那样设置 pc 字段的值。调用 gostartcallfn:
每个 P 每次会批量(16个)申请 id,每次调用 newproc 函数,新创建一个 goroutine,id 加 1。因此 g0 的 id 是 0,而 main goroutine 的 id 就是 1。
newg 的状态变成可执行后(Runnable),就可以将它加入到 P 的本地运行队列里,等待调度。所以,goroutine 何时被执行,用户代码决定不了。来看源码:
// 将 G 放入 _p_ 的本地待运行队列
runqput(_p_, newg, true)
// runqput 尝试将 g 放到本地可执行队列里。
// 如果 next 为假,runqput 将 g 添加到可运行队列的尾部
// 如果 next 为真,runqput 将 g 添加到 p.runnext 字段
// 如果 run queue 满了,runnext 将 g 放到全局队列里
//
// runnext 成员中的 goroutine 会被优先调度起来运行
func runqput(_p_ *p, gp *g, next bool) {
// ……………………
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
// 有其它线程在操作 runnext 成员,需要重试
goto retryNext
}
// 老的 runnext 为 nil,不用管了
if oldnext == 0 {
return
}
// 把之前的 runnext 踢到正常的 runq 中
// 原本存放在 runnext 的 gp 放入 runq 的尾部
gp = oldnext.ptr()
}
retry:
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// 如果 P 的本地队列没有满,入队
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
// 原子写入
atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 可运行队列已经满了,放入全局队列了
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
// 没有成功放入全局队列,说明本地队列没满,重试一下
goto retry
}
runqput 函数的主要作用就是将新创建的 goroutine 加入到 P 的可运行队列,如果本地队列满了,则加入到全局可运行队列。前两个参数都好理解,最后一个参数 next 的作用是,当它为 true 时,会将 newg 加入到 P 的 runnext 字段,具有最高优先级,将先于普通队列中的 goroutine 得到执行。
先将 P 老的 runnext 成员取出,接着用一个原子操作 cas 来试图将 runnext 成员设置成 newg,目的是防止其他线程在同时修改 runnext 字段。
// 将 g 和 _p_ 本地队列的一半 goroutine 放入全局队列。
// 因为要获取锁,所以会慢
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
// 如果 cas 操作失败,说明本地队列不满了,直接返回
if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp
// …………………………
// Link the goroutines.
// 全局运行队列是一个链表,这里首先把所有需要放入全局运行队列的 g 链接起来,
// 减小锁粒度,从而降低锁冲突,提升性能
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
// Now put the batch on global queue.
lock(&sched.lock)
globrunqputbatch(batch[0], batch[n], int32(n+1))
unlock(&sched.lock)
return true
}
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
gtail.schedlink = 0
if sched.runqtail != 0 {
sched.runqtail.ptr().schedlink.set(ghead)
} else {
sched.runqhead.set(ghead)
}
sched.runqtail.set(gtail)
sched.runqsize += n
}