# 15. 网络编程

image-20220827175755343

# 15.1 前置知识

# 15.2 Socket

  • 很多系统都提供 Socket 作为 TCP 网络连接的抽象
  • Linux -> Internet domain socket -> SOCK_STREAM
  • Linux 中 Socket 以“文件描述符” FD 作为标识

# 15.3 IO 模型

image-20220826212254482

在网络编程中,IO 模型指的就是同时操作 Socket 的方案:BIO/NIO/AIO

Go:阻塞模型(协程调度) + 多路复用(系统底层)

# 15.4 Go 对 Epoll 的抽象 - network poller

多路复用器:

  • Linux:Epoll

    • 新建多路复用器:epoll_create()

    • 插入监听事件:epoll_ctl()

    • 查询发生了什么事件:epoll_wait()

  • Mac:kqueue

  • Windows:IOCP

本文仅介绍针对 Linux 的实现。

Go NetWork Poll 是对各个平台多路复用器的抽象和适配:

  • netpollinit -> epoll_create
  • netpollopen -> epoll_ctl
  • netpoll -> epoll_wait

# 15.4.1 netpollinit -> epoll_create

  1. 系统指令:syscall/zsysnum_linux_amd64.go

    	SYS_EPOLL_CREATE = 213
    
  2. 汇编:sys_linux_amd64.s

    // int32 runtime·epollcreate(int32 size);
    TEXT runtime·epollcreate(SB),NOSPLIT,$0
    	MOVL    size+0(FP), DI
    	MOVL    $SYS_epoll_create, AX
    	SYSCALL
    	MOVL	AX, ret+8(FP)
    	RET
    
  3. Go 中的声明:runtime/netpoll_epoll.go

    func epollcreate(size int32) int32
    func epollcreate1(flags int32) int32
    

    epollcreate1 中的 flags

    • 0:此时 epollcreate1 和 epollcreate 功能一致

    • _EPOLL_CLOEXEC:创建的 epfd 会设置 FD_CLOEXEC,它是一个 fd 的标识说明,用来设置文件的 close-on-exec 状态的。

      当 close-on-exec 状态为 0 时,调用 exec 时,fd 不会被关闭;

      非零状态时则会被关系,这样做可以防止 fd 泄露给执行 exec 后的进程。

    • _EPOLL_NONBLOCK:创建的 epfd 会设置为非阻塞

  4. 针对 Linux 的实现:runtime/netpoll_poll.go

    ① 新建 epoll;

    ② 新建一个 pipe 管道用于中断 epoll;

    ③ 将“管道有数据到达”事件注册到 epoll 中。

    // netpollinit 新建多路复用器
    func netpollinit() {
      // 1. 新建一个 epoll fd
    	epfd = epollcreate1(_EPOLL_CLOEXEC)
    	if epfd < 0 {
    		epfd = epollcreate(1024)
    		if epfd < 0 {
    			println("runtime: epollcreate failed with", -epfd)
    			throw("runtime: netpollinit failed")
    		}
    		closeonexec(epfd)
    	}
      // 2. 新建一个 linux 下的非阻塞管道,用来关闭 epoll 的
    	r, w, errno := nonblockingPipe()
    	if errno != 0 {
    		println("runtime: pipe failed with", -errno)
    		throw("runtime: pipe failed")
    	}
      // 3. 创建一个 epoll 事件,并将其与 netpollBreakRd 关联在一起
      
    	ev := epollevent{
    		events: _EPOLLIN,
    	}
    	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
      // 4. 调用 epollctl 将 ev 注册到 epfd 上
      //   _EPOLL_CTL_ADD 表示向 interet list 添加一个需要监视的描述符
      //	这里表示添加对之前创建的 nonblockingPipe 的 _EPOLLIN 事件
      //  即监控是否有数据到达 nonblockingPipe 
      //  综上可以得知,nonblockingPipe 和 netpollBreakRd 是关联在一起的
      //  即 nonblockingPipe 是用来关闭 epoll 的
    	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    	if errno != 0 {
    		println("runtime: epollctl failed with", -errno)
    		throw("runtime: epollctl failed")
    	}
    	netpollBreakRd = uintptr(r)
    	netpollBreakWr = uintptr(w)
    }
    

# 15.4.2 netpollopen -> epoll_ctl

  1. 系统指令:syscall/zsysnum_linux_amd64.go

    	SYS_EPOLL_CTL = 233
    
  2. 汇编:sys_linux_amd64.s

    // func epollctl(epfd, op, fd int32, ev *epollEvent) int
    TEXT runtime·epollctl(SB),NOSPLIT,$0
    	MOVL	epfd+0(FP), DI
    	MOVL	op+4(FP), SI
    	MOVL	fd+8(FP), DX
    	MOVQ	ev+16(FP), R10
    	MOVL	$SYS_epoll_ctl, AX
    	SYSCALL
    	MOVL	AX, ret+24(FP)
    	RET
    
  3. Go 中的声明:runtime/netpoll_epoll.go

    func epollctl(epfd, op, fd int32, ev *epollevent) int32 // 0: 成功,-1: 失败
    
    • epfd:epoll_create 函数返回的文件描述符,用于标识内核中的 epoll 实例
    • op:对 fd 文件描述符的操作类型:
      • EPOLL_CTL_ADD:向 interest list 添加一个需要监视的描述符
      • EPOLL_CTL_DEL:向 interest list 删除一个描述符
      • EPOLL_CTL_MOD:修改 interst list 中的一个描述符
    • fd:需要被操作的文件描述符
    • ev:一个指向名为 epoll_event 的结构的指针,它存储了我们实际要监视的 fd 的事件
      • EPOLLIN:表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
      • EPOLLOUT:表示对应的文件描述符可以写;
      • EPOLLPRI:表示对应的文件描述符有紧急的数据可读;
      • EPOLLERR:表示对应的文件描述符发生错误;
      • EPOLLHUP:表示对应的文件描述符被挂断;
      • EPOLLET: 将 epoll 设为边缘触发(Edge Triggered)模式,相对于水平触发(Level Triggered)来说的;
      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket的话,需要再次把这个 socket 加入到 epoll 队列里;
  4. 针对 Linux 的实现:runtime/netpoll_poll.go

    ① 传入一个 socket 的 fd,和 pollDesc 指针,pollDesc 是 Go 中对 socket 的抽象。pollDesc 中记录了 socket 的详细信息,以及哪个协程休眠在等待此 socket;

    ② 将 socket 的可读、可写、断开事件注册到 epoll 中;

    ③ 将 epoll 设置为 ET 模式。

    // 将 fd 的四个事件  _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET 注册到 epfd 上
    func netpollopen(fd uintptr, pd *pollDesc) int32 {
    	var ev epollevent
    	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
    }
    

# 15.4.3 netpoll -> epoll_wait

  1. 系统指令:syscall/zsysnum_linux_amd64.go

    	SYS_EPOLL_WAIT  = 232
    
  2. 汇编:sys_linux_amd64.s

    // int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
    TEXT runtime·epollwait(SB),NOSPLIT,$0
    	// This uses pwait instead of wait, because Android O blocks wait.
    	MOVL	epfd+0(FP), DI
    	MOVQ	ev+8(FP), SI
    	MOVL	nev+16(FP), DX
    	MOVL	timeout+20(FP), R10
    	MOVQ	$0, R8
    	MOVL	$SYS_epoll_pwait, AX
    	SYSCALL
    	MOVL	AX, ret+24(FP)
    	RET
    
  3. Go 中的声明:runtime/netpoll_epoll.go

    func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
    
    • epfd:epoll_create 函数返回的文件描述符,用于标识内核中的 epoll 实例
    • ev 已经分配好的 epoll_event 结构体数组的指针(即 ev[0]),epoll 会把发生的事件存入 events 中
    • maxevents:告诉内核 ev 有多大,必须大于 0
    • timeout:超时时间 -1 表示 epoll 将无限制等待下去
  4. 针对 Linux 的实现:runtime/netpoll_poll.go

    ① 根据 delay 确定要轮询多久;

    ② 创建一个长度为 128 的事件列表;

    ③ 调用系统底层的 epollwait,查询有多少事件发生了;

    ④ 新建一个协程列表;

    ⑤ 遍历事件列表;

    ⑥ 获取 go 中对 fd 的抽象结构体的值 pd;

    ⑦ 将 pd 中的 g 取出来加入到 toRun 列表中;

    ⑧ 返回可执行的 goroutine 列表。

    // netpoll checks for ready network connections.
    // Returns list of goroutines that become runnable -> 注意,返回的是 goroutine
    // delay < 0: blocks indefinitely										无限期阻塞
    // delay == 0: does not block, just polls    				不阻塞,只是轮询
    // delay > 0: block for up to that many nanoseconds	阻塞长达数纳秒
    func netpoll(delay int64) gList {
    	if epfd == -1 {
    		return gList{}
    	}
      
      // 1. 根据 delay 确定要轮询多久
    	var waitms int32
    	if delay < 0 {
    		waitms = -1
    	} else if delay == 0 {
    		waitms = 0
    	} else if delay < 1e6 {
    		waitms = 1
    	} else if delay < 1e15 {
    		waitms = int32(delay / 1e6)
    	} else {
    		waitms = 1e9
    	}
      
      // 2. 创建一个长度为 128 的事件列表
    	var events [128]epollevent
    retry:
      // 3. 调用系统底层的 epollwait,查询有多少事件发生了
    	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    	if n < 0 {
    		if n != -_EINTR {
    			println("runtime: epollwait on fd", epfd, "failed with", -n)
    			throw("runtime: netpoll failed")
    		}
    		// If a timed sleep was interrupted, just return to
    		// recalculate how long we should sleep now.
    		if waitms > 0 {
    			return gList{}
    		}
    		goto retry
    	}
      // 4. 新建一个协程列表
    	var toRun gList
      // 5. 遍历事件列表,
    	for i := int32(0); i < n; i++ {
    		ev := &events[i]
    		if ev.events == 0 {
    			continue
    		}
    		
        // 断开事件
    		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
    			if ev.events != _EPOLLIN {
    				println("runtime: netpoll: break fd ready for", ev.events)
    				throw("runtime: netpoll: break fd ready for something unexpected")
    			}
    			if delay != 0 {
    				// netpollBreak could be picked up by a
    				// nonblocking poll. Only read the byte
    				// if blocking.
    				var tmp [16]byte
    				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
    				atomic.Store(&netpollWakeSig, 0)
    			}
    			continue
    		}
    
        // 可读事件
    		var mode int32
    		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
    			mode += 'r'
    		}
        
        // 可写事件
    		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
    			mode += 'w'
    		}
        
    		if mode != 0 {
          // 6. 获取 go 中对 fd 的抽象结构体的值 fd
    			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
    			pd.everr = false
    			if ev.events == _EPOLLERR {
    				pd.everr = true
    			}
          // 7. 将 pd 中的 g 取出来加入到 toRun 列表中
    			netpollready(&toRun, pd, mode)
    		}
    	}
      
      // 8. 返回可执行的 goroutine 列表
    	return toRun
    }
    

    netpollready 表示 pd 底层的 fd 已经可以进行 IO 操作了:

    // netpollready is called by the platform-specific netpoll function.
    // It declares that the fd associated with pd is ready for I/O.
    // The toRun argument is used to build a list of goroutines to return
    // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
    // whether the fd is ready for reading or writing or both.
    //
    // This may run while the world is stopped, so write barriers are not allowed.
    //go:nowritebarrier
    func netpollready(toRun *gList, pd *pollDesc, mode int32) {
       var rg, wg *g
       if mode == 'r' || mode == 'r'+'w' {
          rg = netpollunblock(pd, 'r', true)
       }
       if mode == 'w' || mode == 'r'+'w' {
          wg = netpollunblock(pd, 'w', true)
       }
       if rg != nil {
          toRun.push(rg)
       }
       if wg != nil {
          toRun.push(wg)
       }
    }
    

谁在调用 netpoll()?

  • runtime/proc.go 中的 startTheWorldWithSema() 会调用 netpoll()

    func startTheWorldWithSema(emitTraceEvent bool) int64 {
    	...
    	if netpollinited() {
    		list := netpoll(0)   //调用 netpoll
    		injectglist(&list)
      }
      ...
    }
    
  • runtime/mgc.go 中的 gcStart() 会调用 startTheWorldWithSema()

    // gcStart starts the GC. 
    func gcStart(trigger gcTrigger) {
      ...
      // Concurrent mark.
      systemstack(func() {
        now = startTheWorldWithSema(trace.enabled)	// 调用 startTheWorldWithSema
        ...
      })
      ...
    }
    
  • gcStart() 又会被我们的 g0 协程一直循环执行。

综上:netpoll() 不是由应用进程来调用的,而且 g0 协程在循环 gc 的时候,顺带执行了 netpoll() 来检查是否有事件发生。

# 15.5 network poll 对 socket 的抽象 —— pollDesc

# 15.5.1 pollCache

type pollCache struct {
	lock  mutex				// 锁,锁住整个链表
	first *pollDesc		// 表示 pollCache 就是 pollDesc 链表的表头
}

# 15.5.2 pollDesc

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc  // 连接到下一个 pollDesc
	lock    mutex 	// 锁
  
	fd      uintptr		// 指向底层的 socket 文件描述符,不可变
  
  // 以下 4 个操作过程中上锁
  rseq    uintptr   // 避免读计时器过期,用来重置计时器
  wseq    uintptr   // 避免写计时器过期,用来重置计时器
  rt      timer     // 读计时器
  wt      timer     // 写计时器
	
  // 以下 6 个操作过程中不上锁
  everr   bool      // 标记是否有异常事件发生
	rg      uintptr   // pdReady, pdWait, G waiting for read or nil
	rd      int64     // 读截止时间
	wg      uintptr   // pdReady, pdWait, G waiting for write or nil
	wd      int64     // 写截止时间
	closing bool
  
	self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
	user    uint32    // user settable cookie
}

  • link:链接到下一个 pollDesc,说明这是一个链表结构。

  • lock:锁。

    • 该锁可保护 pollOpen、pollSetDeadline、pollUnblock 和 deadlineimpl 操作,这些操作覆盖到 rseq、wseq、rt 和 wt 变量;
    • fd 在整个 pollDesc 生命周期都是不可变的;
    • pollReset,pollWait,pollWaitCanceled 和 runtime·netpollready 执行是不带锁的,因此 everr、rg、rd、wg 和 wd 的所有操作都是以无锁方式进行的。
  • fd:指向底层的 socket 文件描述符。

  • rseqwseq:用来重置读写计时器,防止过期。

  • rgwg:是两个用来休眠读写协程的二进制信号量,该信号量可以有以下 4 种值:

    • nil:即 0,初始状态;
    • pdReady:即 1,网络 IO 就绪,Goroutine 消费完后应置为 nil;
    • pdWait:即 2
      1. Groutine 被调度器挂起,置为 Groutine 地址;
      2. 收到 IO 就绪通知,置改为 pdReady;
      3. 超时或被关闭,置为 nil;
    • G pointer:等待读写当前 socket 的 Goroutine 指针地址,当 io 就绪、超时或者被关闭的时候,此 Goroutine 将被唤醒,同时将值置为 pdReady 或者 nil。

    因为 rgwg 是 Goroutine 的地址,因此当发生 GC 后,如果 Goroutine 在 heap 区的话,就可能被回收,那指针就无效了,会导致代码崩溃。

    所以,在进行 net IO 的 Goroutine 不能在 heap 区分配内存。

总结

  • pollCache:是 pollDesc 链表的表头;

  • pollDesc 主要描述两点:

    1. 封装了操作系统底层的 socket:fd

    2. 对该 socket 有兴趣的协程:wgrg

# 15.6 network poller 工作细节

# 15.6.1 初始化 poll_runtime_pollServerInit

通过原子操作 & 双重检查来执行一次 netpollinit(),创建一个 epoll。

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
  // 类似于 双重检查 的单例模式
  // 保证只执行一次 netpollinit()
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
      netpollinit()			// epoll_create() 创建一个多路复用器
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

补充:go:linkname

The //go:linkname directive instructs the compiler to use “importpath.name” as the object file symbol name for the variable or function declared as “localname” in the source code. Because this directive can subvert the type system and package modularity, it is only enabled in files that have imported “unsafe”.

//go:linkname的目的是告诉编译器使用importpath.name来对本来不可导出的(localname)函数或者变量实现导出功能。由于这种方法是破坏了 Go 语言的模块化规则的,所以必须在导入了"unsafe"包的情况下使用。

即:

由于 Go 语法规则限制,小写字母开头的函数或者变量是本模块私有的,不可被包外的代码访问;但是如果必须要能被外部模块访问到,又要限制为私有方法呢?只能在编译器上做手脚,通过一个特殊的 标记 来实现这种功能。

具体到上面的例子:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
  • 表示调用 internal/poll.runtime_pollServerInit 相当于调用当前的 poll_runtime_pollServerInit

# 15.6.2 新增监听 poll_runtime_pollOpen

  1. 在 pollcache 链表中分配一个 pollDesc,用来描述要新增将它的 socket;
  2. 初始化 pollDesc,主要是将 rg、wg 置为 0;
  3. 调用 netpollopen,将底层 socket 及其读、写和断开事件注册到 epoll 上;
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
   // 1. 分配一个 pollDesc,用来描述要新增监听的 socket
   pd := pollcache.alloc()
   // 2. 上锁
   lock(&pd.lock)
   if pd.wg != 0 && pd.wg != pdReady {
      throw("runtime: blocked write on free polldesc")
   }
   if pd.rg != 0 && pd.rg != pdReady {
      throw("runtime: blocked read on free polldesc")
   }
   // 3. 赋值
   pd.fd = fd						// 监听的 socket
   pd.closing = false
   pd.everr = false
   pd.rseq++
   pd.rg = 0			// 初始值,还没感兴趣的 Goroutine
   pd.rd = 0
   pd.wseq++
   pd.wg = 0			// 初始化,还没感兴趣的 Goroutine
   pd.wd = 0
   pd.self = pd
   // 4. 解锁
   unlock(&pd.lock)

   var errno int32
   // 5. 调用 netpollopen  -> epoll_ctl
   // 将 pd 关联的 fd 的相关事件注册到 epoll 上
   errno = netpollopen(fd, pd)
   return pd, int(errno)
}

# 15.6.3 判断是否就绪 poll_runtime_pollWait

  1. 协程要对 socket 进行 read 或者 write 的时候,底层就会调用 poll_runtime_pollWait;
  2. 该方法循环调用 netpollblock(),直到 netpollblock() 返回 true,表明 rg 或 wg 已经置为 pdReady 了,可以进行读或者写了。
  3. netpollblock():
    1. 根据 mode,取出 rg 或者 wg,命名为 gpp;
    2. 如果 gpp 是 pdReady,直接返回 true,否则,置为 pdWait,返回 false。
func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx == 0 {
		return errors.New("waiting for unsupported file type")
	}
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}
func runtime_pollWait(ctx uintptr, mode int) int
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   ...
   // 循环调用 netpollblock,直到 netpollblock 返回 true
   // 也就是 rg 或 wg 已经置为 pdReady 了,可以读 / 写了
   for !netpollblock(pd, int32(mode), false) {
      ...
   }
   return pollNoError
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
  
   // 1. 根据 mode,看看是要读还是要写
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }

   for {
      old := *gpp
      // 2. 已经 pdReady 了,返回 true,完成
      if old == pdReady {
         *gpp = 0
         return true
      }
      if old != 0 {
         throw("runtime: double wait")
      }
      // 3. 没有 pdReady,则先置为 pdWait,再往下走
      if atomic.Casuintptr(gpp, 0, pdWait) {
         break
      }
     
       if waitio || netpollcheckerr(pd, mode) == 0 {
          // 4. 休眠当前协程,并将调用 netpollblockcommit
         	// 它会将 rg 或者 wg 置为 G Pointer
          gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
   }
   ...
   return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
   r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
   if r {
      // Bump the count of goroutines waiting for the poller.
      // The scheduler uses this to decide whether to block
      // waiting for the poller if there is nothing else to do.
      atomic.Xadd(&netpollWaiters, 1)
   }
   return r
}

# 15.6.4 调度协程去读写 socket

  • socket 已经可以读写:

    1. runtime 循环调用 netpoll() 方法;

      前面分析过了,是 g0 协程在 gc 的时候顺便调用了 netpoll。

    2. 发现 socket 可读写时,给对应的 rg 或 wg 置为 pdReady(1);

    3. 协程调用 poll_runtime_pollWait() 判断 socket 是否就绪;判断 rg 或者 wg 已经置为 pdReady(1),那就返回 0;

    4. runtime 就知道 socket 可以操作了。

  • socket 暂时不可读写:

    1. runtime 循环调用 netpoll() 方法;
    2. netpoll 中没有监听到任何事件,执行不到 netpollready,没有对 pd 做任何改变;
    3. 协程调用 poll_runtime_pollWait() 判断 socket 是否就绪:
      1. 判断 rg 或 wg 还是 nil(0),就将 rg 或者 wg 置为 pdWait(2)
      2. 调用 gopark 将协程进行休眠等待;
      3. 然后再进入 netpollblockcommit 将 rg 或者 wg 置为 G pointer
    4. 假如 runtime 后面再循环调用 netpoll() 方法;
    5. 发现 socket 可读写时,进入 netpollready 再检查对应的 rg 或者 wg;
    6. netpollready 再进入 netpollunblock,它会检查 rg 或者 wg;
    7. 若为 G pointer,那么就将 rg 或者 wg 置为 pdReady,然后返回协程地址给 runtime;
    8. runtime 就会去调度对应协程进行 socket 的读写操作。
  • 读写后都会再将 rg 或者 wg 置为 nil

总结

image-20220827141814750

Go 的网络操作底层为 阻塞模型(协程调度) + 多路复用(系统底层),具体情况为:

  • BIO:go 协程从网络读取数据,读取失败并且返回syscall.EAGAIN 时,依次调用 waitRead->runtime_pollWait->poll_runtime_pollWait->netpollblock->gopark 将当前协程挂起。
  • NIO:runtime 的 g0 协程在 gc 的时候会顺便调用 netpoll() 检查 socket 事件是否发生,当 socket 可操作的时候,重新唤醒对应协程,进行调度。

具体细节为:

  • runtime
    1. runtime 会一直循环去检查 socket 的可读写状态 —— netpoll()
    2. 然后再看是否有协程在等待对应的 socket:—— netpollready()
      1. 没有,那就单纯记录 pollDesc;
      2. 有那就唤醒协程,将 g 加入 toRun 列表,进行调度 —— netpollunblock()
  • goroutine
    1. 表明想要操作 socket —— poll_runtime_pollWait(pd,mode)
    2. 循环检查自己关心的 socket 是否可操作 —— netpollblock()
      1. 可以操作,goroutine 就会对 socket 进行读或写操作了;
      2. 不可操作:
        1. 就将自己休眠 —— gopark()
        2. 将 rg 或 wg 置为自己的地址 —— netpollblockcommit()

# 15.7 net 包

  • net 包是 go 原生的网络包;

  • net 包实现了 TCP、UDP、HTTP 等网络操作;

  • 使用 net.Listen() 可以得到 LISTEN 状态的 socket —— listener;

  • 使用 listener.Accept() 可以得到 ESTABLISHED 状态的 socket —— conn;

  • conn.Read() / Writer() 可以进行读写 socket 的操作;

  • network poll 作为上述功能的底层支撑;

    本文仅介绍 TCP 相关的部分。

# 15.7.1 net.netFD

netFD 是 Go 中 net 包对 socket 之类的网络文件描述符的抽象。

// Network file descriptor.
type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

image-20220827160306264

# 15.7.2 net.Listen() Listenter

func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}
  1. 新建 socket,并执行 bind 操作;
  2. 新建一个 netFD,它是 net 包对 socket 的详情描述;
  3. 返回一个 TCPListener 对象,底层是调用了 runtime_pollOpen 方法,将 TCPListener 的 FD 信息加入监听。TCPListener 对象本质是一个 LISTEN 状态的 socket。
image-20220827161726301

# 15.7.3 listener.Accept()

// Accept implements the Accept method in the Listener interface; 
//it waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
   if !l.ok() {
      return nil, syscall.EINVAL
   }
   c, err := l.accept()
   if err != nil {
      return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
   }
   return c, nil
}
  1. 调用 tcpListener 的 accept,本质上就是调用处于 LISTEN 状态的 socket 的 accept 方法,看看有无新的连接;
  2. 如果失败,休眠等待新的连接,底层调用了 runtime_pollWait;
  3. 如果有新的连接,那就包装成一个新的 socket,最后返回为一个 TCPConn 变量,底层是调用了 runtime_pollOpen 方法,将 TCPConn 的 FD 信息加入监听。TCPConn 对象本质是一个 ESTABLISHED 状态的 socket。

image-20220827164548081

# 15.7.4 conn.Read() / conn.Write()

这两个方法原理差不多,下面以 Read() 为例。

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
   ...
   n, err := c.fd.Read(b)
	 ...
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
   ...
   ...
   // 循环读数据
   for {
      // 1. 调用系统命令 syscall.Read,读取 sysfd 上的数据,然后往 p 写数据
      n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
      if err != nil {
         n = 0
         // 2. syscall.EAGAIN 说明还没数据,得先等等
         if err == syscall.EAGAIN && fd.pd.pollable() {
            // 3. 挂起,休眠等待
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               // 4. 当有数据来的时候,会被唤醒走到这里,然后在回到 for 循环读取数据
               continue
            }
         }
      }
      err = fd.eofError(n, err)
      return n, err
   }
}
  1. 底层直接调用 socket 原生读写方法(syscall.Read、syscall.Write);
  2. 成功则直接返回;
  3. 如果失败,休眠等待可读 / 可写事件的发生;
  4. 被唤醒后重新调用系统 socket 进行读写;

image-20220827170751948

# 15.7.3 net.DialTCP()

Dial() 方法支持 TCP、UDP、IP、unix、unixgram 和 unixpacket 网络通讯方式,它是一个统共的方法,通过传入 network 字段来区分不同的网络类型,所以它前面很多的操作,都是在判断当前是什么网络类型。本文主要讲 TCP 的实现底层,故直接进入 DialTCP() 即可,其他的网络类型,也是大同小异的。

func DialTCP(network string, laddr, raddr *TCPAddr) (*TCPConn, error) {
   
   // 1. 看看具体是哪种 tcp 连接
   switch network {
     case "tcp", "tcp4", "tcp6":
     default:
        return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: UnknownNetworkError(network)}
   }
   if raddr == nil {
      return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: nil, Err: errMissingAddress}
   }
   // 2. 创建一个系统的网络连接工具
   sd := &sysDialer{network: network, address: raddr.String()}
   // 3. 进行 TCP 连接
   c, err := sd.dialTCP(context.Background(), laddr, raddr)
   if err != nil {
      return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: err}
   }
   return c, nil
}
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConn, error) {
   if testHookDialTCP != nil {
      return testHookDialTCP(ctx, sd.network, laddr, raddr)
   }
   // 4. 进入 doDialTCP
   return sd.doDialTCP(ctx, laddr, raddr)
}
func (sd *sysDialer) doDialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConn, error) {
   // 5. 有了前面的基础,到这就明白了
   // internetSocket 创建一个 fd,生成一个新的 socket,并注册到 epoll 中监听
   fd, err := internetSocket 创建一个 fd(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial", sd.Dialer.Control)
		...
   // 6. 返回一个 TCPConn
   return newTCPConn(fd), nil
}
  1. 创建一个系统的网络连接工具 sysDialer;
  2. dial 进行 TCP 连接,连接不上那就是 connect refused;
  3. 连接上的话,创建一个新的 socket,并最后返回为一个 TCPConn 变量,底层是调用了 runtime_pollOpen 方法,将 TCPConn 的 FD 信息加入监听。TCPConn 对象本质是一个 ESTABLISHED 状态的 socket。

image-20220827174234771

# 15.8 goroutine-per-connection

goroutine-per-connection 是一种编程思想,需要开发者自主实现。在 Go 的开发中,我们即希望利用底层多路复用 IO,来支持高并发高性能的 IO 操作。我们又希望编码可以简单些,于是我们可以利用 Go 的协程调度来实现类似阻塞 IO 的方式,一个协程负责一个 socket。

即:阻塞模型(协程调度) + 多路复用(系统底层),兼顾性能与简洁。

func main() {

	listener, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}
	
  // always accpet
	for true {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("error occured: %v", err)
			continue
		}
		// goroutine per connection
		go handleAccept(conn)
	}

}

func handleAccept(conn net.Conn) {
	var body [100]byte
	for true {
		_, err := conn.Read(body[:])
		...
	}
}
上次更新: 8/27/2022, 5:58:36 PM