# 12. Goroutine
# 12.1 底层
协程的本质是
runtime/runtime2.go
中的g
结构体:type g struct { stack stack // 当前协程的协程栈 sched gobuf // 保存协程运行现场 atomicstatus uint32 // 协程状态 goid int64 // 协程 ID m *m // 当前线程 ... }
线程在 Go 中对应是的
runtime/runtime2.go
中的m
结构体:type m struct { g0 *g // g0 协程,Go 中的主协程 curg *g // 现在正在运行的协程 id int64 // 线程ID mOS // 当前操作系统对线程的额外描述信息 ... }
# 12.2 执行
# 12.2.1 单线程循环(Go 0.x)
总结图:
底层源码:
runtime/proc.go
中的schedule()
// schedule 寻找可以可以运行的协程并执行它 func schedule() { // 1. 获得当前运行的协程 _g_ := getg() ... // 永远不调度正在调用 cgo 的协程,因为它要用到 g0 栈 if _g_.m.incgo { throw("schedule: in cgo") } top: ... // 2. 即将要调用的协程 var gp *g // 3. 想办法在本地队列和全局队列中拿到一个可以运行的协程 ... // 4. 调用 execute() execute(gp, inheritTime) }
runtime/proc.go
中的execute()
// execute 让 gp 运行在当前线程上 func execute(gp *g, inheritTime bool) { _g_ := getg() // 1. 将当前协程 _g_ 的线程信息复制到即将要调用的协程 gp _g_.m.curg = gp gp.m = _g_.m casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard ... // 2. 调用 gogo() gogo(&gp.sched) }
runtime/stubs.go
中的gogo()
是用汇编写的// func gogo(buf *gobuf) // 往 g stack 插入一个 goexit() 栈帧并跳到程序计数器的位置,去执行业务方法 TEXT runtime·gogo(SB), NOSPLIT, $16-8 MOVQ buf+0(FP), BX MOVQ gobuf_g(BX), DX MOVQ 0(DX), CX get_tls(CX) // 使用 gobuf 拿到 g stack MOVQ DX, g(CX) MOVQ gobuf_sp(BX), SP // 往 g stack 中插入一个 goexit 的栈帧 MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ $0, gobuf_sp(BX) MOVQ $0, gobuf_ret(BX) MOVQ $0, gobuf_ctxt(BX) MOVQ $0, gobuf_bp(BX) MOVQ gobuf_pc(BX), BX // 跳转到现在协程的程序计数器的位置 JMP BX
runtime/stubs.go
中的goexit()
也是用汇编写的TEXT runtime·goexit(SB),NOSPLIT,$0-0 BYTE $0x90 CALL runtime·goexit1(SB) // 实际是调用了 runtime.goexit1() BYTE $0x90
runtime/proc.g
中的goexit1()
// 退出当前协程 func goexit1() { if raceenabled { racegoend() } if trace.enabled { traceGoEnd() } // 跳转到 g0 stack 然后调用 goexit0 方法 mcall(goexit0) }
runtime/stubs.go
中的goexit0()
// goexit0 重置当前协程 gp, func goexit0(gp *g) { // 1. 重置 gp 的一些信息 // 2. 丢掉 gp dropg() ... // 3. 重新调度协程执行 schedule() }
抽象图:
# 12.2.2 多线程循环(Go 1.0)
- 在操作系统层面还是线程在运行,对 Goroutine 无感知;
- 操作系统线程执行一个协程调度,顺序执行 Goroutine;
- 调度循环非常像 线程池;
- 就目前(Go1.0)的模型,有多少线程,就最多有多少并发,且会抢夺协程队列的全局锁;
# 12.3 GMP
# 12.3.1 P 的底层
P 即处理器,用来将 G 送到 M 上去执行。它维持了一个本地队列,这样每次获取 G 不一定都要去全局队列拿,大大减少了并发冲突的情况。
它的本质是 runtime/runtime2.go
中的 p
结构体
type p struct {
...
m muintptr // 当前负责的线程
// 可运行的协程的队列,可无锁访问
runqhead uint32 // 队头
runqtail uint32 // 队尾
runq [256]guintptr // 长度为 256
runnext guintptr // 下一个可用的协程的指针
...
}
# 12.3.2 概述
- GMP 模型即每个 P 维护一个本地队列 runq,一般情况下,P 负责在 runq 上挑选 G 送到 M 上去运行,当 runq 上没有 G,就抢占锁去全局队列 globalq 上寻找 G。
# 12.3.3 源码(Go1.16)
再来看
runtime/proc.go
中的schedule()
// schedule 寻找可以可以运行的协程并执行它 func schedule() { // 1. 获得当前运行的协程 _g_ := getg() ... // 永远不调度正在调用 cgo 的协程,因为它要用到 g0 栈 if _g_.m.incgo { throw("schedule: in cgo") } top: ... // 2. 即将要调用的协程 var gp *g // 3. 想办法在本地队列和全局队列中拿到一个可以运行的协程 if gp == nil { // 3.1 在本地队列中寻找 G gp, inheritTime = runqget(_g_.m.p.ptr()) } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } ... // 4. 调用 execute() execute(gp, inheritTime) }
runtime/proc.go
中的runqget()
// runqget 从本地队列中获取 g
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// 先找 p 中的 runnext,有就直接拿去执行
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
}
findrunnable()
就是先在全局队列中找,还找不到,就去别的 P 下的队列偷一半来,分为globrunqget()
和runqsteal()
runtime/proc.go
的globrunqget()
// 从全局队列中获取 G // 上锁 func globrunqget(_p_ *p, max int32) *g { }
runtime/proc.go
的runqsteal()
// 从 p2 中偷一半的 G 给 _p_ 执行 func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { ... }
# 12.3.4 新建协程
- 新建协程的时候,会随机放到一个本地队列中;
- 如果本地队列都满了的话,才会放在全局队列;
- Go 会优先执行新建的协程;
// newproc 新建一个协程
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
// 先建 g 结构体值
newg := newproc1(fn, argp, siz, gp, pc)
_p_ := getg().m.p.ptr()
// 将 g 放到队列中
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
}
// 将 g 放到队列中
func runqput(_p_ *p, gp *g, next bool) {
...
// 1. 先看看能不能放到本地队列
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 2. 本地队列满了,就放全局队列
if runqputslow(_p_, gp, h, t) {
return
}
...
}
# 12.3.5 总原理图
代号 | 名称 | 定义位置 | 作用 |
---|---|---|---|
Sched | 调度器 | proc.c | 维护有存储 M 和 G 的队列以及调度器的一些状态信息等。 |
M | Machine 系统线程 | runtime.h | 它由操作系统管理的,Goroutine 就是跑在 M 之上的;M 是一个很大的结构,里面维护小对象内存 cache(mcache)、当前执行的 Goroutine、随机数发生器等等非常多的信息。 |
P | Processor 处理器 | runtime.h | 它的主要用途就是用来执行 Goroutine 的,它维护了一个 Goroutine 队列,即 runqueue。Processor 是让我们从 N:1 调度到 M:N 调度的重要部分。 |
G | Goroutine 实现的核心结构 | runtime.h | 它包含了栈,指令指针,以及其他对调度 Goroutine 很重要的信息,例如其阻塞的 channel。 |
# 12.4 并发
# 12.4.1 协程饥饿
- 当有的协程执行时间过长,导致占用线程过久,其他协程得不到运行的机会,就出现了协程饥饿问题。
# 12.4.2 触发切换
- 全局队列协程饥饿问题:线程每循环 61 次,便会从全局队列中挑一个 G 去执行。
// runtime/proc.go
func schedule() {
...
if gp == nil {
// 每循环 61 次,便会从全局队列中挑一个 G 去执行
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
...
}
# 12.4.3 切换时机
总结
- 基于协作的抢占式调度
- 主动挂起:
runtime.gopark()
- 系统调用结束时:
exitsyscall()
- 函数跳转时:
morestack()
- 主动挂起:
- 基于信号的抢占式调度
- 信号调度:
doSigPreempt()
- 信号调度:
主动挂起:
runtime.gopark()
// runtime/proc.go // 挂起当前协程 func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { ... // 切换到 g0 stack,进行线程循环 mcall(park_m) }
系统调用结束时:会执行
exitsyscall()
,它里面会调用mcall()
切换到g0 stack
。// runtime/proc.go // 系统调用完成时会调用 exitsyscall func exitsyscall() { ... // 切换到 g0 stack,进行线程循环 mcall(exitsyscall0) ... }
标记抢占:
morestack()
当函数跳转时都会调用这个方法,它的本意在于检查当前协程栈空间是否有足够内存,如果不够就要扩大该栈空间。当系统监控到协程运行超过
10ms
,就将g.stackguard0
置为0xfffffade
(该值是一个抢占标志),让程序在只执行morestack()
函数时顺便判断一下是否将g
中的stackguard
置为抢占preempt
,如果的确被标记抢占,就回到schedule()
方法,并将当前协程放回队列中。morestack()
在需要创建更多栈空间的时候,会执行newstack()
// runtime/stack.go // newstack 创建新的栈空间 func newstack() { ... // 1. 判断 gp.stackguard0 是否被标记为抢占 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt ... // 2. 如果被标记位抢占,调用 gogo() if preempt { // 3. 最终会去调用 schedule() 去调新的协程执行 gopreempt_m(gp) // never return } } func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp) } func goschedImpl(gp *g) { ... // 调用 schedule() schedule() }
信号调度:
doSigPreempt()
当程序在执行过程中既无法主动挂起,也不能进行系统调用,且无法进行函数调用时,就可以使用信号来调度。
信号其实就是线程信号,在操作系统中有很多基于信号的底层通信方式(SIGPIPE / SIGURG / SIGHUP),而我们的线程可以注册对应信号的处理函数。
Go 中是注册了
SIGURG
信号的处理函数doSigPreempt()
,在 GC 工作时,向目标线程发送信号。线程收到信号后,会触发调度。// runtime/signal_unix.go // doSigPreempt 处理 gp 上的抢占信号 func doSigPreempt(gp *g, ctxt *sigctxt) { // 1. 检查此 g 是否要被抢占并且安全抢占; if wantAsyncPreempt(gp) { if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok { // 2. 调整程序计数器 PC 并异步调用 asyncPreempt ctxt.pushCall(funcPC(asyncPreempt), newpc) } } ... }
// runtime/preempt.go // asyncPreempt 保存所有用户寄存器并调用 asyncPreempt2 func asyncPreempt() func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true // 调用 mcall,跳转到 g0 stack,去执行 schedule() if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } gp.asyncSafePoint = false }
# 12.4 弊端
# 12.4.1 问题
协程太多会导致:
- 文件打开数限制;
- 内存限制;
- 调度开销过大;
# 12.4.2 解决
优化业务逻辑;(优先考虑)
利用 channel 的缓冲区;
func do(i int, ch chan struct{}) { fmt.Println(i) time.Sleep(time.Second) <-ch } func main() { // 限制 channel 缓冲区为 3000,最多支持 3000 协程并发 c := make(chan struct{}, 3000) for i := 0; i < math.MaxInt32; i++ { c <- struct{}{} go do(i, c) } time.Sleep(time.Hour) }
协程池;(一般不)
调整系统资源;