[译]Concurrency Trap #2: Incomplete Work

文章目录

原文链接:Concurrency Trap #2: Incomplete Work (ardanlabs.com)

简介

在我的上一篇博文[译]Goroutine Leaks - The Forgotten Sender中,我提到了并发是一个非常有用的工具,但是并发往往伴随着特定的陷阱,而这些不会在同步编程中出现。本文会继续这个主题,在本文中我将介绍一个名为未完成的工作的陷阱。未完成的工作发生于在仍然有未完成的Goroutine(非main goroutine)存在的时候程序终止退出了。发生这种情况时,Goroutine的本质使得它会被强制退出,这可能是一个严重的问题。

未完成的工作

为了说明什么是未完成的工作,请观察下面的例子。

Listing 1

https://play.golang.org/p/VORJoAD2oAh

15 func main() {
26     fmt.Println("Hello")
37     go fmt.Println("Goodbye")
48 }

Listing 1中的程序在第6行打印了”Hello“,第7行在另一个Goroutine里再次调用了fmt.Println,在这个Goroutine之后,程序立即到达了main函数的尾部接着终止退出。如果你运行这个程序,你不会看到”Goodbye“打印出来,这是基于 Go specification中的一个规则:

”程序的运行始于main包的初始化和main函数的调用。当main方法的调用返回后,程序就退出了,它并不会等待其他非main Goroutine完成以后才退出。“

这个说明已经非常清晰了 - 程序并不会等待未完成的Goroutine,main函数返回以后程序就退出了。这是一个好事情!考虑一下,让一个Goroutine泄露或者让一个Goroutine运行很长一段时间是多么容易发生。如果程序在可以突出之前一直等待非main Goroutine运行,它可能在某种僵尸状态卡住一直无法退出。

然而,当你启动一个Goroutine去做一些很重要的工作时,程序这种退出的行为就造成问题了,因为main函数不知道要等待Goroutine完成。这种情景会导致完整性问题,比如数据库或者文件系统错误,或者数据丢失。

一个真实的例子

在Ardan Labs,我的团队构建了一个Web服务提供给那些需要追踪特定的事件的客户端使用。用于记录事件的那个系统有一个方法,类似于如下Listing 2中所示定义在Tracker类型上的方法:

Listing 2

https://play.golang.org/p/8LoUoCdrT7T

1 9 // Tracker knows how to track events for the application.
210 type Tracker struct{}
311 
412 // Event records an event to a database or stream.
513 func (t *Tracker) Event(data string) {
614     time.Sleep(time.Millisecond) // Simulate network write latency.
715     log.Println(data)
816 }

客户端担心追踪这些事件会带来不必要的响应时延增大,所以想以异步的方式追踪。对性能做假设实际上是不明智的,因此我们的第一个任务是用直接的、同步的方式来追踪事件然后观察服务的时延。在我们的例子中,我们观察到这个时延太长无法接受,因此团队决定用异步方式来实现。如果同步的方式足够快那么这个故事也就结束了,接着我们就把注意力放在更重要的事情上。

基于上面判断,用于追踪事件的handler最初的实现如下:

Listing 3

https://play.golang.org/p/8LoUoCdrT7T

 118 // App holds application state.
 219 type App struct {
 320     track Tracker
 421 }
 522 
 623 // Handle represents an example handler for the web service.
 724 func (a *App) Handle(w http.ResponseWriter, r *http.Request) {
 825 
 926     // Do some actual work.
1027 
1128     // Respond to the client.
1229     w.WriteHeader(http.StatusCreated)
1330 
1431     // Fire and Hope.
1532     // BUG: We are not managing this goroutine.
1633     go a.track.Event("this event")
1734 }

最重要的代码是在Listing 3中的第33行 - 在一个新的Goroutine中调用a.track.Event的地方。这个效果和预期一样即不给请求增加时延的前提下异步的追踪事件。然而这个代码会调入未完成的工作陷阱,因此必须要重构。任何像第33行那种创建的Goroutine,无论是运行还是结束都是没有保障的。这就是一个完成性问题,因为由于服务器关闭,事件有可能会丢失。

重构来获取保障

为了避开这个陷阱,我们团队修改了Tracker类型来自我管理Goroutine。现在这个类型使用了sync.WaitGroup来记录启动的Goroutine,并且提供了一个Shutdown函数给main,这样main就可以等待所有Goroutine完成。

首先,handler被修改为不直接创建Goroutine - 唯一的变化就是Listing 4中第53行去掉了go关键字。

Listing 4

https://play.golang.org/p/BMah6_C57-l

 144 // Handle represents an example handler for the web service.
 245 func (a *App) Handle(w http.ResponseWriter, r *http.Request) {
 346 
 447     // Do some actual work.
 548 
 649     // Respond to the client.
 750     w.WriteHeader(http.StatusCreated)
 851 
 952     // Track the event.
1053     a.track.Event("this event")
1154 }

接下来,修改Tracker类型使得它能自己管理Goroutine。

Listing 5

https://play.golang.org/p/BMah6_C57-l

 110 // Tracker knows how to track events for the application.
 211 type Tracker struct {
 312     wg sync.WaitGroup
 413 }
 514 
 615 // Event starts tracking an event. It runs asynchronously to
 716 // not block the caller. Be sure to call the Shutdown function
 817 // before the program exits so all tracked events finish.
 918 func (t *Tracker) Event(data string) {
1019 
1120     // Increment counter so Shutdown knows to wait for this event.
1221     t.wg.Add(1)
1322 
1423     // Track event in a goroutine so caller is not blocked.
1524     go func() {
1625 
1726         // Decrement counter to tell Shutdown this goroutine finished.
1827         defer t.wg.Done()
1928 
2029         time.Sleep(time.Millisecond) // Simulate network write latency.
2130         log.Println(data)
2231     }()
2332 }
2433 
2534 // Shutdown waits for all tracked events to finish processing.
2635 func (t *Tracker) Shutdown() {
2736     t.wg.Wait()
2837 }

Listing 5中第12行,在Tracker类型中添加了sync.WaitGroup。在Event方法中第21行调用了t.wg.Add(1),这使得Goroutine(第24行创建)计数器加1。一旦Goroutine被创建,Event方法就返回了,这符合客户端想减少事件追踪的时延的需求。创建出来的Goroutine就去做它该做的工作,然后在做完以后在第27行调用t.wg.Done()。调用Done方法就减少了Goroutine计数器,因此WaitGroup知道这个Goroutine结束了。

AddDone对于跟踪Goroutine的数量是非常有用的,但我们依然需要告诉程序去等待所有的Goroutine完成。为了达成这个目的,Tracker类型在第35行添加了一个新的方法Shutdown - 最简单的一个实现就是调用t.wg.Wait(),这会阻塞程序知道Goroutine计数器变为0。最后,这个方法会再func main中调用,如下面的Listing 6:

Listing 6

https://play.golang.org/p/BMah6_C57-l

 156 func main() {
 257 
 358     // Start a server.
 459     // Details not shown...
 560     var a App
 661 
 762     // Shut the server down.
 863     // Details not shown...
 964 
1065     // Wait for all event goroutines to finish.
1166     a.track.Shutdown()
1267 }

Listing 6中很重要的部分就是第66行,它阻塞了程序退出直到a.track.Shutdown()完成。

或许不要等待太久

Shown方法的实现很简单,但确实也做了它该做的事情 - 它等待所有的Goroutine执行完成。但不幸的是,这里面无法限制要等待多久。如果是生产环境,你不能不会愿意看要无限制的等待你的程序退出。为了给Shutdown方法一个等待的期限,我们团队修改成如下方式:

Listing 7

https://play.golang.org/p/p4gsDkpw1Gh

 136 // Shutdown waits for all tracked events to finish processing
 237 // or for the provided context to be canceled.
 338 func (t *Tracker) Shutdown(ctx context.Context) error {
 439 
 540     // Create a channel to signal when the waitgroup is finished.
 641     ch := make(chan struct{})
 742 
 843     // Create a goroutine to wait for all other goroutines to
 944     // be done then close the channel to unblock the select.
1045     go func() {
1146         t.wg.Wait()
1247         close(ch)
1348     }()
1449 
1550     // Block this function from returning. Wait for either the
1651     // waitgroup to finish or the context to expire.
1752     select {
1853     case <-ch:
1954         return nil
2055     case <-ctx.Done():
2156         return errors.New("timeout")
2257     }
2358 }

现在Listing 7中第38行,Shutdown方法接收一个context.context参数。这就是调用者用于限制Shutdown等待的时长。第41行,创建了一个channel,第45行启动了一个Goroutine。这个Goroutine的唯一工作就是等待WaitGroup进而关闭channel。第52行,通过一个select块阻塞程序,最终程序退出要么是因为等待时间超时要么通道被关闭了(注:所有Goroutine都执行完成)。

接下来,我们团队在func main中修改了调用方式:

Listing 8

https://play.golang.org/p/p4gsDkpw1Gh

186     // Wait up to 5 seconds for all event goroutines to finish.
287     const timeout = 5 * time.Second
388     ctx, cancel := context.WithTimeout(context.Background(), timeout)
489     defer cancel()
590 
691     err := a.track.Shutdown(ctx)

在Listing 8中创建了一个超时时间为5秒的context,然后这个context传递给a.track.Shutdown以设置main函数期望等待的时间。

结论

随着Goroutine的引入,这个服务器的handler能够最小化跟踪事件的API调用的时延。最简单的方式就是使用go关键字在后台运行但是这会有完整性问题。合适的方法是要确保在程序终止前所有的相关Goroutine完成各自的工作然后退出。

并发是非常有用的工具,但是你一定要非常小心的使用它。