# 14. channel
总结
channel 底层是 hchan
结构体,包含了:
- 一个环形缓存队列;
- 接受者队列、发送者队列;
- 锁;
- 关闭标志;
发送:chansend()
- 直接发送
- 塞入缓存
- 休眠等待
接收:chanrecv()
- 直接接收
- 从缓存拿
- 休眠等待
# 14.1 底层
channel 底层是 runtime/chan.go
的 hchan
结构体:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
其中以下五个字段组成了一个 环形缓冲队列:
qcount uint // 当前在队列中的数据个数
dataqsiz uint // 环形队列大小
buf unsafe.Pointer // 指向环形队列的指针
elemsize uint16 // 每个数据大小
elemtype *_type // 数据类型
- 环形缓存可以大幅降低 GC 的开销。
其中还有四个字段组成了两个 链表:
sendx uint // 下次要发送的数据的 index
recvx uint // 下个要接收的数据的 index
recvq waitq // 接受者等待队列
sendq waitq // 发送者等待队列
还有一个 锁:
lock mutex // 保存 hchan 中的所有字段
- 互斥锁并不是排队发送 / 接收数据,它保护的是 hchan 结构体本身。
还有一个 标记:
closed uint32 // 标记 channel 是否已经关闭
# 14.2 发送
- 对整个 channel 上锁;
- 检查 channel 是否已经关闭,若关系,这 panic;
- 检查是否有正在等待中的协程:
- 有的话,直接将数据拷贝给它,然后唤醒它;
- 没有,则检查缓存队列是否已满:
- 没有满,则将数据塞入缓存队列中;
- 已满,则把自己包装成 sudog 放入 sendq 队列,休眠并解锁,等待唤醒。被唤醒后数据已经被取走了,当下 sudog 负责维护其他的数据;
- 解锁。
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 1. 对整个 channel 上锁
lock(&c.lock)
// 2. 检查 channel 是否已经关闭了,不能发送数据到已经关闭的 channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 3. 检查是否有等待中的协程,有的话,直接将数据拷贝给它,然后唤醒它
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 没有等待中的协程,往下走
// 4. 如果缓存队列还没满,则将数据放入缓存中
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 5. 缓存队列满了,则休眠等待,等待唤醒
gp := getg() // 获取当前协程,当下是 sender
mysg := acquireSudog() // 将自己包装成 sudog,并赋初始值
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 赋值数据
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入队
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 休眠并解锁
KeepAlive(ep)
// 6. 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) // 被唤醒的时候,数据其实已经被取走了,mysg 负责维护其他数据
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
# 14.3 接收
- 对整个 channel 上锁;
- 如果 channel 已经关闭,且缓存中没有数据,如果这个时候 eq 指向的地址有数据,则清空数据;
- 检查是否有等待中的 sender:
- 有,则看 channel 有无缓存:
- 没有,则直接从 sender 中取走数据,唤醒 sender;
- 有,则说明缓存已满,从缓存队列队头取走数据,然后将 sender 数据塞到队尾,唤醒 sender;
- 无,则看 channel 有无缓存:
- 有,则直接从缓存中取走数据,维护队列索引,解锁返回;
- 无,则将自己包装成 sudog,放入 recvq 休眠等待唤醒,被唤醒的时候,sender 已经将数据拷贝到位了;
- 有,则看 channel 有无缓存:
- 解锁。
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 1. 对整个 channel 上锁
lock(&c.lock)
// 2. 检查 channel 是否已经被关闭,且缓存中没有数据
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 清除 ep 指向的数据
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 3. 如果有等待中的 sender,则:
// ① 如果 channel 没有缓存,则直接从 sender 中取走数据;
// ② 如果 channel 有缓存,则缓存必然满了,那么就取缓存队列的队友数据,然后将 sender 的数据塞到队尾
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 4. 没有等待中的 sender,且缓存中有数据,则时间从缓存队列中取出数据,并解锁返回
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 5. 缓存中没有数据,则将自己包装成 sudog,放入 recvq 队列中,休眠等待唤醒
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 入队
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 休眠并解锁
// 6. 被唤醒,唤醒的时候,sender 已经将数据拷贝到 receiver 的 ep 所指向的位置了(也就是 chansend 的第 3 步)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
# 14.4 非阻塞 channel
# 14.4.1 select
- 同时存在接收、发送和默认路径;
- 首先查看是否有可以立即执行的 case:
- 有:执行相关 case;
- 无:
- 有 default 则执行 default;
- 没有 default 则将当前协程注册到所有的 channel 中,休眠等待
c1 := make(chan int, 5)
c2 := make(chan int, 5)
select {
case data := <-c1:
fmt.Println("c1:", data)
case data := <-c2:
fmt.Println("c2:", data)
default:
fmt.Println("no data")
}
# 14.4.2 Timer
<- time.NewTimer(time.Second).C