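How STW Is Implemented
STW (Stop The World) is an internal function of the Go runtime. As the name says, it stops the world: once stopTheWorld returns, the goroutine that called it is the only one that can run.
Uses
It is used mainly by the GC (garbage collector) and by runtime.GOMAXPROCS().
You can also reach it through //go:linkname if you want to do some hacking of your own.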
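Implementation
The Go scheduling model
The Go scheduling model has three parts: G, M, and P.
Only a brief outline is given here. There are many articles that explain the Go scheduler in detail for readers who want more.
Note: this article assumes a basic understanding of how the Go scheduler works and how it is implemented.
G: Goroutine, a coroutine created with the go keyword. It represents a task to execute. In Go, the main function also runs in a goroutine.
M: Machine, which corresponds to an OS thread and supplies the CPU execution resource. An M's job is to acquire a P and find a runnable G to execute.
P: Processor, the intermediate layer. It holds the resources a goroutine needs in order to run, such as a memory cache and the queue of goroutines waiting to run, and it also bounds the program's parallelism. An M must hold a P before it can run a G.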
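Analysis
If you had to implement Stop The World yourself, which part of the scheduler would you intervene in?
First, pin down the goal: every goroutine in the program except the caller must stop running.
Stop The World also has a prerequisite: preemptive scheduling, which is what lets a goroutine stop at a suitable safe point. Whether the preemption is cooperative or signal-based makes no difference to STW.
- G: preempt every running G and forbid newly created or newly woken Gs from running. This sounds feasible, but it requires restrictions in many places, and the number of Gs to track can be large.
- M: preempt the running G and then stop the M so that it does not pick up another G. This also sounds feasible, but Go supports cross-language calls, and while an M is inside a C call there is no preemption point (signal-based preemption also declines to preempt in that case), so not every M can be preempted.
- P: preempt every running P, and make sure that no M can acquire a P that is idle. An M inside a C call does not hold a P (more precisely, it can be treated as not holding one: it may still remember its previous P, but that P is taken away after a timeout, and when the C call returns the M only tries to reacquire the previous P).
Source walkthrough
Go chose the preempt-P approach. Let's go straight to the source.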
// stopTheWorld stops all P's from executing goroutines, interrupting
// all goroutines at GC safe points and records reason as the reason
// for the stop. On return, only the current goroutine's P is running.
// stopTheWorld must not be called from a system stack and the caller
// must not hold worldsema. The caller must call startTheWorld when
// other P's should resume execution.
//
// stopTheWorld is safe for multiple goroutines to call at the
// same time. Each will execute its own stop, and the stops will
// be serialized.
//
// This is also used by routines that do stack dumps. If the system is
// in panic or being exited, this may not reliably stop all
// goroutines.
func stopTheWorld(reason string) {
    semacquire(&worldsema)
    gp := getg()
    gp.m.preemptoff = reason
    // Switch to the g0 system stack to run this function.
    systemstack(func() {
        // First move the calling goroutine to _Gwaiting.
        casgstatus(gp, _Grunning, _Gwaiting)
        // Stop the world.
        stopTheWorldWithSema()
        // Move the calling goroutine back to _Grunning and return.
        // At this point the world is stopped.
        casgstatus(gp, _Gwaiting, _Grunning)
    })
}
// stopTheWorldWithSema is the core implementation of stopTheWorld.
// The caller is responsible for acquiring worldsema and disabling
// preemption first and then should stopTheWorldWithSema on the system
// stack:
//
//     semacquire(&worldsema, 0)
//     m.preemptoff = "reason"
//     systemstack(stopTheWorldWithSema)
//
// When finished, the caller must either call startTheWorld or undo
// these three operations separately:
//
//     m.preemptoff = ""
//     systemstack(startTheWorldWithSema)
//     semrelease(&worldsema)
//
// It is allowed to acquire worldsema once and then execute multiple
// startTheWorldWithSema/stopTheWorldWithSema pairs.
// Other P's are able to execute between successive calls to
// startTheWorldWithSema and stopTheWorldWithSema.
// Holding worldsema causes any other goroutines invoking
// stopTheWorld to block.
func stopTheWorldWithSema() {
    _g_ := getg() // the current g, which is now g0

    lock(&sched.lock)
    sched.stopwait = gomaxprocs       // total number of Ps (the number that must stop)
    atomic.Store(&sched.gcwaiting, 1) // set sched.gcwaiting to 1
    preemptall()                      // request preemption of every P in the running state
    // Stop the current P.
    _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
    sched.stopwait--
    // Try to take over Ps in the syscall state. This only changes the P's status.
    for _, p := range allp {
        s := p.status
        if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
            if trace.enabled {
                traceGoSysBlock(p)
                traceProcStop(p)
            }
            p.syscalltick++
            sched.stopwait-- // the status change succeeded, so decrement the count
        }
    }
    // Idle Ps are not running anything. Take them off the idle list so
    // that no other M can acquire them, set their status, and decrement
    // the count.
    for {
        p := pidleget()
        if p == nil {
            break
        }
        p.status = _Pgcstop
        sched.stopwait--
    }
    wait := sched.stopwait > 0 // are there still Ps that have to stop?
    unlock(&sched.lock)

    // wait for remaining P's to stop voluntarily
    // (preemptall above has already tried to preempt every running P)
    if wait {
        for {
            // wait for 100us, then try to re-preempt in case of any races
            // The sleep is on a note (futex-based on Linux) with a 100us timeout.
            // notetsleep returns false if it woke because of the timeout
            // and true if another thread called notewakeup.
            if notetsleep(&sched.stopnote, 100*1000) {
                // Woken by notewakeup: clear the note and leave the loop.
                noteclear(&sched.stopnote)
                break
            }
            preemptall()
        }
    }

    // Normally no P should be left to wait for at this point. Double-check.
    bad := ""
    if sched.stopwait != 0 {
        bad = "stopTheWorld: not stopped (stopwait != 0)"
    } else {
        for _, p := range allp {
            if p.status != _Pgcstop {
                bad = "stopTheWorld: not stopped (status != _Pgcstop)"
            }
        }
    }
    if atomic.Load(&freezing) != 0 {
        // Some other thread is panicking. This can cause the
        // sanity checks above to fail if the panic happens in
        // the signal handler on a stopped thread. Either way,
        // we should halt this thread.
        // Locking twice deliberately blocks this thread forever.
        lock(&deadlock)
        lock(&deadlock)
    }
    if bad != "" {
        throw(bad)
    }

    worldStopped()
}
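How the other Ps are stopped
After a G handles a preemption request, its status is switched to _Grunnable, it is put on the global run queue, and schedule() is called again.
Keep this schedule() in mind.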
// Tell all goroutines that they have been preempted and they should stop.
// This function is purely best-effort. It can fail to inform a goroutine if a
// processor just started running it.
// No locks need to be held.
// Returns true if preemption request was issued to at least one goroutine.
func preemptall() bool {
    res := false
    // Walk all Ps.
    for _, _p_ := range allp {
        // Only Ps in the _Prunning state matter.
        if _p_.status != _Prunning {
            continue
        }
        // Call preemptone on every P in the _Prunning state.
        if preemptone(_p_) {
            res = true
        }
    }
    return res
}

// preemptone preempts the given P. There are two mechanisms: cooperative
// preemption and signal-based preemption.
// Cooperative: set the G's preempt and stackguard0 fields, then wait for the
// morestack check that is inserted in function prologues to fire on the next
// function call. The running G is stopped, its status becomes _Grunnable, it
// is put on the global run queue, and schedule is called again.
// Signal-based: send a signal to the thread (M) bound to this P so that the
// signal handler runs. If the G is at a safe point, the handler honors the
// preemption request and stops the running G.
// The handler injects a call to asyncPreempt into the G's call stack and returns.
// When the G resumes, it runs asyncPreempt first, which leads to
// asyncPreempt2 -> g0 -> gopreempt_m -> goschedImpl -> the G's status becomes
// _Grunnable, it goes on the global run queue, and schedule is called again.
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    gp.preempt = true

    // Every call in a goroutine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}
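The effect of preemption can be observed from user code. The sketch below is an illustration under assumptions, not runtime code: it starts a goroutine that spins in a loop whose only operation is an atomic load (normally compiled to an intrinsic, so there is no function prologue to act as a cooperative preemption point) and then forces a GC cycle, which has to stop that goroutine's P. It assumes Go 1.14 or later on a platform with signal-based preemption, and that GODEBUG=asyncpreemptoff=1 is not set; without those, the runtime.GC() call may never return. The name gcWhileSpinning is made up for this example.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// gcWhileSpinning (a hypothetical helper) forces a GC cycle while another
// goroutine spins, and returns the number of GC cycles that completed.
func gcWhileSpinning() uint32 {
	var started, stop int32
	done := make(chan struct{})

	go func() {
		atomic.StoreInt32(&started, 1)
		// Tight loop: nothing in the body, only the atomic load in the condition.
		for atomic.LoadInt32(&stop) == 0 {
		}
		close(done)
	}()

	// Make sure the spinner is really running before forcing the GC.
	for atomic.LoadInt32(&started) == 0 {
		runtime.Gosched()
	}

	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)
	runtime.GC() // must stop the spinner's P to finish
	runtime.ReadMemStats(&after)

	atomic.StoreInt32(&stop, 1) // let the spinner exit
	<-done
	return after.NumGC - before.NumGC
}

func main() {
	fmt.Println("GC cycles completed while spinning:", gcWhileSpinning())
}
```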
Who calls notewakeup?
// Stops the current m for stopTheWorld.
// Returns when the world is restarted.
func gcstopm() {
    _g_ := getg()

    if sched.gcwaiting == 0 {
        throw("gcstopm: not waiting for gc")
    }
    if _g_.m.spinning {
        _g_.m.spinning = false
        // OK to just drop nmspinning here,
        // startTheWorld will unpark threads as necessary.
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("gcstopm: negative nmspinning")
        }
    }
    _p_ := releasep()
    lock(&sched.lock)
    _p_.status = _Pgcstop
    sched.stopwait--
    if sched.stopwait == 0 {
        // Every P that has to stop has now stopped: call notewakeup
        // to wake whoever is waiting on the note.
        notewakeup(&sched.stopnote)
    }
    unlock(&sched.lock)
    stopm()
}
Going one step further, what does stopm do?
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
    _g_ := getg()

    lock(&sched.lock)
    mput(_g_.m) // put the current M on the idle list
    unlock(&sched.lock)
    mPark()                     // sleep here until woken
    acquirep(_g_.m.nextp.ptr()) // once woken, reacquire a P from _g_.m.nextp
    _g_.m.nextp = 0
}

func mPark() {
    g := getg()
    for {
        notesleep(&g.m.park) // sleep on the g.m.park note (futex-based on Linux); returns once woken
        // Note, because of signal handling by this parked m,
        // a preemptive mDoFixup() may actually occur via
        // mDoFixupAndOSYield(). (See golang.org/issue/44193)
        noteclear(&g.m.park)
        if !mDoFixup() {
            return
        }
    }
}
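How an M is woken up is covered in the bonus section.
Going one step further, who calls gcstopm?
It is called at the top label of schedule and in findrunnable.
The scheduling loop checks whether an STW is in progress (sched.gcwaiting != 0) and, if so, calls gcstopm.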
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    pp := _g_.m.p.ptr()
    pp.preempt = false

    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if pp.runSafePointFn != 0 {
        runSafePointFn()
    }

    // Sanity check: if we are spinning, the run queue should be empty.
    // Check this before calling checkTimers, as that might call
    // goready to put a ready goroutine on the local run queue.
    if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
        throw("schedule: spinning with local work")
    }

    checkTimers(pp, 0)

    var gp *g
    var inheritTime bool

    // Normal goroutines will check for need to wakeP in ready,
    // but GCworkers and tracereaders will not, so the check must
    // be done here instead.
    tryWakeP := false
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        if gp != nil {
            tryWakeP = true
        }
    }
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    // ... (the rest of schedule is omitted from this excerpt)
}
With that, all the pieces fit together.
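To see the countdown protocol end to end, here is a toy model written with ordinary goroutines and channels. It is a sketch under heavy simplification and not how the runtime is written: toyP, toySched, worker and runToy are invented names, a buffered channel stands in for the stopnote note, the caller's own P is left out, and each worker treats one loop iteration as its safe point.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Toy P states. The names echo the runtime's, but none of this is runtime code.
const (
	pIdle int32 = iota
	pRunning
	pSyscall
	pGCStop
)

type toyP struct{ status int32 }

type toySched struct {
	mu        sync.Mutex
	stopwait  int           // Ps that still have to stop
	gcwaiting int32         // set while a stop is in progress
	stopnote  chan struct{} // signalled by the last P to stop
	restart   chan struct{} // closed to start the world again
	ps        []*toyP
}

// worker stands in for an M running goroutines on a running P. Each loop
// iteration is its "safe point", like the gcwaiting check in schedule().
func (s *toySched) worker(p *toyP, wg *sync.WaitGroup) {
	defer wg.Done()
	for atomic.LoadInt32(&s.gcwaiting) == 0 {
		time.Sleep(10 * time.Microsecond) // pretend to run a goroutine
	}
	// The part gcstopm plays: give up the P and report in.
	s.mu.Lock()
	atomic.StoreInt32(&p.status, pGCStop)
	s.stopwait--
	if s.stopwait == 0 {
		s.stopnote <- struct{}{} // like notewakeup(&sched.stopnote)
	}
	s.mu.Unlock()
	<-s.restart // parked until the world restarts
}

func (s *toySched) stopTheWorld() {
	s.mu.Lock()
	s.stopwait = len(s.ps)
	atomic.StoreInt32(&s.gcwaiting, 1)
	for _, p := range s.ps {
		// Idle and syscall Ps are not running Go code, so changing
		// their status is enough to stop them.
		st := atomic.LoadInt32(&p.status)
		if (st == pIdle || st == pSyscall) &&
			atomic.CompareAndSwapInt32(&p.status, st, pGCStop) {
			s.stopwait--
		}
	}
	wait := s.stopwait > 0
	s.mu.Unlock()
	for wait {
		select {
		case <-s.stopnote:
			wait = false
		case <-time.After(100 * time.Microsecond):
			// The runtime would call preemptall() again here.
		}
	}
}

// runToy stops a world of two running, one syscall and one idle P and
// returns how many Ps ended up stopped.
func runToy() (stopped, total int) {
	s := &toySched{stopnote: make(chan struct{}, 1), restart: make(chan struct{})}
	s.ps = []*toyP{{status: pRunning}, {status: pRunning}, {status: pSyscall}, {status: pIdle}}
	var wg sync.WaitGroup
	for _, p := range s.ps {
		if p.status == pRunning {
			wg.Add(1)
			go s.worker(p, &wg)
		}
	}
	s.stopTheWorld()
	for _, p := range s.ps {
		if atomic.LoadInt32(&p.status) == pGCStop {
			stopped++
		}
	}
	close(s.restart) // "start the world"
	wg.Wait()
	return stopped, len(s.ps)
}

func main() {
	stopped, total := runToy()
	fmt.Printf("stopped %d of %d Ps\n", stopped, total)
}
```

The design point the toy preserves is that the caller never stops a running P directly. It raises a flag, counts down the Ps it can stop by a status change alone, and then waits for the remaining Ps to notice the flag and report in, with the last one to arrive doing the wake-up.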
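Bonus section
How to start the world (Start The World)
Note: only the case where GOMAXPROCS has not changed is considered here. How GOMAXPROCS is implemented will be analyzed in detail later.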
// startTheWorld undoes the effects of stopTheWorld.
func startTheWorld() {
    systemstack(func() { startTheWorldWithSema(false) })

    // worldsema must be held over startTheWorldWithSema to ensure
    // gomaxprocs cannot change while worldsema is held.
    //
    // Release worldsema with direct handoff to the next waiter, but
    // acquirem so that semrelease1 doesn't try to yield our time.
    //
    // Otherwise if e.g. ReadMemStats is being called in a loop,
    // it might stomp on other attempts to stop the world, such as
    // for starting or ending GC. The operation this blocks is
    // so heavy-weight that we should just try to be as fair as
    // possible here.
    //
    // We don't want to just allow us to get preempted between now
    // and releasing the semaphore because then we keep everyone
    // (including, for example, GCs) waiting longer.
    mp := acquirem()
    mp.preemptoff = ""
    semrelease1(&worldsema, true, 0)
    releasem(mp)
}
Core implementation
Assume for now that the number of procs has not changed.
func startTheWorldWithSema(emitTraceEvent bool) int64 {
    assertWorldStopped()

    mp := acquirem() // disable preemption because it can be holding p in a local var
    if netpollinited() { // check netpoll for work that has already completed
        list := netpoll(0) // non-blocking
        injectglist(&list)
    }
    lock(&sched.lock)

    procs := gomaxprocs
    p1 := procresize(procs) // get the Ps that need to resume running
    sched.gcwaiting = 0     // clear the sched.gcwaiting flag
    if sched.sysmonwait != 0 { // wake sysmon
        sched.sysmonwait = 0
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)

    worldStarted()

    // Walk the list of runnable Ps.
    for p1 != nil {
        p := p1
        p1 = p1.link.ptr()
        if p.m != 0 { // the P already has an M bound to it: wake that M directly
            mp := p.m.ptr()
            p.m = 0
            if mp.nextp != 0 {
                throw("startTheWorld: inconsistent mp->nextp")
            }
            mp.nextp.set(p)      // stash the P in the M's nextp field before waking it
            notewakeup(&mp.park) // wake the M, which is waiting in mPark()
        } else {
            // No M is available: create a new M and bind it to this P.
            // Start M to run P. Do not start another M below.
            newm(nil, p, -1)
        }
    }

    // Capture start-the-world time before doing clean-up tasks.
    startTime := nanotime()
    if emitTraceEvent {
        traceGCSTWDone()
    }

    // Wakeup an additional proc in case we have excessive runnable goroutines
    // in local queues or in the global queue. If we don't, the proc will park itself.
    // If we have lots of excessive work, resetspinning will unpark additional procs as necessary.
    // In short: wake one extra P in case runnable work has piled up in the
    // local queues or the global queue.
    wakep()

    releasem(mp)

    return startTime
}
func procresize(nprocs int32) *p {
    assertLockHeld(&sched.lock)
    assertWorldStopped()

    old := gomaxprocs

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    maskWords := (nprocs + 31) / 32

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        // (handle the current P)
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    }

    var runnablePs *p
    // Handle the remaining Ps.
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        // Set the P's status to _Pidle.
        p.status = _Pidle
        if runqempty(p) {
            // The P's run queue is empty: put it on the idle list.
            pidleput(p)
        } else {
            // The P has work to run: add it to the runnablePs list.
            p.m.set(mget()) // find an available M and bind it to the P
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    // Return the list of Ps to run.
    return runnablePs
}
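Summary
Stop The World first records the total number of Ps and then tries to stop all of them.
A P is handled according to which of three states it is in:
- idle: it is removed from the idle list so that no M can acquire it
- running: a preemption request is sent
- syscall: its status is switched so that it cannot be reacquired when the syscall returns
Each P that is stopped successfully decrements the count of Ps still being waited for.
The caller then sleeps until all Ps have stopped.
A preempted P also decrements the count once it has stopped. When the count reaches 0, the sleeping caller is woken.
When the caller wakes up, the world has been stopped and the function returns.
This article also leaves three loose ends: go:linkname (for the second time), syscall, and runtime.GOMAXPROCS(). I will come back to them when I get the chance.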
This article was written by LeonardWang and is licensed under the Creative Commons Attribution 4.0 International License.
Last edited: Jul 22, 2021