8.9. 併發的退出

有時候我們需要通知goroutine停止它正在乾的事情,比如一個正在執行計算的web服務,然而它的客戶端已經斷開了和服務端的連接。

Go語言並沒有提供在一個goroutine中終止另一個goroutine的方法,由於這樣會導致goroutine之間的共享變量落在未定義的狀態上。在8.7節中的rocket launch程序中,我們往名字叫abort的channel裡發送了一個簡單的值,在countdown的goroutine中會把這個值理解為自己的退出信號。但是如果我們想要退出兩個或者任意多個goroutine怎麼辦呢?

一種可能的手段是向abort的channel裡發送和goroutine數目一樣多的事件來退出它們。如果這些goroutine中已經有一些自己退出了,那麼會導致我們的channel裡的事件數比goroutine還多,這樣導致我們的發送直接被阻塞。另一方面,如果這些goroutine又生成了其它的goroutine,我們的channel裡的數目又太少了,所以有些goroutine可能會無法接收到退出消息。一般情況下我們是很難知道在某一個時刻具體有多少個goroutine在運行著的。另外,當一個goroutine從abort channel中接收到一個值的時候,他會消費掉這個值,這樣其它的goroutine就沒法看到這條信息。為了能夠達到我們退出goroutine的目的,我們需要更靠譜的策略,來通過一個channel把消息廣播出去,這樣goroutine們能夠看到這條事件消息,並且在事件完成之後,可以知道這件事已經發生過了。

回憶一下我們關閉了一個channel並且被消費掉了所有已發送的值,操作channel之後的代碼可以立即被執行,並且會產生零值。我們可以將這個機制擴展一下,來作為我們的廣播機制:不要向channel發送值,而是用關閉一個channel來進行廣播。

只要一些小修改,我們就可以把退出邏輯加入到前一節的du程序。首先,我們創建一個退出的channel,不需要向這個channel發送任何值,但其所在的閉包內要寫明程序需要退出。我們同時還定義了一個工具函數,cancelled,這個函數在被調用的時候會輪詢退出狀態。

gopl.io/ch8/du4

var done = make(chan struct{})

func cancelled() bool {
    select {
    case <-done:
        return true
    default:
        return false
    }
}

下面我們創建一個從標準輸入流中讀取內容的goroutine,這是一個比較典型的連接到終端的程序。每當有輸入被讀到(比如用戶按了回車鍵),這個goroutine就會把取消消息通過關閉done的channel廣播出去。

// Cancel traversal when input is detected.
go func() {
    os.Stdin.Read(make([]byte, 1)) // read a single byte
    close(done)
}()

現在我們需要使我們的goroutine來對取消進行響應。在main goroutine中,我們添加了select的第三個case語句,嘗試從done channel中接收內容。如果這個case被滿足的話,在select到的時候即會返回,但在結束之前我們需要把fileSizes channel中的內容“排”空,在channel被關閉之前,捨棄掉所有值。這樣可以保證對walkDir的調用不要被向fileSizes發送信息阻塞住,可以正確地完成。

for {
    select {
    case <-done:
        // Drain fileSizes to allow existing goroutines to finish.
        for range fileSizes {
            // Do nothing.
        }
        return
    case size, ok := <-fileSizes:
        // ...
    }
}

walkDir這個goroutine一啟動就會輪詢取消狀態,如果取消狀態被設置的話會直接返回,並且不做額外的事情。這樣我們將所有在取消事件之後創建的goroutine改變為無操作。

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    if cancelled() {
        return
    }
    for _, entry := range dirents(dir) {
        // ...
    }
}

在walkDir函數的循環中我們對取消狀態進行輪詢可以帶來明顯的益處,可以避免在取消事件發生時還去創建goroutine。取消本身是有一些代價的;想要快速的響應需要對程序邏輯進行侵入式的修改。確保在取消發生之後不要有代價太大的操作可能會需要修改你代碼裡的很多地方,但是在一些重要的地方去檢查取消事件也確實能帶來很大的好處。

對這個程序的一個簡單的性能分析可以揭示瓶頸在dirents函數中獲取一個信號量。下面的select可以讓這種操作可以被取消,並且可以將取消時的延遲從幾百毫秒降低到幾十毫秒。

func dirents(dir string) []os.FileInfo {
    select {
    case sema <- struct{}{}: // acquire token
    case <-done:
        return nil // cancelled
    }
    defer func() { <-sema }() // release token
    // ...read directory...
}

現在當取消發生時,所有後臺的goroutine都會迅速停止並且主函數會返回。當然,當主函數返回時,一個程序會退出,而我們又無法在主函數退出的時候確認其已經釋放了所有的資源(譯註:因為程序都退出了,你的代碼都沒法執行了)。這裡有一個方便的竅門我們可以一用:取代掉直接從主函數返回,我們調用一個panic,然後runtime會把每一個goroutine的棧dump下來。如果main goroutine是唯一一個剩下的goroutine的話,他會清理掉自己的一切資源。但是如果還有其它的goroutine沒有退出,他們可能沒辦法被正確地取消掉,也有可能被取消但是取消操作會很花時間;所以這裡的一個調研還是很有必要的。我們用panic來獲取到足夠的信息來驗證我們上面的判斷,看看最終到底是什麼樣的情況。

練習 8.10: HTTP請求可能會因http.Request結構體中Cancel channel的關閉而取消。修改8.6節中的web crawler來支持取消http請求。(提示:http.Get並沒有提供方便地定製一個請求的方法。你可以用http.NewRequest來取而代之,設置它的Cancel字段,然後用http.DefaultClient.Do(req)來進行這個http請求。)

練習 8.11: 緊接著8.4.4中的mirroredQuery流程,實現一個併發請求url的fetch的變種。當第一個請求返回時,直接取消其它的請求。