如何优雅地关闭通道
关于 channel 有几个使用不便的地方:
1)在不改变 channel 自身状态的情况下,无法得知一个 channel 是否关闭。
2)关闭一个 closed channel 会导致 panic。所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。
3)向一个 closed channel 发送数据会导致 panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据也是很危险的事情。
一个比较粗糙的检查 channel 是否关闭的函数如下:
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
仔细看一下代码,其实存在很多问题。首先,IsClosed 函数是一个有副作用的函数:每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数。
其次,IsClosed 函数返回的结果仅代表调用的那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作,改变了它的状态。例如,IsClosed 函数返回 true,但这时有另一个 goroutine 关闭了 channel,如果还拿着这个过时的 “channel 未关闭” 的信息,向其发送数据,就会导致 panic。当然,一个 channel 不会被重复关闭两次,如果 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。
有一条广泛流传的关闭 channel 的原则:
Don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
翻译过来:不要从 receiver 侧关闭 channel;也不要在有多个 sender 时,关闭 channel。
比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样无法确定其他 sender 的情况,这时也不能贸然关闭 channel。
其实上面所说的并不是最本质的,最本质的原则就只有一条:Don’t close (or send values to) closed Channels.
也就是不要关闭一个 closed channel,也不要向一个 closed channel 发送数据。
有两个不那么优雅地关闭 channel 的方法:
1)使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,也有 defer-recover 在兜底。
2)使用 sync.Once 来保证只关闭一次。
那到底应该如何优雅地关闭 channel?根据 sender 和 receiver 的个数,分下面几种情况:
1)一个 sender,一个 receiver。
2)一个 sender,M 个 receiver。
3)N 个 sender,一个 receiver。
4)N 个 sender,M 个 receiver。
对于第 1、2 种情况,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。重点关注第 3、4 种情况。
第 3 种情形下, 优雅关闭 channel 的方法是:The only receiver says “please stop sending more” by closing an additional signal channel。翻译一下:唯一的接收者通过关闭一个第三方充当信号的 channel,来关闭 channel。
所以,第 3 种情形的解决方案就是增加一个传递关闭信号的 channel,receiver 通过关闭信号 channel 下达关闭数据 channel 的指令。当 senders 监听到关闭信号后,停止发送数据。代码如下:
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// the receiver
go func() {
for value := range dataCh {
if value == Max-1 {
fmt.Println("send stop signal to senders.")
close(stopCh)
return
}
fmt.Println(value)
}
}()
select {
case <- time.After(time.Hour):
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,return 退出函数,不再发送数据。
需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 GC 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 GC 代劳。
第 4 种情形下,优雅关闭 channel 的方法是:Any one of them says“let’s end the game”by notifying a moderator to close an additional signal channel(通知中间人来关闭一个额外的信号 channel,从而关闭 channel)。
和第 3 种情形不同,这里有 M 个 receiver,如果直接采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,M 个 receiver 就会重复关闭一个 channel,导致 panic。因此需要增加一个 “中间人”,M 个 receiver 都向它发送关闭 dataCh 的 “请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令。通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。代码如下:
// senders 和 receivers 都可以发送停止信息
func demo02() {
rand.Seed(time.Now().Unix())
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// It must be a buffered channel.
toStop := make(chan string, 1)
var stoppedBy string
go func() {
stoppedBy = <-toStop
fmt.Println(stoppedBy)
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
// 如果 value 为0,就发送停止信号
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
// 如果接收到停止信号,就跳出
// 否则就发送数据
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
// 如果接收到停止信号 就跳出
// 否则就接受数据
// 如果接收到的数据为指定数据,也停止接受
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
select {
case <-time.After(time.Hour):
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
代码里 toStop 就是中间人的角色,用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求信号。
代码里将 toStop 声明成了一个缓冲型的 channel。 假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失,因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求, 如果中间人所在的 goroutine 没有准备好接收信号,那 sender 或 receiver 发送信号的 select 语句就不会被选中,而是直接执行 default 选项,什么也不做。这样, 第一个关闭 dataCh 的请求就会丢失。
如果把 toStop 的容量声明成 Num (senders) + Num (receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
2
3
4
5
6
7
8
9
10
11
12
13
14
直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。
可以看到,这里同样没有真正关闭 dataCh,原因同第 3 种情形。
以上,就是最基本的一些情形,但已经能覆盖几乎所有的情况及其变种了。读者只要记住:
Don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
以及更本质的原则:
Don’t close (or send values to) closed Channels.