Linux信号处理
Linux 上也存在不可靠信号和可靠信号的 概念。其中不可靠信号可能丢失,多次发送相同的信号只能收到一次,取值从 1 至 31; 可靠信号则可以进行排队,取值从 32 至 64。
https://www.cnblogs.com/coding-my-life/p/4782529.html
https://www.cnblogs.com/cobbliu/p/5592659.html
Go中的信号处理
参考:https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/signal/
遗留问题:
- 如果保证连续发送两个相同信号,信号不丢失的?
- 对于可靠信号和不可靠信号是否有不同处理?
- 不同
M
所监听的信号mask
需要保持完全一致吗?
由于对于Go程序员来说,并不感知线程(M)的存在,所以Go中的程序接管了操作系统的信号,并进行转发处理。
信号处理主要函数
//go:nowritebarrierrec
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
_g_ := getg()
c := &sigctxt{info, ctxt}
if sig == _SIGPROF {
sigprof(c.sigpc(), c.sigsp(), c.siglr(), gp, _g_.m)
return
}
if sig == _SIGTRAP && testSigtrap != nil && testSigtrap(info, (*sigctxt)(noescape(unsafe.Pointer(c))), gp) {
return
}
if sig == _SIGUSR1 && testSigusr1 != nil && testSigusr1(gp) {
return
}
// 抢占调度
if sig == sigPreempt {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
flags := int32(_SigThrow)
if sig < uint32(len(sigtable)) {
flags = sigtable[sig].flags
}
if flags&_SigPanic != 0 && gp.throwsplit {
// We can't safely sigpanic because it may grow the
// stack. Abort in the signal handler instead.
flags = (flags &^ _SigPanic) | _SigThrow
}
if isAbortPC(c.sigpc()) {
// On many architectures, the abort function just
// causes a memory fault. Don't turn that into a panic.
flags = _SigThrow
}
if c.sigcode() != _SI_USER && flags&_SigPanic != 0 {
// The signal is going to cause a panic.
// Arrange the stack so that it looks like the point
// where the signal occurred made a call to the
// function sigpanic. Then set the PC to sigpanic.
// Have to pass arguments out of band since
// augmenting the stack frame would break
// the unwinding code.
gp.sig = sig
gp.sigcode0 = uintptr(c.sigcode())
gp.sigcode1 = uintptr(c.fault())
gp.sigpc = c.sigpc()
c.preparePanic(sig, gp)
return
}
// 对用户注册的信号进行转发
if c.sigcode() == _SI_USER || flags&_SigNotify != 0 {
if sigsend(sig) {
return
}
}
// 设置为可忽略的用户信号
if c.sigcode() == _SI_USER && signal_ignored(sig) {
return
}
// 正常情况在这里已返回,下方是kill、throw、crash等
if flags&_SigKill != 0 {
dieFromSignal(sig)
}
if flags&_SigThrow == 0 {
return
}
_g_.m.throwing = 1
_g_.m.caughtsig.set(gp)
if crashing == 0 {
startpanic_m()
}
if sig < uint32(len(sigtable)) {
print(sigtable[sig].name, "\n")
} else {
print("Signal ", sig, "\n")
}
print("PC=", hex(c.sigpc()), " m=", _g_.m.id, " sigcode=", c.sigcode(), "\n")
if _g_.m.lockedg != 0 && _g_.m.ncgo > 0 && gp == _g_.m.g0 {
print("signal arrived during cgo execution\n")
gp = _g_.m.lockedg.ptr()
}
print("\n")
level, _, docrash := gotraceback()
if level > 0 {
goroutineheader(gp)
tracebacktrap(c.sigpc(), c.sigsp(), c.siglr(), gp)
if crashing > 0 && gp != _g_.m.curg && _g_.m.curg != nil && readgstatus(_g_.m.curg)&^_Gscan == _Grunning {
// tracebackothers on original m skipped this one; trace it now.
goroutineheader(_g_.m.curg)
traceback(^uintptr(0), ^uintptr(0), 0, _g_.m.curg)
} else if crashing == 0 {
tracebackothers(gp)
print("\n")
}
dumpregs(c)
}
if docrash {
crashing++
if crashing < mcount()-int32(extraMCount) {
// There are other m's that need to dump their stacks.
// Relay SIGQUIT to the next m by sending it to the current process.
// All m's that have already received SIGQUIT have signal masks blocking
// receipt of any signals, so the SIGQUIT will go to an m that hasn't seen it yet.
// When the last m receives the SIGQUIT, it will fall through to the call to
// crash below. Just in case the relaying gets botched, each m involved in
// the relay sleeps for 5 seconds and then does the crash/exit itself.
// In expected operation, the last m has received the SIGQUIT and run
// crash/exit and the process is gone, all long before any of the
// 5-second sleeps have finished.
print("\n-----\n\n")
raiseproc(_SIGQUIT)
usleep(5 * 1000 * 1000)
}
crash()
}
printDebugLog()
exit(2)
}
函数 sigsend
会将用户信号发送到信号队列 sig
中,将sig.mask
对应槽位设置标志位。
$GOROOT\src\runtime\sigqueue.go
var sig struct {
note note
mask [(_NSIG + 31) / 32]uint32
wanted [(_NSIG + 31) / 32]uint32
ignored [(_NSIG + 31) / 32]uint32
recv [(_NSIG + 31) / 32]uint32
state uint32
delivering uint32
inuse bool
}
// sigsend delivers a signal from sighandler to the internal signal delivery queue.
// It reports whether the signal was sent. If not, the caller typically crashes the program.
// It runs from the signal handler, so it's limited in what it can do.
func sigsend(s uint32) bool {
bit := uint32(1) << uint(s&31)
if !sig.inuse || s >= uint32(32*len(sig.wanted)) {
return false
}
atomic.Xadd(&sig.delivering, 1)
// We are running in the signal handler; defer is not available.
if w := atomic.Load(&sig.wanted[s/32]); w&bit == 0 {
atomic.Xadd(&sig.delivering, -1)
return false
}
// Add signal to outgoing queue.
for {
mask := sig.mask[s/32]
if mask&bit != 0 {
atomic.Xadd(&sig.delivering, -1)
return true // signal already in queue
}
if atomic.Cas(&sig.mask[s/32], mask, mask|bit) {
break
}
}
// Notify receiver that queue has new bit.
Send:
for {
switch atomic.Load(&sig.state) {
default:
throw("sigsend: inconsistent state")
case sigIdle:
if atomic.Cas(&sig.state, sigIdle, sigSending) {
break Send
}
case sigSending:
// notification already pending
break Send
case sigReceiving:
if atomic.Cas(&sig.state, sigReceiving, sigIdle) {
if GOOS == "darwin" {
sigNoteWakeup(&sig.note)
break Send
}
notewakeup(&sig.note)
break Send
}
}
}
atomic.Xadd(&sig.delivering, -1)
return true
}
信号接收
// Called to receive the next queued signal.
// Must only be called from a single goroutine at a time.
//go:linkname signal_recv os/signal.signal_recv
func signal_recv() uint32 {
for {
// Serve any signals from local copy.
for i := uint32(0); i < _NSIG; i++ {
if sig.recv[i/32]&(1<<(i&31)) != 0 {
sig.recv[i/32] &^= 1 << (i & 31)
return i
}
}
// Wait for updates to be available from signal sender.
Receive:
for {
switch atomic.Load(&sig.state) {
default:
throw("signal_recv: inconsistent state")
case sigIdle:
if atomic.Cas(&sig.state, sigIdle, sigReceiving) {
if GOOS == "darwin" {
sigNoteSleep(&sig.note)
break Receive
}
notetsleepg(&sig.note, -1)
noteclear(&sig.note)
break Receive
}
case sigSending:
if atomic.Cas(&sig.state, sigSending, sigIdle) {
break Receive
}
}
}
// Incorporate updates from sender into local copy.
// 从sig.mask中转移到sig.recv中,并清零
for i := range sig.mask {
sig.recv[i] = atomic.Xchg(&sig.mask[i], 0)
}
}
}
signal_recv
会在os/signal
中被调用
os/signal
初始化函数
func init() {
signal_enable(0) // first call - initialize
watchSignalLoop = loop
}
循环处理接收到的信号
signal_recv
对应上方信号接收函数,通过go:linkname
进行链接
func loop() {
for {
process(syscall.Note(signal_recv()))
}
}
对于收到的信号,转到至对应的channel
(Notify
时传入的参数)
func process(sig os.Signal) {
n := signum(sig)
if n < 0 {
return
}
handlers.Lock()
defer handlers.Unlock()
for c, h := range handlers.m {
if h.want(n) {
// send but do not block for it
select {
case c <- sig:
default:
}
}
}
// Avoid the race mentioned in Stop.
for _, d := range handlers.stopping {
if d.h.want(n) {
select {
case d.c <- sig:
default:
}
}
}
}
func Notify(c chan<- os.Signal, sig ...os.Signal) {
if c == nil {
panic("os/signal: Notify using nil channel")
}
// 启动后台转发goroutine
watchSignalLoopOnce.Do(func() {
if watchSignalLoop != nil {
go watchSignalLoop()
}
})
handlers.Lock()
defer handlers.Unlock()
// 将channel和对应要处理的的信号,存储到map中
h := handlers.m[c]
if h == nil {
if handlers.m == nil {
handlers.m = make(map[chan<- os.Signal]*handler)
}
h = new(handler) // handler是一个存储mask的结构体
handlers.m[c] = h
}
add := func(n int) {
if n < 0 {
return
}
if !h.want(n) {
h.set(n)
// 如果该信号是第一次监听,会调用enableSignal
if handlers.ref[n] == 0 {
enableSignal(n)
}
handlers.ref[n]++
}
}
// 如果为空,监听所有信号?
if len(sig) == 0 {
for n := 0; n < numSig; n++ {
add(n)
}
} else {
// 设置handler对应的mask
for _, s := range sig {
add(signum(s))
}
}
}
func enableSignal(sig int) {
signal_enable(uint32(sig))
}
// Must only be called from a single goroutine at a time.
//go:linkname signal_enable os/signal.signal_enable
func signal_enable(s uint32) {
if !sig.inuse {
// The first call to signal_enable is for us
// to use for initialization. It does not pass
// signal information in m.
// 设置信号监听标识
sig.inuse = true // enable reception of signals; cannot disable
if GOOS == "darwin" {
sigNoteSetup(&sig.note)
return
}
noteclear(&sig.note)
return
}
if s >= uint32(len(sig.wanted)*32) {
return
}
w := sig.wanted[s/32]
w |= 1 << (s & 31)
atomic.Store(&sig.wanted[s/32], w)
i := sig.ignored[s/32]
i &^= 1 << (s & 31)
atomic.Store(&sig.ignored[s/32], i)
sigenable(s)
}
// sigenable enables the Go signal handler to catch the signal sig.
// It is only called while holding the os/signal.handlers lock,
// via os/signal.enableSignal and signal_enable.
func sigenable(sig uint32) {
if sig >= uint32(len(sigtable)) {
return
}
// SIGPROF is handled specially for profiling.
if sig == _SIGPROF {
return
}
t := &sigtable[sig]
if t.flags&_SigNotify != 0 {
ensureSigM()
enableSigChan <- sig
<-maskUpdatedChan
if atomic.Cas(&handlingSig[sig], 0, 1) {
atomic.Storeuintptr(&fwdSig[sig], getsig(sig))
// 重新设置要监听的信号?
setsig(sig, funcPC(sighandler))
}
}
}
这里会确保,后台一定有一个M
(虽然启动的是一个goroutine
,但是调用了LockOSThread
,所以这个G
是独占一个M
的)会监听所有需要监听的信号。
// ensureSigM starts one global, sleeping thread to make sure at least one thread
// is available to catch signals enabled for os/signal.
func ensureSigM() {
if maskUpdatedChan != nil {
return
}
maskUpdatedChan = make(chan struct{})
disableSigChan = make(chan uint32)
enableSigChan = make(chan uint32)
go func() {
// Signal masks are per-thread, so make sure this goroutine stays on one
// thread.
LockOSThread()
defer UnlockOSThread()
// The sigBlocked mask contains the signals not active for os/signal,
// initially all signals except the essential. When signal.Notify()/Stop is called,
// sigenable/sigdisable in turn notify this thread to update its signal
// mask accordingly.
sigBlocked := sigset_all
for i := range sigtable {
if !blockableSig(uint32(i)) {
sigdelset(&sigBlocked, i)
}
}
sigprocmask(_SIG_SETMASK, &sigBlocked, nil)
for {
select {
case sig := <-enableSigChan:
if sig > 0 {
sigdelset(&sigBlocked, int(sig))
}
case sig := <-disableSigChan:
if sig > 0 && blockableSig(sig) {
sigaddset(&sigBlocked, int(sig))
}
}
sigprocmask(_SIG_SETMASK, &sigBlocked, nil)
maskUpdatedChan <- struct{}{}
}
}()
}
//go:nosplit
//go:nowritebarrierrec
func sigprocmask(how int32, new, old *sigset) {
rtsigprocmask(how, new, old, int32(unsafe.Sizeof(*new)))
}
TEXT runtime·rtsigprocmask(SB),NOSPLIT,$0-28
MOVL how+0(FP), DI
MOVQ new+8(FP), SI
MOVQ old+16(FP), DX
MOVL size+24(FP), R10
MOVL $SYS_rt_sigprocmask, AX
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS 2(PC)
MOVL $0xf1, 0xf1 // crash
RET
本文由 LeonardWang 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Dec 11,2020