# 14. channel

总结

channel 底层是 hchan 结构体,包含了:

  • 一个环形缓存队列;
  • 接受者队列、发送者队列;
  • 锁;
  • 关闭标志;

发送:chansend()

  • 直接发送
  • 塞入缓存
  • 休眠等待

接收:chanrecv()

  • 直接接收
  • 从缓存拿
  • 休眠等待

# 14.1 底层

channel 底层是 runtime/chan.gohchan 结构体:

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex	
}

type waitq struct {
	first *sudog
	last  *sudog
}

其中以下五个字段组成了一个 环形缓冲队列

	qcount   uint           // 当前在队列中的数据个数
	dataqsiz uint           // 环形队列大小
	buf      unsafe.Pointer // 指向环形队列的指针
	elemsize uint16					// 每个数据大小
	elemtype *_type 				// 数据类型
  • 环形缓存可以大幅降低 GC 的开销。

其中还有四个字段组成了两个 链表

sendx    uint   // 下次要发送的数据的 index
recvx    uint   // 下个要接收的数据的 index
recvq    waitq  // 接受者等待队列
sendq    waitq  // 发送者等待队列

还有一个

lock mutex		// 保存 hchan 中的所有字段
  • 互斥锁并不是排队发送 / 接收数据,它保护的是 hchan 结构体本身。

还有一个 标记

closed   uint32   // 标记 channel 是否已经关闭

image-20220823000852833

# 14.2 发送

  1. 对整个 channel 上锁;
  2. 检查 channel 是否已经关闭,若关系,这 panic;
  3. 检查是否有正在等待中的协程:
    1. 有的话,直接将数据拷贝给它,然后唤醒它;
    2. 没有,则检查缓存队列是否已满:
      1. 没有满,则将数据塞入缓存队列中;
      2. 已满,则把自己包装成 sudog 放入 sendq 队列,休眠并解锁,等待唤醒。被唤醒后数据已经被取走了,当下 sudog 负责维护其他的数据;
  4. 解锁。
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...
   // 1. 对整个 channel 上锁
   lock(&c.lock)
   // 2. 检查 channel 是否已经关闭了,不能发送数据到已经关闭的 channel
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   // 3. 检查是否有等待中的协程,有的话,直接将数据拷贝给它,然后唤醒它
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
  
   // 没有等待中的协程,往下走

   // 4. 如果缓存队列还没满,则将数据放入缓存中
   if c.qcount < c.dataqsiz {
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   if !block {
      unlock(&c.lock)
      return false
   }

   // 5. 缓存队列满了,则休眠等待,等待唤醒
   gp := getg()							// 获取当前协程,当下是 sender
   mysg := acquireSudog()		// 将自己包装成 sudog,并赋初始值
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   mysg.elem = ep			// 赋值数据
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)    // 入队
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)			// 休眠并解锁
   KeepAlive(ep)

   // 6. 被唤醒
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)		// 被唤醒的时候,数据其实已经被取走了,mysg 负责维护其他数据
   if closed {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   return true
}

# 14.3 接收

  1. 对整个 channel 上锁;
  2. 如果 channel 已经关闭,且缓存中没有数据,如果这个时候 eq 指向的地址有数据,则清空数据;
  3. 检查是否有等待中的 sender:
    1. 有,则看 channel 有无缓存:
      1. 没有,则直接从 sender 中取走数据,唤醒 sender;
      2. 有,则说明缓存已满,从缓存队列队头取走数据,然后将 sender 数据塞到队尾,唤醒 sender;
    2. 无,则看 channel 有无缓存:
      1. 有,则直接从缓存中取走数据,维护队列索引,解锁返回;
      2. 无,则将自己包装成 sudog,放入 recvq 休眠等待唤醒,被唤醒的时候,sender 已经将数据拷贝到位了;
  4. 解锁。
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

   ...

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 1. 对整个 channel 上锁
   lock(&c.lock)

   // 2. 检查 channel 是否已经被关闭,且缓存中没有数据
   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      if ep != nil {
         // 清除 ep 指向的数据
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }

   // 3. 如果有等待中的 sender,则:
   // ① 如果 channel 没有缓存,则直接从 sender 中取走数据;
   // ② 如果 channel 有缓存,则缓存必然满了,那么就取缓存队列的队友数据,然后将 sender 的数据塞到队尾
   if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

   // 4. 没有等待中的 sender,且缓存中有数据,则时间从缓存队列中取出数据,并解锁返回
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
      }
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

   if !block {
      unlock(&c.lock)
      return false, false
   }

   // 5. 缓存中没有数据,则将自己包装成 sudog,放入 recvq 队列中,休眠等待唤醒
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)		// 入队
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)	// 休眠并解锁

   // 6. 被唤醒,唤醒的时候,sender 已经将数据拷贝到 receiver 的 ep 所指向的位置了(也就是 chansend 的第 3 步)
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, success
}

# 14.4 非阻塞 channel

# 14.4.1 select

  • 同时存在接收、发送和默认路径;
  • 首先查看是否有可以立即执行的 case:
    • 有:执行相关 case;
    • 无:
      • 有 default 则执行 default;
      • 没有 default 则将当前协程注册到所有的 channel 中,休眠等待
c1 := make(chan int, 5)
c2 := make(chan int, 5)

select {
  case data := <-c1:
  fmt.Println("c1:", data)
  case data := <-c2:
  fmt.Println("c2:", data)
  default:
  fmt.Println("no data")
}

# 14.4.2 Timer

<- time.NewTimer(time.Second).C
上次更新: 8/27/2022, 5:58:36 PM