channel 实现原理
- Goroutine 和 channel 是 Go 语言并发编程的 两大基石。Goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通信。
- channel是Golang在语言层面提供的goroutine间的通信方式,比Unix管道更易用也更轻便, channel主要用于进程内各goroutine间通信.
# channel 数据结构 !!!
src/runtime/chan.go:hchan
type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度,即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标识关闭状态 elemtype *_type // 元素类型 sendx uint // 队列下标,指示元素写入时存放到队列中的位置 recvx uint // 队列下标,指示元素从队列的该位置读出 recvq waitq // 等待读消息的goroutine队列 sendq waitq // 等待写消息的goroutine队列 lock mutex // 互斥锁,chan不允许并发读写 }
1
2
3
4
5
6
7
8
9
10
11
12
13关闭的chan,并不是nil
Channel是异步进行的, channel存在3种状态:
- nil,未初始化的状态,只进行了声明,或者手动赋值为nil
- active,正常的channel,可读或者可写
- closed,已关闭,千万不要误认为: 关闭channel后channel的值是nil
# 1. 环形队列
优点
- 避免假溢出现象(由于入队和出队的操作,头尾指针只增加不减少,致使被删元素的空间永远无法重新利用,当队列继续存储元素时,出现尾指针已经到达了队列尾,而实际头指针前面并未满的情况),可以将队列空间充分重复利用
- 首尾相连的FIFO的数据结构,采用数据的线性空间,数据组织简单,能快速知道队列是否满/空
- 广泛用于网络数据收发,和不同程序间数据交换,均使用了环形队列
实现原理
- 内存上并没有环形的结构,因此环形队列实际上是数组的线性空间来实现的。
- 当数据到了尾部该如何处理呢?它将转回到原来位置进行处理,通过取模操作来实现
环形队列的几个判断条件
- front:指向队列的第一个元素,初始值front=0
- rear: 指向队列的最后一个元素的后一个位置(预留一个空间作为约定),初始值rear=0
- maxSize: 数组的最大容量
- 队空:front == rear
- 队满:(rear+1)%maxSize == front
- 队列中的有效数据个数:(rear+maxSize-front)% maxSize
chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的。
- dataqsiz: 指示了队列长度为6,即可缓存6个元素;
- buf: 指向队列的内存,队列中还剩余两个元素;
- qcount: 表示队列中还有两个元素;
- sendx : 后续写入的数据存储的位置,取值[0, 6);
- recvx: 指示从该位置读取数据, 取值[0, 6);
# 2. 其他
从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。
被阻塞的goroutine将会挂在channel的等待队列中:
因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
一个channel只能传递一种类型的值,类型信息存储在hchan数据结构中。
- elemtype代表类型,用于数据传递过程中的赋值;
- elemsize代表类型大小,用于在buf中定位元素位置。
一个channel同时仅允许被一个goroutine读写,为简单起见,本章后续部分说明读写过程时不再涉及加锁和解锁。
# channel 创建
创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。
// 伪代码 func makechan(t *chantype, size int) *hchan { var c *hchan c = new(hchan) c.buf = malloc(元素类型大小*size) c.elemsize = 元素类型大小 c.elemtype = 元素类型 c.dataqsiz = size return c }
1
2
3
4
5
6
7
8
9
10
11
# channel 读写 !!!
# 1. 写 channel
- 如果等待接收队列
recvq
不为空, 说明channel缓冲区没有数据或者说没有缓冲区, 此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程; - 如果等待接收队列
recvq
为空, 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程; - 如果等待接收队列
recvq
为空, 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
# 2. 读 channel
如果等待发送队列
send
为空, 缓冲区中有数据, 从缓冲区取出一个数据结束接收过程;如果等待发送队列
send
为空, 且没有缓冲区, 将当前goroutine加入recvq, 等待被写入的goroutine唤醒;如果等待发送队列
send
不为空, 且没有缓冲区, 直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
# 3. 关闭 channel
- 关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
- panic场景:
- 关闭值为nil的channel
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
# channel 常见用法
# 1. select
select可以监控多channel,比如监控多个channel,当其中某一个channel有数据时,就从其读出数据。
demo
for { select { case e := <- chan1 : fmt.Printf("Get element from chan1: %d\n", e) case e := <- chan2 : fmt.Printf("Get element from chan2: %d\n", e) default: fmt.Printf("No element in chan1 and chan2.\n") time.Sleep(1 * time.Second) } }
1
2
3
4
5
6
7
8
9
10
11select的case语句读channel不会阻塞,尽管channel中没有数据。这是由于case语句编译后调用读channel时会明确传入不阻塞的参数,此时读不到数据时不会将当前goroutine加入到等待队列,而是直接返回。
# 2. range
通过range可以持续从channel中读出数据,好像在遍历一个数组一样,当channel中没有数据时会阻塞当前goroutine,与读channel时阻塞处理机制一样。 但是当channel关闭时, range自动跳出!
func main() { cha := make(chan int) go func() { for i := 0; i < 10; i++ { cha <- 1 } }() for e := range cha { println(e) } }
1
2
3
4
5
6
7
8
9
10
11
# 总结
空读写阻塞,写关闭异常,读关闭空零
什么时候panic:
- 写已经关闭的channel
- 关闭已经关闭的channel
- 关闭值为nil的channel
什么时候阻塞:
- 读写为nil的channel
- 缓冲区满了而且读写未全部就绪
读关闭空零: 读取关闭的channel, 会将缓冲区数据全部读出, 然后读出零值.
当channel关闭时, range自动跳出!