STW实现原理
in Go with 0 comment

STW实现原理

in Go with 0 comment

STW实现原理

STW(Stop The World)是Go Runtime内部的一个方法,它的作用如它的名字所言,停止世界,在stopTheWorld调用返回后,有且仅有调用该方法的goroutine可以运行。

用途

主要在GC(垃圾回收)和runtime.GOMAXPROCS()中使用

当然你可以可以通过//go:linkname 的方式来使用,进行些hack操作

实现方式

Go调度模型

Go的调度模型分为G M P 三部分

这里简单说下,详细讲解Go调度模型的文章有很多,读者有兴趣可以学习下。

注:本文需要对Go调度器流程和实现有些基本的了解

G: Goroutine,使用go 关键字创建的协程,代表要执行的任务,在Go中,main函数也是一个goroutine。

M:Machine,对应操作系统线程,提供cpu执行资源,M的目的是获取一个P,并找到一个可运行的G去执行。

P:Processor,中间层,包含Goroutine执行所需的资源,比如说内存和待执行的Goroutine等,也可以用来控制程序并发度,M可以运行G的前提是获得到一个P

分析

那么如果让你来实现Stop The World功能,应该对调度器中的哪个部分进行干预呢?

首先再进一步明确一下,Stop The World的目的,程序中所有goroutine(除去调用方)停止运行即可。

另外Stop The World还有一个前置依赖功能,抢占调度,需要依赖于这个功能来让Goroutine可以在一个合适的安全点停止运行。对于协作式抢占和信号抢占的调度方式其实不care。

源码解析

Go是选择了抢占P的方案,直接来看源码

// stopTheWorld stops all P's from executing goroutines, interrupting
// all goroutines at GC safe points and records reason as the reason
// for the stop. On return, only the current goroutine's P is running.
// stopTheWorld must not be called from a system stack and the caller
// must not hold worldsema. The caller must call startTheWorld when
// other P's should resume execution.
//
// stopTheWorld is safe for multiple goroutines to call at the
// same time. Each will execute its own stop, and the stops will
// be serialized.
//
// This is also used by routines that do stack dumps. If the system is
// in panic or being exited, this may not reliably stop all
// goroutines.
func stopTheWorld(reason string) {
	semacquire(&worldsema)
	gp := getg()
	gp.m.preemptoff = reason
	// 切换到g0系统栈上执行该函数
	systemstack(func() {
		// 先切换调用方goroutine到_Gwaiting状态
		casgstatus(gp, _Grunning, _Gwaiting)
		// stopTheWorld
		stopTheWorldWithSema()
		// 切换调用方goroutine到_Grunning状态,返回,此时 World 已经Stop
		casgstatus(gp, _Gwaiting, _Grunning)
	})
}
// stopTheWorldWithSema是stopTheWorld的核心实现.
// The caller is responsible for acquiring worldsema and disabling
// preemption first and then should stopTheWorldWithSema on the system
// stack:
//
//	semacquire(&worldsema, 0)
//	m.preemptoff = "reason"
//	systemstack(stopTheWorldWithSema)
//
// When finished, the caller must either call startTheWorld or undo
// these three operations separately:
//
//	m.preemptoff = ""
//	systemstack(startTheWorldWithSema)
//	semrelease(&worldsema)
//
// It is allowed to acquire worldsema once and then execute multiple
// startTheWorldWithSema/stopTheWorldWithSema pairs.
// Other P's are able to execute between successive calls to
// startTheWorldWithSema and stopTheWorldWithSema.
// Holding worldsema causes any other goroutines invoking
// stopTheWorld to block.
func stopTheWorldWithSema() {
	_g_ := getg() // 获取当前g,这个已经变为g0

	lock(&sched.lock)
	sched.stopwait = gomaxprocs // P的总数量(需要停止的P的数量)
	atomic.Store(&sched.gcwaiting, 1) // 设置sched.gcwaiting为1
	preemptall() // 对所有处于运行状态的P执行抢占操作
	// 停止当前的P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	sched.stopwait--
	// 尝试抢占处于syscall状态的P,其实仅修改了P的状态
	for _, p := range allp {
		s := p.status
		if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
			if trace.enabled {
				traceGoSysBlock(p)
				traceProcStop(p)
			}
			p.syscalltick++
			sched.stopwait-- // 如果状态修改成功,计数减一
		}
	}
	// 处于闲置的P,由于闲置的P没有在运行,仅需从闲置列表中取出,避免其他M获取到即可,并设置对应状态,计数减一
	for {
		p := pidleget()
		if p == nil {
			break
		}
		p.status = _Pgcstop
		sched.stopwait--
	}
	wait := sched.stopwait > 0 // 是有还有需要等待停止的P
	unlock(&sched.lock)

	// wait for remaining P's to stop voluntarily
	// 等待剩余的P停止(上方preemptall已经尝试抢占所有的运行的P)
	if wait {
		for {
			// wait for 100us, then try to re-preempt in case of any races
			// 等待,底层为futex系统调用,信号量机制,超时时间为100us
			// 如果是因为超时而唤醒,则返回false
			// 如果是因为其他线程调用notetwakeup而唤醒,则返回true
			if notetsleep(&sched.stopnote, 100*1000) {
				// 如果因为notetwakeup而唤醒,清除该标志,并退出for循环
				noteclear(&sched.stopnote)
				break
			}
			preemptall()
		}
	}

	// 正常情况下,这里应该没有需要等待被停止的P了,再检查一下
	bad := ""
	if sched.stopwait != 0 {
		bad = "stopTheWorld: not stopped (stopwait != 0)"
	} else {
		for _, p := range allp {
			if p.status != _Pgcstop {
				bad = "stopTheWorld: not stopped (status != _Pgcstop)"
			}
		}
	}
	if atomic.Load(&freezing) != 0 {
		// Some other thread is panicking. This can cause the
		// sanity checks above to fail if the panic happens in
		// the signal handler on a stopped thread. Either way,
		// we should halt this thread.
		lock(&deadlock)
		lock(&deadlock)
	}
	if bad != "" {
		throw(bad)
	}

	worldStopped()
}

如何停止其他P,或者说其他P是如何停止的

G处理抢占信号后会将自身状态切换为_Grunnable,放入global run queue,并重新调用schedule()

记住这个schedule()

// Tell all goroutines that they have been preempted and they should stop.
// This function is purely best-effort. It can fail to inform a goroutine if a
// processor just started running it.
// No locks need to be held.
// Returns true if preemption request was issued to at least one goroutine.
func preemptall() bool {
	res := false
	// 遍历所有的P
	for _, _p_ := range allp {
		// 只关心处于_Prunning状态的P
		if _p_.status != _Prunning {
			continue
		}
		// 对每个处于_Prunning状态的P调用 preemptone
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}

// 抢占入参指定的P,这里分为协作式抢占和信号抢占
// 协作式抢占是设置G的preempt和stackguard0字段值,等待函数调用时在函数头插入的more stack检查,停止运行的G,切换G的状态为_Grunnable,放入global run queue,并重新调用schedule

// 信号抢占,是向当前P锁绑定的线程 M 发送信号,使得线程触发信号处理函数,在满足安全点的前提下响应该抢占信号,停止运行的G
// 信号处理函数响应后,会在该G的调用栈中强行插入asyncPreempt,然后return
// 等到G继续运行时,会先运行asyncPreempt,进而执行asyncPreempt2->g0->gopreempt_m->goschedImpl->切换G的状态为_Grunnable,放入global run queue,并重新调用schedule
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a goroutine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}

谁会调用notewakeup

// Stops the current m for stopTheWorld.
// Returns when the world is restarted.
func gcstopm() {
	_g_ := getg()

	if sched.gcwaiting == 0 {
		throw("gcstopm: not waiting for gc")
	}
	if _g_.m.spinning {
		_g_.m.spinning = false
		// OK to just drop nmspinning here,
		// startTheWorld will unpark threads as necessary.
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("gcstopm: negative nmspinning")
		}
	}
	_p_ := releasep()
	lock(&sched.lock)
	_p_.status = _Pgcstop
	sched.stopwait--
	if sched.stopwait == 0 {
		// 当所有需要停止的 P 已经被停止时,调用 notewakeup 唤醒等待该信号量的G
		notewakeup(&sched.stopnote)
	}
	unlock(&sched.lock)
	stopm()
}

进一步 stopm会做什么?

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()

	lock(&sched.lock)
	mput(_g_.m) // 将当前 M 放入idel空闲列表
	unlock(&sched.lock)
	mPark()     // 会在这里睡眠等待被唤醒
	acquirep(_g_.m.nextp.ptr()) // 被唤醒后从_g_.m.nextp重新获得P
	_g_.m.nextp = 0
}

func mPark() {
	g := getg()
	for {
		notesleep(&g.m.park) // 调用futex进行睡眠,在g.m.park信号量上等待,被唤醒后返回
		// Note, because of signal handling by this parked m,
		// a preemptive mDoFixup() may actually occur via
		// mDoFixupAndOSYield(). (See golang.org/issue/44193)
		noteclear(&g.m.park)
		if !mDoFixup() {
			return
		}
	}
}

M如何被唤醒在番外篇中进行讲述

进一步 谁会调用gcstopm

scheduleTopfindrunnable函数中

在调度循环中,会检查当前是否处于STW的过程(sched.gcwaiting != 0),如果是的话,则会调用gcstopm

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	pp := _g_.m.p.ptr()
	pp.preempt = false

	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if pp.runSafePointFn != 0 {
		runSafePointFn()
	}

	// Sanity check: if we are spinning, the run queue should be empty.
	// Check this before calling checkTimers, as that might call
	// goready to put a ready goroutine on the local run queue.
	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}

	checkTimers(pp, 0)

	var gp *g
	var inheritTime bool

	// Normal goroutines will check for need to wakeP in ready,
	// but GCworkers and tracereaders will not, so the check must
	// be done here instead.
	tryWakeP := false
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
			tryWakeP = true
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
		if gp != nil {
			tryWakeP = true
		}
	}
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		// We can see gp != nil here even if the M is spinning,
		// if checkTimers added a local goroutine via goready.
	}
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

至此所有的信息都已经完美拼接上了。

番外篇

如何开启世界(Start The Workd)

注:这里只关注GOMAXPROCS没有发生变更的情况,GOMAXPROCS的实现原理后续再详细分析。

// startTheWorld undoes the effects of stopTheWorld.
func startTheWorld() {
	systemstack(func() { startTheWorldWithSema(false) })

	// worldsema must be held over startTheWorldWithSema to ensure
	// gomaxprocs cannot change while worldsema is held.
	//
	// Release worldsema with direct handoff to the next waiter, but
	// acquirem so that semrelease1 doesn't try to yield our time.
	//
	// Otherwise if e.g. ReadMemStats is being called in a loop,
	// it might stomp on other attempts to stop the world, such as
	// for starting or ending GC. The operation this blocks is
	// so heavy-weight that we should just try to be as fair as
	// possible here.
	//
	// We don't want to just allow us to get preempted between now
	// and releasing the semaphore because then we keep everyone
	// (including, for example, GCs) waiting longer.
	mp := acquirem()
	mp.preemptoff = ""
	semrelease1(&worldsema, true, 0)
	releasem(mp)
}

核心实现

先假设proc没有发生变更

func startTheWorldWithSema(emitTraceEvent bool) int64 {
	assertWorldStopped()

	mp := acquirem() // disable preemption because it can be holding p in a local var
	if netpollinited() { // 检查netpoll中是否有已完成的任务
		list := netpoll(0) // non-blocking
		injectglist(&list)
	}
	lock(&sched.lock)

	procs := gomaxprocs

	p1 := procresize(procs) // 获取所有需要继续运行的P
	sched.gcwaiting = 0 // 清空 sched.gcwaiting 标志位
	if sched.sysmonwait != 0 { // 唤醒sysmon
		sched.sysmonwait = 0
		notewakeup(&sched.sysmonnote)
	}
	unlock(&sched.lock)

	worldStarted()
	// 遍历可运行的P链表
	for p1 != nil {
		p := p1
		p1 = p1.link.ptr()
		if p.m != 0 { // 如果有已绑定的M,直接唤醒该M
			mp := p.m.ptr()
			p.m = 0
			if mp.nextp != 0 {
				throw("startTheWorld: inconsistent mp->nextp")
			}
			mp.nextp.set(p) // 唤醒之前放置到m的nextp字段中
			notewakeup(&mp.park) // 唤醒m,m在mPark()函数中等待
		} else {
			// 如果没有可用的M,创建一个新的M来绑定这个P
			// Start M to run P.  Do not start another M below.
			newm(nil, p, -1)
		}
	}

	// Capture start-the-world time before doing clean-up tasks.
	startTime := nanotime()
	if emitTraceEvent {
		traceGCSTWDone()
	}

	// Wakeup an additional proc in case we have excessive runnable goroutines
	// in local queues or in the global queue. If we don't, the proc will park itself.
	// If we have lots of excessive work, resetspinning will unpark additional procs as necessary.
	// 唤醒一个额外的P,以避免任务不均匀的分布在本地队列或全局队列
	wakep()

	releasem(mp)

	return startTime
}
func procresize(nprocs int32) *p {
	assertLockHeld(&sched.lock)
	assertWorldStopped()

	old := gomaxprocs

	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	maskWords := (nprocs + 31) / 32

	_g_ := getg()
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		// 处理当前的P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	}

	var runnablePs *p
	// 处理剩余的P
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		if _g_.m.p.ptr() == p {
			continue
		}
		// 设置P的状态 -> _Pidle
		p.status = _Pidle
		if runqempty(p) {
			// 如果P的runque中没有待运行的任务,放置到idle队列中
			pidleput(p)
		} else {
			// P需要继续运行,放入到runnablePs链表中
			p.m.set(mget()) // 寻找一个可用的M与其绑定
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	// 返回待运行链表
	return runnablePs
}

总结

Stop The World会先计算所有P的数量,然后尝试停止所有的P

P的状态分为三种情况

其中每停止成功一个P,会将需要等待的数量减一

当前Goroutine睡眠等待所有P结束

对于被抢占的P,被停止后也会重新计算需要等待的数量,当数量减为0后,唤醒睡眠等待的Goroutine

当睡眠的Goroutine被唤醒时,World已经完成Stop,函数返回。

另外这篇文章埋了三个坑,go:linkname(第二次埋坑)、syscallruntime.GOMAXPROCS()有机会来填。