Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • 逃逸分析

  • 延迟语句

  • 数据容器

  • 通道

    • CSP是什么
    • 通道有哪些应用
    • 通道的底结构
    • 通道的关闭过程发生了什么
    • 从一个关闭的通道里仍然能读出数据吗
    • 如何优雅地关闭通道
    • 关于通道的 happens-before 有哪些
    • 通道在什么情况下会引起资源泄漏
    • 通道操作的情况总结
  • 接口

  • unsafe

  • context

  • Go程序员面试笔试宝典
  • 通道
ezreal_rao
2023-05-07

如何优雅地关闭通道

关于 channel 有几个使用不便的地方:

1)在不改变 channel 自身状态的情况下,无法得知一个 channel 是否关闭。

2)关闭一个 closed channel 会导致 panic。所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。

3)向一个 closed channel 发送数据会导致 panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据也是很危险的事情。

一个比较粗糙的检查 channel 是否关闭的函数如下:

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}
func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

仔细看一下代码,其实存在很多问题。首先,IsClosed 函数是一个有副作用的函数:每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数。

其次,IsClosed 函数返回的结果仅代表调用的那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作,改变了它的状态。例如,IsClosed 函数返回 true,但这时有另一个 goroutine 关闭了 channel,如果还拿着这个过时的 “channel 未关闭” 的信息,向其发送数据,就会导致 panic。当然,一个 channel 不会被重复关闭两次,如果 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。

有一条广泛流传的关闭 channel 的原则:

Don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
1

翻译过来:不要从 receiver 侧关闭 channel;也不要在有多个 sender 时,关闭 channel。

比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样无法确定其他 sender 的情况,这时也不能贸然关闭 channel。

其实上面所说的并不是最本质的,最本质的原则就只有一条:Don’t close (or send values to) closed Channels.

也就是不要关闭一个 closed channel,也不要向一个 closed channel 发送数据。

有两个不那么优雅地关闭 channel 的方法:

1)使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,也有 defer-recover 在兜底。

2)使用 sync.Once 来保证只关闭一次。

那到底应该如何优雅地关闭 channel?根据 sender 和 receiver 的个数,分下面几种情况:

1)一个 sender,一个 receiver。

2)一个 sender,M 个 receiver。

3)N 个 sender,一个 receiver。

4)N 个 sender,M 个 receiver。

对于第 1、2 种情况,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。重点关注第 3、4 种情况。

第 3 种情形下, 优雅关闭 channel 的方法是:The only receiver says “please stop sending more” by closing an additional signal channel。翻译一下:唯一的接收者通过关闭一个第三方充当信号的 channel,来关闭 channel。

所以,第 3 种情形的解决方案就是增加一个传递关闭信号的 channel,receiver 通过关闭信号 channel 下达关闭数据 channel 的指令。当 senders 监听到关闭信号后,停止发送数据。代码如下:

func main() {
    rand.Seed(time.Now().UnixNano())
    const Max = 100000
    const NumSenders = 1000
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }
    // the receiver
    go func() {
        for value := range dataCh {
            if value == Max-1 {
                fmt.Println("send stop signal to senders.")
                close(stopCh)
                return
            }
            fmt.Println(value)
        }
    }()
    select {
    case <- time.After(time.Hour):
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,return 退出函数,不再发送数据。

需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 GC 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 GC 代劳。

第 4 种情形下,优雅关闭 channel 的方法是:Any one of them says“let’s end the game”by notifying a moderator to close an additional signal channel(通知中间人来关闭一个额外的信号 channel,从而关闭 channel)。

和第 3 种情形不同,这里有 M 个 receiver,如果直接采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,M 个 receiver 就会重复关闭一个 channel,导致 panic。因此需要增加一个 “中间人”,M 个 receiver 都向它发送关闭 dataCh 的 “请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令。通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。代码如下:

// senders 和 receivers 都可以发送停止信息
func demo02() {
	rand.Seed(time.Now().Unix())

	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})

	// It must be a buffered channel.
	toStop := make(chan string, 1)
	var stoppedBy string
	go func() {
		stoppedBy = <-toStop
		fmt.Println(stoppedBy)
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				// 如果 value 为0,就发送停止信号
				if value == 0 {
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}
				// 如果接收到停止信号,就跳出
				// 否则就发送数据
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			for {
				// 如果接收到停止信号 就跳出
				// 否则就接受数据
				// 如果接收到的数据为指定数据,也停止接受
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					fmt.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	select {
	case <-time.After(time.Hour):
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

代码里 toStop 就是中间人的角色,用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求信号。

代码里将 toStop 声明成了一个缓冲型的 channel。 假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失,因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求, 如果中间人所在的 goroutine 没有准备好接收信号,那 sender 或 receiver 发送信号的 select 语句就不会被选中,而是直接执行 default 选项,什么也不做。这样, 第一个关闭 dataCh 的请求就会丢失。

如果把 toStop 的容量声明成 Num (senders) + Num (receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:

...
toStop := make(chan string, NumReceivers + NumSenders)
...
    value := rand.Intn(Max)
    if value == 0 {
        toStop <- "sender#" + id
        return
    }
...
    if value == Max-1 {
        toStop <- "receiver#" + id
        return
    }
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14

直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。

可以看到,这里同样没有真正关闭 dataCh,原因同第 3 种情形。

以上,就是最基本的一些情形,但已经能覆盖几乎所有的情况及其变种了。读者只要记住:

Don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
1

以及更本质的原则:

Don’t close (or send values to) closed Channels.
1
#通道#csp#channel
上次更新: 5/9/2023, 10:58:32 AM
从一个关闭的通道里仍然能读出数据吗
关于通道的 happens-before 有哪些

← 从一个关闭的通道里仍然能读出数据吗 关于通道的 happens-before 有哪些→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式