上一节我们说完了 GPM 结构体,这一讲,我们来研究 Go sheduler 结构体,以及整个调度器的初始化过程。
Go scheduler 在源码中的结构体为 schedt,保存调度器的状态信息、全局的可运行 G 队列等。源码如下:
// 保存调度器的信息typeschedtstruct{// accessed atomically. keep at top to ensure alignment on 32-bit systems.// 需以原子访问访问。// 保持在 struct 顶部,以使其在 32 位系统上可以对齐 goidgen uint64 lastpoll uint64 lock mutex// 由空闲的工作线程组成的链表 midle muintptr// idle m's waiting for work// 空闲的工作线程数量 nmidle int32// number of idle m's waiting for work// 空闲的且被 lock 的 m 计数 nmidlelocked int32// number of locked m's waiting for work// 已经创建的工作线程数量 mcount int32// number of m's that have been created// 表示最多所能创建的工作线程数量 maxmcount int32// maximum number of m's allowed (or die)// goroutine 的数量,自动更新 ngsys uint32// number of system goroutines; updated atomically// 由空闲的 p 结构体对象组成的链表 pidle puintptr// idle p's// 空闲的 p 结构体对象的数量 npidle uint32 nmspinning uint32// See "Worker thread parking/unparking" comment in proc.go.// Global runnable queue.// 全局可运行的 G队列 runqhead guintptr// 队列头 runqtail guintptr// 队列尾 runqsize int32// 元素数量// Global cache of dead G's.// dead G 的全局缓存// 已退出的 goroutine 对象,缓存下来// 避免每次创建 goroutine 时都重新分配内存 gflock mutex gfreeStack *g gfreeNoStack *g// 空闲 g 的数量 ngfree int32// Central cache of sudog structs.// sudog 结构的集中缓存 sudoglock mutex sudogcache *sudog// Central pool of available defer structs of different sizes.// 不同大小的可用的 defer struct 的集中缓存池 deferlock mutex deferpool [5]*_defer gcwaiting uint32// gc is waiting to run stopwait int32 stopnote note sysmonwait uint32 sysmonnote note// safepointFn should be called on each P at the next GC// safepoint if p.runSafePointFn is set. safePointFn func(*p) safePointWait int32 safePointNote note profilehz int32// cpu profiling rate// 上次修改 gomaxprocs 的纳秒时间 procresizetime int64// nanotime() of last change to gomaxprocs totaltime int64// ∫gomaxprocs dt up to procresizetime}
// src/runtime/proc.go
// The bootstrap sequence is:
//
// call osinit
// call schedinit
// make & queue new G
// call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
// getg 由编译器实现
// get_tls(CX)
// MOVQ g(CX), BX; BX存器里面现在放的是当前g结构体对象的地址
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
// 最多启动 10000 个工作线程
sched.maxmcount = 10000
tracebackinit()
moduledataverify()
// 初始化栈空间复用管理链表
stackinit()
mallocinit()
// 初始化 m0
mcommoninit(_g_.m)
alginit() // maps must not be used before this call
modulesinit() // provides activeModules
typelinksinit() // uses maps, activeModules
itabsinit() // uses activeModules
msigsave(_g_.m)
initSigmask = _g_.m.sigmask
goargs()
goenvs()
parsedebugvars()
gcinit()
sched.lastpoll = uint64(nanotime())
// 初始化 P 的个数
// 系统中有多少核,就创建和初始化多少个 p 结构体对象
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
// 初始化所有的 P,正常情况下不会返回有本地任务的 P
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
// ……………………
}
// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g
// 初始化 m
func mcommoninit(mp *m) {
// 初始化过程中_g_ = g0
_g_ := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if _g_ != _g_.m.g0 {
callers(1, mp.createstack[:])
}
// random 初始化
mp.fastrand = 0x49f6428a + uint32(mp.id) + uint32(cputicks())
if mp.fastrand == 0 {
mp.fastrand = 0x49f6428a
}
lock(&sched.lock)
// 设置 m 的 id
mp.id = sched.mcount
sched.mcount++
// 检查已创建系统线程是否超过了数量限制(10000)
checkmcount()
// ………………省略了初始化 gsignal
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
mp.alllink = allm
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
// ………………
}
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
// src/runtime/proc.go
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs {
throw("procresize: invalid arg")
}
// ……………………
// update statistics
// 更新数据
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// 初始化所有的 P
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
// 申请新对象
pp = new(p)
pp.id = i
// pp 的初始状态为 stop
pp.status = _Pgcstop
pp.sudogcache = pp.sudogbuf[:0]
for i := range pp.deferpool {
pp.deferpool[i] = pp.deferpoolbuf[i][:0]
}
// 将 pp 存放到 allp 处
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
// ……………………
}
// 释放多余的 P。由于减少了旧的 procs 的数量,因此需要释放
// ……………………
// 获取当前正在运行的 g 指针,初始化时 _g_ = g0
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
// 继续使用当前 P
_g_.m.p.ptr().status = _Prunning
} else {
// 初始化时执行这个分支
// ……………………
_g_.m.p = 0
_g_.m.mcache = nil
// 取出第 0 号 p
p := allp[0]
p.m = 0
p.status = _Pidle
// 将 p0 和 m0 关联起来
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
var runnablePs *p
// 下面这个 for 循环把所有空闲的 p 放入空闲链表
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
// allp[0] 跟 m0 关联了,不会进行之后的“放入空闲链表”
if _g_.m.p.ptr() == p {
continue
}
// 状态转为 idle
p.status = _Pidle
// p 的 LRQ 里没有 G
if runqempty(p) {
// 放入全局空闲链表
pidleput(p)
} else {
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
// 返回有本地任务的 P 链表
return runnablePs
}
func acquirep(_p_ *p) {
// Do the part that isn't allowed to have write barriers.
acquirep1(_p_)
// have p; write barriers now allowed
_g_ := getg()
_g_.m.mcache = _p_.mcache
// ……………………
}