
go select 是一种仅能用于channel发送和接收消息的专用语句,此语句运行期间是阻塞的。select是go在语言层面提供的I/O多路复用的机制,专门检测多个channel是否准备完毕,可读或可写。它的调用栈是:
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool) -> reflect/value.Select
func reflect_rselect(cases []runtimeSelect) (int, bool) -> runtime/select.go
func selectgo(cas0 scase, order0 uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) -> reflect/select.go
需要注意的是,编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化:
1) case数量为0的话,compiler会优化为gopark阻塞线程
select{} -> runtime.block() -> gopark()
// gc/select/walkselectcases
// optimization: zero-case select
if n == 0 {
return []*Node{mkcall("block", nil, nil)}// 这里编译器优化为调用runtime.block
}
// runtime.block. 最终调用还是 gopark() 阻塞goroutine
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever
}2) case数量为1的话,compiler会优化为if
select{ if v<-foo {
case <-foo: -> ...
} }3) case数量为1,有default的话compiler会优化为 if else
select{ if v<-foo {
case <-foo: -> ....
default: } else {
} }除了以上这些特殊的的情况,默认的情况下,select 语句会在编译阶段经过如下过程的处理:
1) 在reflect_rselect中,将所有的 case 转换成包含 channel 以及类型等信息的 scase 结构体
2) 调用运行时函数 selectgo 获取被选择的 scase 结构体索引,如果当前的 scase 是一个接收数据的操作,还会返回一个指示当前 case 是否是接收的布尔值
3) 通过 for 循环生成一组 if 语句,在语句中判断自己是不是被选中的 case
runtimeSelect这个结构体是select的case的描述,它的结构如下:
type runtimeSelect struct {
dir selectDir // 表示case的类型,读chan、写chan、default
typ unsafe.Pointer // channel type (not used here)
ch *hchan // channel
val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir) case读或写的数据指针
}在reflect_rselect中,我们将所有select的case转化为scase,然后调用selectgo函数,下面是scase结构描述和reflect_rselect的一些重要步骤:
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}reflect_rselect的关键代码流程:
func reflect_rselect(cases []runtimeSelect) (int, bool) {
if len(cases) == 0 {
block()
}
sel := make([]scase, len(cases))
orig := make([]int, len(cases))
nsends, nrecvs := 0, 0
dflt := -1
// 将select中的case转化为scase
for i, rc := range cases {
var j int
switch rc.dir {
case selectDefault:
dflt = i
continue
case selectSend:
j = nsends
nsends++
case selectRecv:
nrecvs++
j = len(cases) - nrecvs
}
// 这儿的处理方式是,sel数组中前面是send的case,后面是recv的case。
sel[j] = scase{c: rc.ch, elem: rc.val}
orig[j] = i
}
// 只有default的case,直接就选择default了。
if nsends+nrecvs == 0 {
return dflt, false
}
...
//调用selectgo函数,选择对应的case执行
chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
// 根据selectgo返回值选择case
if chosen < 0 {
chosen = dflt
} else {
chosen = orig[chosen]
}
return chosen, recvOK
}接下来可以展开介绍 selectgo 函数的实现原理了。
func selectgo(cas0 scase, order0 uint16, nsends, nrecvs int, block bool) (int, bool) { } selectgo 是会在运行期间运行的函数,这个函数的主要作用就是从 select 控制结构中的多个 case 中选择一个需要执行的 case,随后的多个 if 条件语句就会根据 selectgo 的返回值执行相应的语句。
selectgo 函数首先会进行执行必要的一些初始化操作,也就是决定处理 case 的两个顺序,其中一个是 pollOrder 另一个是 lockOrder。
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
// ... 代码较长不粘贴了,没有什么特殊的就是实现一个堆排序
// 根据channel的内存地址使用堆排序的方式进行排序
// 为什么要排序呢,因为case中可能存在操作相同的chan,如果不排序,那么
// 就可能出现对同一个chan上锁,导致卡死。排序后如果是相同的chan就不
// 加锁了
// ... 代码较长不粘贴了 . . .
// lock all the channels involved in the select
sellock(scases, lockorder)
}channel 的轮询顺序是通过 fastrandn函数 随机生成的,这其实就导致了如果多个 channel 同时响应,select 会随机选择其中的一个执行;而另一个 lockOrder 就是根据 channel 的地址确定的,根据相同的顺序锁定 channel 能够避免死锁的发生,最后调用的 sellock 就会按照之前生成的顺序锁定所有的 channel。
当我们为 select 语句确定了轮询和锁定的顺序并锁定了所有的 channel 之后就会开始进入 select 的主循环,查找或者等待 channel 准备就绪,在这段循环的代码中,我们会分三种不同的情况处理 select 中的多个 case:
● caseRecv — 当前 case 会从 channel 中接收数据;
1)如果当前 channel 的 sendq 上有等待的 goroutine 就会直接跳到 recv 标签所在的代码段,从 goroutine 中获取最新发送的数据;
2)如果当前 channel 的缓冲区不为空就会跳到 bufrecv 标签处从缓冲区中获取数据;
3)如果当前 channel 已经被关闭就会跳到 rclose 做一些清除的收尾工作;
● caseSend — 当前 case 会向 channel 发送数据;
1)如果当前 channel 已经被关闭就会直接跳到 rclose 代码段;
2)如果当前 channel 的 recvq 上有等待的 goroutine 就会跳到 send 代码段向 channel 直接发送数据;
3)如果当前 channel 的缓冲区有空间可以接受数据,就会跳到 bufsend标签处往缓冲区中发送数据;
● caseDefault — 当前 case 表示默认情况,如果循环执行到了这种情况就表示前面的所有 case 都没有被执行,所以这里会直接解锁所有的 channel 并退出 selectgo 函数,这时也就意味着当前 select 结构中的其他收发语句都是非阻塞的。
下面是关键代码片段:
// pass 1 - look for something already waiting
// 看看是否有准备就绪的chan
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
// 是接受数据的case
if casi >= nsends {
// chan有等待的goroutine
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
// chan的缓冲区不为空
if c.qcount > 0 {
goto bufrecv
}
// chan已经关闭
if c.closed != 0 {
goto rclose
}
// 是发送数据的case
} else {
// chan已经关闭
if c.closed != 0 {
goto sclose
}
// chan的recvq上有等待的goroutine
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
// 往chan的缓冲区写数据
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}
// 如果select语句中有default分支
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}以上其实是循环执行的第一次遍历,主要作用就是寻找所有 case 中 channel 是否有可以立刻被处理的情况,无论是在包含等待的 goroutine 还是缓冲区中存在数据,只要满足条件就会立刻处理,如果不能立刻找到活跃的 channel 就会进入循环的下一个过程,按照需要将当前的 goroutine 加入到所有 channel 的 sendq 或者 recvq 队列中:
// pass 2 - enqueue on all chans
// 没有准备好的chan,就将G加入到所有的chan中sendq或者recvq中,并挂起等待
var(
gp *g
nextp **sudog
)
// 获取当前的g
gp = getg()
// 下面会给根据每个case创建一个sudog,加入到当前g的waiting列表
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
// 创建一个sudog,管理当前的g
// 这块的处理和chan阻塞的处理方式一样,也是要保存自己当前的g
// 目的都是后面需要将当前g挂起
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// sudog结构体都会被串成链表附着在当前 goroutine 上
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
// 调用 gopark 函数挂起当前的 goroutine 等待调度器的唤醒
// 我们创建的sudog加入到各个chan中,如果有一个sudog被唤醒,将会执行
// gopark后面的逻辑
gp.param = nil
atomic.Store8(&gp.parkingOnChan, 1)
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)这里创建 sudog 并入队的过程其实和 channel 中直接进行发送和接收时的过程几乎完全相同,只是除了在入队之外,这些 sudog 结构体都会被串成链表附着在当前 goroutine 上,在入队之后会调用 gopark 函数挂起当前的 goroutine 等待调度器的唤醒。

等到 select 对应的某个channel 准备好之后,当前 goroutine 就会被调度器唤醒,这时就会继续执行 selectgo 函数中剩下的逻辑,也就是从上面 入队的 sudog 结构体中获取数据。
gp.selectDone = 0
// 这儿的gp的param应该是gopark之后被唤醒,导致被唤醒的那个sudog
sg = (*sudog)(gp.param)
gp.param = nil
casi := -1
cas := nil
caseSuccess := false
sglist := gp.waiting
// 被唤醒后,清除我们之前生成的sudog的一些数据.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
// 遍历case
for _, casei := range lockorder {
k = &scases[casei]
// 找到被唤醒的sudog
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
// 获取到case里面的chan
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
// 不是唤醒的sudog,我们也需要将它从其他case的chan里面的队列拿出来
// 因为被唤醒的sudog已经被它对应的chan执行出队的操作,不再需要
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// 持续寻找被唤醒的那个sudog,类十余一个迭代器
sgnext = sglist.waitlink
sglist.waitlink = nil
// 我们已经不再需要之前创建的sudog了,清理掉
releaseSudog(sglist)
sglist = sgnext
}
// 就是这个case的chan有数据被唤醒的,我们可以操作这个chan了
c := cas.c
// 这儿的意思是,如果我们的sudog被唤醒的原因是因为chan被关闭了而被唤醒的
// 那么就走sclose逻辑,sclose里面会panic。因为不能往一个被关闭的chan
// 里面send数据,但是可以往关闭的chan里面读取数据
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
// 解锁所有的case
selunlock(scases, lockorder)
goto retc在第三次根据 lockOrder 遍历全部 case 的过程中,我们会先获取 goroutine 接收到的参数 param,这个参数其实就是被唤醒的 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case 并释放其他未被使用的 sudog 结构。由于当前的 select 结构已经挑选了其中的一个 case 进行执行,那么剩下 case 中没有被用到的 sudog 其实就会直接忽略并且释放掉了,为了不影响 channel 的正常使用,我们还是需要将这些废弃的 sudog 从 channel 中出队;而除此之外的发生事件导致我们被唤醒的 sudog 结构已经在 channel 进行收发时就已经出队了,不需要我们再次处理。
到这儿我们的select逻辑分析完毕了,还有个遗留的问题,上文我们提及到如果select的时候,不需要阻塞等待chan的响应。而是直接可以从准备好的case里面的chan读或者取,描述那一段流程的时候,提及的一些列goto的标签。这些goto标签的代码的执行过程其实都非常简单,都只是向 channel 中发送或者从缓冲区中直接获取新的数据,相当于是实现了间接完成chan的一些读写操作流程。
1) recv
recv标签是case是从一个chan接收数据,如果当前 channel 的 sendq 上有等待的goroutine,之前讲解chan的recv操作时候有讲到。
// 这个recv是chan的recv函数,其实就是从chan获取数据
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc2) bufrecv
bufrecv标签是从一个chan接收数据,如果chan的缓冲区有数据,就可以直接拿了。
// 向chan的缓冲区获取数据
recvOK = true
// chan的chanbuf函数,获取chan的buf指针
qp = chanbuf(c, c.recvx)
// move数据到case的变量
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
// 实现chan从缓冲区读数据的操作
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc3)rclose
rclose标签是对关闭的chan进行读取数据
selunlock(scases, lockorder)
recvOK = false
// 实现了从关闭的chan读取数据。
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc4)sclose
往关闭的chan写数据
// send on closed channel
selunlock(scases, lockorder)
// 王close的chan写数据是panic的
panic(plainError("send on closed channel"))5)send
send标签是case是从一个chan发送数据,当前channel 的 recvq 上有等待的goroutine 就会跳到 send 代码段向channel 直接发送数据。
// chan的send函数,往chan发送数据
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc6)bufsend
bufsend标签是case从一个chan中send数据,chan的缓冲区有空间可以写数据
// move case的数据到chan中。
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
// 下面间接实现了chan的一些send操作
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retcselect的selectgo函数会按照以下的过程执行:
1.随机生成一个遍历的轮询顺序 pollOrder 并根据 channel 地址生成一个用于遍历的锁定顺序 lockOrder;
2.根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 channel 消息;如果有消息就直接获取 case 对应的索引并返回;如果没有消息就会创建 sudog 结构体,将当前 goroutine 加入到所有相关 channel 的 sendq 和 recvq 队列中并调用 gopark 触发调度器的调度;
3.当调度器唤醒当前 goroutine 时就会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 sudog 结构并返回对应的索引;
然而并不是所有的 select 控制结构都会走到 selectgo 上,很多情况都会被直接优化掉,没有机会调用 selectgo 函数,例如我们之前说的case中只有一个default等情况。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。