# 12. Goroutine

# 12.1 底层

  • 协程的本质是 runtime/runtime2.go 中的 g 结构体:

    type g struct {
    	stack       stack   	// 当前协程的协程栈
    	sched        gobuf		// 保存协程运行现场
    	atomicstatus uint32		// 协程状态
    	goid         int64		// 协程 ID
      m            *m 			// 当前线程
    	...
    }
    

    image-20220814141535906

  • 线程在 Go 中对应是的 runtime/runtime2.go 中的 m 结构体:

    type m struct {
    	g0      *g     					// g0 协程,Go 中的主协程
    	curg          *g       	// 现在正在运行的协程
    	id            int64		 	// 线程ID
    	mOS											// 当前操作系统对线程的额外描述信息
    	...
    }
    

# 12.2 执行

# 12.2.1 单线程循环(Go 0.x)

总结图:

image-20220815205709047

底层源码:

  • runtime/proc.go 中的 schedule()

    // schedule 寻找可以可以运行的协程并执行它
    func schedule() {
      // 1. 获得当前运行的协程
    	_g_ := getg()
    	...
    	// 永远不调度正在调用 cgo 的协程,因为它要用到 g0 栈
    	if _g_.m.incgo {
    		throw("schedule: in cgo")
    	}
    
    top:
    	...
      // 2. 即将要调用的协程
    	var gp *g
    	// 3. 想办法在本地队列和全局队列中拿到一个可以运行的协程
      ...
      // 4. 调用 execute()
    	execute(gp, inheritTime)
    }
    
  • runtime/proc.go 中的 execute()

    // execute 让 gp 运行在当前线程上
    func execute(gp *g, inheritTime bool) {
    	_g_ := getg()
    
    	// 1. 将当前协程 _g_ 的线程信息复制到即将要调用的协程 gp
    	_g_.m.curg = gp
    	gp.m = _g_.m
    	casgstatus(gp, _Grunnable, _Grunning)
    	gp.waitsince = 0
    	gp.preempt = false
    	gp.stackguard0 = gp.stack.lo + _StackGuard
    	...
    	
      // 2. 调用 gogo()
    	gogo(&gp.sched)
    }
    
  • runtime/stubs.go 中的 gogo() 是用汇编写的

    // func gogo(buf *gobuf)
    // 往 g stack 插入一个 goexit() 栈帧并跳到程序计数器的位置,去执行业务方法
    TEXT runtime·gogo(SB), NOSPLIT, $16-8
    	MOVQ	buf+0(FP), BX		
    	MOVQ	gobuf_g(BX), DX
    	MOVQ	0(DX), CX		
    	get_tls(CX)								// 使用 gobuf 拿到 g stack
    	MOVQ	DX, g(CX)						
    	MOVQ	gobuf_sp(BX), SP		// 往 g stack 中插入一个 goexit 的栈帧
    	MOVQ	gobuf_ret(BX), AX
    	MOVQ	gobuf_ctxt(BX), DX
    	MOVQ	gobuf_bp(BX), BP
    	MOVQ	$0, gobuf_sp(BX)	
    	MOVQ	$0, gobuf_ret(BX)
    	MOVQ	$0, gobuf_ctxt(BX)
    	MOVQ	$0, gobuf_bp(BX)
    	MOVQ	gobuf_pc(BX), BX		// 跳转到现在协程的程序计数器的位置
    	JMP	BX
    
  • runtime/stubs.go 中的 goexit() 也是用汇编写的

    TEXT runtime·goexit(SB),NOSPLIT,$0-0
    	BYTE	$0x90	
    	CALL	runtime·goexit1(SB)	// 实际是调用了 runtime.goexit1()
    	BYTE	$0x90	
    
  • runtime/proc.g 中的 goexit1()

    // 退出当前协程
    func goexit1() {
    	if raceenabled {
    		racegoend()
    	}
    	if trace.enabled {
    		traceGoEnd()
    	}
      // 跳转到 g0 stack 然后调用 goexit0 方法
    	mcall(goexit0)
    }
    
  • runtime/stubs.go 中的 goexit0()

    // goexit0 重置当前协程 gp,
    func goexit0(gp *g) {
    	// 1. 重置 gp 的一些信息
      // 2. 丢掉 gp
    	dropg()
    	...
      // 3. 重新调度协程执行
    	schedule()
    }
    

抽象图:

image-20220815213427093

# 12.2.2 多线程循环(Go 1.0)

  • 在操作系统层面还是线程在运行,对 Goroutine 无感知;
  • 操作系统线程执行一个协程调度,顺序执行 Goroutine;
  • 调度循环非常像 线程池
  • 就目前(Go1.0)的模型,有多少线程,就最多有多少并发,且会抢夺协程队列的全局锁;

image-20220815213750855

# 12.3 GMP

# 12.3.1 P 的底层

P 即处理器,用来将 G 送到 M 上去执行。它维持了一个本地队列,这样每次获取 G 不一定都要去全局队列拿,大大减少了并发冲突的情况。

它的本质是 runtime/runtime2.go 中的 p 结构体

type p struct {
	...
	m           muintptr   // 当前负责的线程
	// 可运行的协程的队列,可无锁访问
	runqhead uint32					 // 队头
	runqtail uint32					 // 队尾
	runq     [256]guintptr   // 长度为 256
	runnext guintptr				 // 下一个可用的协程的指针
	...
}
image-20220815215416942

# 12.3.2 概述

  • GMP 模型即每个 P 维护一个本地队列 runq,一般情况下,P 负责在 runq 上挑选 G 送到 M 上去运行,当 runq 上没有 G,就抢占锁去全局队列 globalq 上寻找 G。

# 12.3.3 源码(Go1.16)

  • 再来看 runtime/proc.go 中的 schedule()

    // schedule 寻找可以可以运行的协程并执行它
    func schedule() {
      // 1. 获得当前运行的协程
    	_g_ := getg()
    	...
    	// 永远不调度正在调用 cgo 的协程,因为它要用到 g0 栈
    	if _g_.m.incgo {
    		throw("schedule: in cgo")
    	}
    
    top:
    	...
      // 2. 即将要调用的协程
    	var gp *g
    	// 3. 想办法在本地队列和全局队列中拿到一个可以运行的协程
      if gp == nil {
        // 3.1 在本地队列中寻找 G
    		gp, inheritTime = runqget(_g_.m.p.ptr())
    	}
    	if gp == nil {
    		gp, inheritTime = findrunnable() // blocks until work is available
    	}
      ...
      // 4. 调用 execute()
    	execute(gp, inheritTime)
    }
    
  • runtime/proc.go 中的 runqget()

// runqget 从本地队列中获取 g
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// 先找 p 中的 runnext,有就直接拿去执行
	for {
		next := _p_.runnext
		if next == 0 {
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}
}
  • findrunnable() 就是先在全局队列中找,还找不到,就去别的 P 下的队列偷一半来,分为 globrunqget()runqsteal()

  • runtime/proc.goglobrunqget()

    // 从全局队列中获取 G
    // 上锁
    func globrunqget(_p_ *p, max int32) *g {
    
    }
    
  • runtime/proc.gorunqsteal()

    // 从 p2 中偷一半的 G 给 _p_ 执行
    func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    	...
    }
    

# 12.3.4 新建协程

  • 新建协程的时候,会随机放到一个本地队列中;
  • 如果本地队列都满了的话,才会放在全局队列;
  • Go 会优先执行新建的协程;
// newproc 新建一个协程
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
    // 先建 g 结构体值
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
    // 将 g 放到队列中
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}
// 将 g 放到队列中
func runqput(_p_ *p, gp *g, next bool) {

	...
  // 1. 先看看能不能放到本地队列
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
  // 2. 本地队列满了,就放全局队列
	if runqputslow(_p_, gp, h, t) {
		return
	}
  ...
}

# 12.3.5 总原理图

img
代号 名称 定义位置 作用
Sched 调度器 proc.c 维护有存储 M 和 G 的队列以及调度器的一些状态信息等。
M Machine 系统线程 runtime.h 它由操作系统管理的,Goroutine 就是跑在 M 之上的;M 是一个很大的结构,里面维护小对象内存 cache(mcache)、当前执行的 Goroutine、随机数发生器等等非常多的信息。
P Processor 处理器 runtime.h 它的主要用途就是用来执行 Goroutine 的,它维护了一个 Goroutine 队列,即 runqueue。Processor 是让我们从 N:1 调度到 M:N 调度的重要部分。
G Goroutine 实现的核心结构 runtime.h 它包含了栈,指令指针,以及其他对调度 Goroutine 很重要的信息,例如其阻塞的 channel。

# 12.4 并发

# 12.4.1 协程饥饿

  • 当有的协程执行时间过长,导致占用线程过久,其他协程得不到运行的机会,就出现了协程饥饿问题。

# 12.4.2 触发切换

image-20220815233517856

  • 全局队列协程饥饿问题:线程每循环 61 次,便会从全局队列中挑一个 G 去执行。
// runtime/proc.go
func schedule() {
	...
	if gp == nil {
		// 每循环 61 次,便会从全局队列中挑一个 G 去执行
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	...
}

# 12.4.3 切换时机

总结

  • 基于协作的抢占式调度
    • 主动挂起:runtime.gopark()
    • 系统调用结束时:exitsyscall()
    • 函数跳转时:morestack()
  • 基于信号的抢占式调度
    • 信号调度:doSigPreempt()
  • 主动挂起:runtime.gopark()

    image-20220815234253697

    // runtime/proc.go
    // 挂起当前协程
    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    	...
      // 切换到 g0 stack,进行线程循环
    	mcall(park_m)
    }
    
  • 系统调用结束时:会执行 exitsyscall(),它里面会调用 mcall() 切换到 g0 stack

    image-20220815234932384

    // runtime/proc.go
    // 系统调用完成时会调用 exitsyscall
    func exitsyscall() {
    	...
    	// 切换到 g0 stack,进行线程循环
    	mcall(exitsyscall0)
      ...
    }
    
  • 标记抢占:morestack()

    当函数跳转时都会调用这个方法,它的本意在于检查当前协程栈空间是否有足够内存,如果不够就要扩大该栈空间。当系统监控到协程运行超过 10ms,就将 g.stackguard0 置为 0xfffffade(该值是一个抢占标志),让程序在只执行 morestack() 函数时顺便判断一下是否将 g 中的 stackguard 置为抢占 preempt,如果的确被标记抢占,就回到 schedule() 方法,并将当前协程放回队列中。

    image-20220815235915661

    morestack() 在需要创建更多栈空间的时候,会执行 newstack()

    // runtime/stack.go
    // newstack 创建新的栈空间
    func newstack() {
    	...
    	// 1. 判断 gp.stackguard0 是否被标记为抢占
    	preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    	...
      // 2. 如果被标记位抢占,调用 gogo()
    	if preempt {
        // 3. 最终会去调用  schedule() 去调新的协程执行
    		gopreempt_m(gp) // never return
    	}
    }
    
    func gopreempt_m(gp *g) {
    	if trace.enabled {
    		traceGoPreempt()
    	}
    	goschedImpl(gp)
    }
    
    func goschedImpl(gp *g) {
    	...
      // 调用 schedule()
    	schedule()
    }
    
  • 信号调度:doSigPreempt()

    当程序在执行过程中既无法主动挂起,也不能进行系统调用,且无法进行函数调用时,就可以使用信号来调度。

    信号其实就是线程信号,在操作系统中有很多基于信号的底层通信方式(SIGPIPE / SIGURG / SIGHUP),而我们的线程可以注册对应信号的处理函数。

    Go 中是注册了 SIGURG 信号的处理函数 doSigPreempt(),在 GC 工作时,向目标线程发送信号。线程收到信号后,会触发调度。

    image-20220816000612561

    // runtime/signal_unix.go
    
    // doSigPreempt 处理 gp 上的抢占信号
    func doSigPreempt(gp *g, ctxt *sigctxt) {
    	// 1. 检查此 g 是否要被抢占并且安全抢占;
    	if wantAsyncPreempt(gp) {
    		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
    			// 2. 调整程序计数器 PC 并异步调用 asyncPreempt
    			ctxt.pushCall(funcPC(asyncPreempt), newpc)
    		}
    	}
    	...
    }
    
    // runtime/preempt.go
    
    // asyncPreempt 保存所有用户寄存器并调用 asyncPreempt2
    func asyncPreempt()
    
    func asyncPreempt2() {
    	gp := getg()
    	gp.asyncSafePoint = true
      // 调用 mcall,跳转到 g0 stack,去执行 schedule()
    	if gp.preemptStop {
    		mcall(preemptPark)
    	} else {
    		mcall(gopreempt_m)
    	}
    	gp.asyncSafePoint = false
    }
    

# 12.4 弊端

# 12.4.1 问题

协程太多会导致:

  • 文件打开数限制;
  • 内存限制;
  • 调度开销过大;

# 12.4.2 解决

  • 优化业务逻辑;(优先考虑)

  • 利用 channel 的缓冲区;

    func do(i int, ch chan struct{}) {
      fmt.Println(i)
      time.Sleep(time.Second)
      <-ch
    }
    
    func main() {
      // 限制 channel 缓冲区为 3000,最多支持 3000 协程并发
      c := make(chan struct{}, 3000)
      for i := 0; i < math.MaxInt32; i++ {
        c <- struct{}{}
        go do(i, c)
      }
      time.Sleep(time.Hour)
    }
    
  • 协程池;(一般不)

  • 调整系统资源;

上次更新: 8/16/2022, 12:18:14 AM