[译]Go Concurrency Best Practice

文章目录

Dave Cheney 2019年在QCon上分享的一些Go编程的原则很有参考价值。我摘录了关于并发的部分,这部分是关于Go编发编程的一些原则和思想。

原文链接:Practical Go: Real world advice for writing maintainable Go programs (cheney.net)

通常选择Go来做项目,往往是因为它的并发特性。Go团队已经不遗余力的让Go在并发方面即能节省资源又能满足性能要求。然而,使用Go的并发特性写出低性能或者不稳定的代码是完全可能的。在最后我想留给你们一些我关于如何避免使用Go并发特定带来的一些隐患的建议。

支持并发特性的语句channelselectgo在Go特性中是一等公民。如果你在一本书或者培训教程中正式的学习过Go,你可能注意到了并发的部分总是在最后你要学习的。今天这个Workshop(工作坊)也是一样,我也是把并发相关的内容放在最后,这样会让人觉得无论怎样Go并发是一个Go程序员除了常规技能之外一定要掌握的内容。

凡事都有其两面性:Go的头等特性是它简单、轻量的并发模型。作为一个产品,Go语言几乎是以这个特性作为卖点。但是从另一个方面来说,有一个说法是Go并发并没有那么容易使用,否则那些作者也不会把这个特性放在他们的书的最后一个章节,我们也不会遗憾的回顾我们曾经做的努力(这里不是很理解想表达什么意思)。

这个部分我们会讨论一些Go并发特性的天真(随意)用法所带来的隐患。

Keep your self busy or do the work yourself

我理解作者的意思是:让你自己保持忙碌有事情可做(而不是干等),或者你自己来做这个工作

下面的这段程度有什么问题:

 1package main
 2
 3import (
 4	"fmt"
 5	"log"
 6	"net/http"
 7)
 8
 9func main() {
10	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
11		fmt.Fprintln(w, "Hello, GopherCon SG")
12	})
13	go func() {
14		if err := http.ListenAndServe(":8080", nil); err != nil {
15			log.Fatal(err)
16		}
17	}()
18
19	for {
20	}
21}

这个程序的预期就是一个很简单的web服务。然而它同时也做了其他事情,它在一个无限循环中浪费CPU资源。这是因为main函数最后通过for{}阻止main Goroutine退出,而不是等待任何IO,也不是在锁上面等待,也不是发送数据到channel或者从channel接收,也不是和调度器通信。

由于 Go 运行时大部分是协作调度的,因此该程序将在单个 CPU 上徒劳无功,最终可能会以活锁结束。

我们应该如何解决这个问题?这里有一个建议。

 1package main
 2
 3import (
 4	"fmt"
 5	"log"
 6	"net/http"
 7	"runtime"
 8)
 9
10func main() {
11	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
12		fmt.Fprintln(w, "Hello, GopherCon SG")
13	})
14	go func() {
15		if err := http.ListenAndServe(":8080", nil); err != nil {
16			log.Fatal(err)
17		}
18	}()
19
20	for {
21		runtime.Gosched()
22	}
23}

这或许看起来很傻,但这是我在外面看到的常见解决方案。 这明显是不了解潜在问题的症状。

现在,如果你与一点点go方面的经验,你可能会写出下面的代码。

 1package main
 2
 3import (
 4	"fmt"
 5	"log"
 6	"net/http"
 7)
 8
 9func main() {
10	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
11		fmt.Fprintln(w, "Hello, GopherCon SG")
12	})
13	go func() {
14		if err := http.ListenAndServe(":8080", nil); err != nil {
15			log.Fatal(err)
16		}
17	}()
18
19	select {}
20}

空的 select 语句将永远阻塞。 这是一个有用的属性,因为现在我们不是为了调用 runtime.GoSched() 而轮转整个 CPU。然而我们只是治标不治本。

我想给你展示另一个方案,我希望你也想到这个方案了。简单的在main goroutine上直接运行http.ListenAndServe,而不是在一个新的goroutine上运行http.ListenAndServe然后把要在main goroutine上做什么这个事情留给我们。

TIP - Go程序中,如果main.main函数返回了那么程序也就无条件退出了,不论程序启动的其他goroutine在正在做什么。

 1package main
 2
 3import (
 4	"fmt"
 5	"log"
 6	"net/http"
 7)
 8
 9func main() {
10	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
11		fmt.Fprintln(w, "Hello, GopherCon SG")
12	})
13	if err := http.ListenAndServe(":8080", nil); err != nil {
14		log.Fatal(err)
15	}
16}

所以这是我的第一个小小的建议:在你的goroutine从其他goroutine获取到结果之前,如果你的goroutine没有任何一点点进展(只能干等),那么这种情况下,你自己的goroutine来做这个工作更合适些,而不是把这个工作代理给其他goroutine。

这通常消除了将结果从 goroutine 返回到其发起者所需的大量状态跟踪和channel操作。

TIP - 许多 Go 程序员过度使用 goroutine,尤其是在他们刚开始的时候。 与生活中的所有事情一样,适度是成功的关键。

Leave concurrency to caller

把并发的权利留给调用者

下面这两个API有什么差别?

1// ListDirectory returns the contents of dir.
2func ListDirectory(dir string) ([]string, error)
3// ListDirectory returns a channel over which
4// directory entries will be published. When the list
5// of entries is exhausted, the channel will be closed.
6func ListDirectory(dir string) chan string

首先,明显的区别是:第一例子是把整个目录都读到一个切片里然后返回这个切片,或者发生错误了就返回错误。这是一个同步操作,ListDirectory的调用者在读取完所有目录项之前是出于阻塞状态。这就取决于目录本身有多大,可能会耗费大量的时间,可能需要大量的内存存放目录项。

我们来看看第二个例子。这个就有点Go风格了,ListDirectory返回了一个channel,读取的目录项是通过这个channel传递。当这个channel关闭的时候,说明没有更多的目录项要读取了。由于通道的填充发生在 ListDirectory 返回之后,ListDirectory 可能正在启动一个 goroutine 来填充通道。

NOTE - 第二个版本没有必要实际使用 goroutine。它可以分配一个足以容纳所有目录条目而不会阻塞的通道,填充channel,关闭它,然后将channel返回给调用者。 但这不太可能,因为这样做也是有一样的问题 - 在channel上缓存所有的目录项一样会耗费大量的内存。

使用channel版本的ListDirectory还有两个额外的问题:

  • 通过关闭的channel作为没有更多目录项要处理的信号,ListDirectory 无法告诉调用者通过channel返回的目录项不完整,因为在获取目录项的中途可能就遇到错误了。调用者也无法区分是空目录还是读取目录时遇到了错误,因为是哪种情况都会导致从ListDirectory返回channel时就立即关闭了
  • 调用者必须继续从channel读取直到它关闭,因为这是调用者知道用于填充channel的 goroutine 已停止的唯一方法。这是使用 ListDirectory 的严重限制,调用者必须花时间从channel读取,即使它可能已经得到了它想要的答案

结果上述问题的方案是使用一个回调函数,该函数在执行时在每个目录条目的上下文中调用。

1func ListDirectory(dir string, fn func(string))

这一点也不奇怪,这就是filepath.WalkDir函数的工作原理。

TIP - 如果你的函数启动了一个 goroutine,你必须为调用者提供一种明确停止该 goroutine 的方法。 将异步执行函数的决定留给该函数的调用者通常更容易。

Never start a goroutine without knowning when it will stop

如果不知道一个goroutine何时会停止,那么你最好永远也不要启动它

前面的例子展示了在没有必要的情况下使用了 goroutine。 但是使用 Go 的一个驱因是该语言提供的一流并发特性。 事实上,在许多情况下,你希望利用硬件提供的可用的并行性。 为此,你必须使用 goroutines。

这个简单的应用程序在两个不同的端口上提供 http 服务,端口 8080 用于应用程序流量,端口 8001 用于访问/debug/pprof端点。

 1package main
 2
 3import (
 4	"fmt"
 5	"net/http"
 6	_ "net/http/pprof"
 7)
 8
 9func main() {
10	mux := http.NewServeMux()
11	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
12		fmt.Fprintln(resp, "Hello, QCon!")
13	})
14	go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux) // debug
15	http.ListenAndServe("0.0.0.0:8080", mux)                       // app traffic
16}

虽然上面的程序并没有多么复杂,但一个真正的应用基本上就是这样子了。

应用程序目前存在一些问题,这些问题会随着应用程序的增长而显现出来,所以现在让我们解决其中的一些问题。

 1func serveApp() {
 2	mux := http.NewServeMux()
 3	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
 4		fmt.Fprintln(resp, "Hello, QCon!")
 5	})
 6	http.ListenAndServe("0.0.0.0:8080", mux)
 7}
 8
 9func serveDebug() {
10	http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
11}
12
13func main() {
14	go serveDebug()
15	serveApp()
16}

通过将handler分解为他们自己的函数以解耦。同时我们也遵守了上面的建议,也就是确保把函数的并发性留给他们的调用者main.main

但是这个程序依然存在一些可操作性问题。如果程序返回了那么返回导致程序关闭,接着程序由你使用的进程管理器重新启动。

TIP - 正如在Go中应该把函数的并发性留给函数的调用者一样,应用程序应该把其状态监控和应用重启的职责留给启动这个应该程序的程序。不要让你的应用程序自己负责自己的重启,这个过程应该留给应用程序本身以外的其他程序来控制。

然而在一个单独的goroutine中运行,如果它返回了那么只有该goroutine会退出,程序的其余部分会继续。你们的运维人员可能会不高兴的发现他们无法在需要时从你的应用中获取统计信息,因为获取统计信息的handler早就停止工作了。

我们需要确保当任何一个提供服务的goroutine停止时,那么整个应用程序都要被停止。

 1func serveApp() {
 2	mux := http.NewServeMux()
 3	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
 4		fmt.Fprintln(resp, "Hello, QCon!")
 5	})
 6	if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
 7		log.Fatal(err)
 8	}
 9}
10
11func serveDebug() {
12	if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
13		log.Fatal(err)
14	}
15}
16
17func main() {
18	go serveDebug()
19	go serveApp()
20	select {}
21}

现在,serverAppserveDebug检查从ListenAndServe返回的错误,如果发生了错误则调用log.Fatal。因为这两个handlers都在各自的goroutine中运行,main goroutine通过select{}阻塞。这种方式有这些问题:

  1. 如果ListenAndServe返回的错误是nillog.Fatal就不会调用,这样监听在对应端口上的HTTP服务会停止但是应该程序并不会停止
  2. log.Fatal会调用os.Exit这将无条件使程序突出,defer语句也不会执行,其他goroutine也不会被通知要关闭,程序就这样停止了。这使得给这些函数写测试用例变的很困难

TIP - 只在init函数和main.main函数中使用log.Fatal

我们真正想要的是将发生的任何错误传递回 goroutine 的发起者,以便它知道 goroutine 停止的原因,可以干净地关闭进程。

 1func serveApp() error {
 2	mux := http.NewServeMux()
 3	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
 4		fmt.Fprintln(resp, "Hello, QCon!")
 5	})
 6	return http.ListenAndServe("0.0.0.0:8080", mux)
 7}
 8
 9func serveDebug() error {
10	return http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
11}
12
13func main() {
14	done := make(chan error, 2)
15	go func() {
16		done <- serveDebug()
17	}()
18	go func() {
19		done <- serveApp()
20	}()
21
22	for i := 0; i < cap(done); i++ {
23		if err := <-done; err != nil {
24			fmt.Println("error: %v", err)
25		}
26	}
27}

我们可以使用一个通道来收集 goroutine 的返回状态。 通道的大小等于我们要管理的 goroutine 的数量,这样发送到通道就不会阻塞,因为这会阻塞 goroutine 的关闭,导致它泄漏。

由于没有办法安全地关闭通道,我们不能使用for ... range ...来循环的通道直到所有 goroutines 都报告完成,而是循环循环我们启动的 goroutines,数量等于channel的容量。

现在我们有一种方法可以等待每个 goroutine 干净地退出并记录它们遇到的任何错误。 所需要的只是一种将关闭信号从第一个退出的 goroutine 转发给其他 goroutine 的方法。

事实证明,让一个http.Server关闭有一些牵强,因此我把关闭的逻辑分离出来放在一个helper函数serve里。helper函数接收一个地址参数和http.Handler参数(跟http.ListenAndServe类似),同时还有一个用来触发关闭函数Shutdown的channel。

 1func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
 2	s := http.Server{
 3		Addr:    addr,
 4		Handler: handler,
 5	}
 6
 7	go func() {
 8		<-stop // wait for stop signal
 9		s.Shutdown(context.Background())
10	}()
11
12	return s.ListenAndServe()
13}
14
15func serveApp(stop <-chan struct{}) error {
16	mux := http.NewServeMux()
17	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
18		fmt.Fprintln(resp, "Hello, QCon!")
19	})
20	return serve("0.0.0.0:8080", mux, stop)
21}
22
23func serveDebug(stop <-chan struct{}) error {
24	return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
25}
26
27func main() {
28	done := make(chan error, 2)
29	stop := make(chan struct{})
30	go func() {
31		done <- serveDebug(stop)
32	}()
33	go func() {
34		done <- serveApp(stop)
35	}()
36
37	var stopped bool
38	for i := 0; i < cap(done); i++ {
39		if err := <-done; err != nil {
40			fmt.Println("error: %v", err)
41		}
42		if !stopped {
43			stopped = true
44			close(stop)
45		}
46	}
47}

现在,每当我们从channel上接收到值(serveDebug或者serveApp返回的错误),我们就关闭channel stop,这导致所有等待在stop上的goroutine调用Shutdown方法以关闭HTTP服务进而停止启动HTTP服务的goroutine,最终所有的goroutine都将停止。一旦我们启动的goroutine都停止了,程序也就优雅的停止了。

TIP - 自己编写这个逻辑就有些重复了。 可以考虑像这个包 https://github.com/heptio/workgroup这样的东西,它将为你完成大部分工作。