8.4. Channels

如果說goroutine是Go語言程序的併發體的話,那麼channels則是它們之間的通信機制。一個channel是一個通信機制,它可以讓一個goroutine通過它給另一個goroutine發送值信息。每個channel都有一個特殊的類型,也就是channels可發送數據的類型。一個可以發送int類型數據的channel一般寫為chan int。

使用內置的make函數,我們可以創建一個channel:

ch := make(chan int) // ch has type 'chan int'

和map類似,channel也對應一個make創建的底層數據結構的引用。當我們複製一個channel或用於函數參數傳遞時,我們只是拷貝了一個channel引用,因此調用者和被調用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。

兩個相同類型的channel可以使用==運算符比較。如果兩個channel引用的是相同的對象,那麼比較的結果為真。一個channel也可以和nil進行比較。

一個channel有發送和接受兩個主要操作,都是通信行為。一個發送語句將一個值從一個goroutine通過channel發送到另一個執行接收操作的goroutine。發送和接收兩個操作都使用<-運算符。在發送語句中,<-運算符分割channel和要發送的值。在接收語句中,<-運算符寫在channel對象之前。一個不使用接收結果的接收操作也是合法的。

ch <- x  // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch     // a receive statement; result is discarded

Channel還支持close操作,用於關閉channel,隨後對基於該channel的任何發送操作都將導致panic異常。對一個已經被close過的channel進行接收操作依然可以接受到之前已經成功發送的數據;如果channel中已經沒有數據的話將產生一個零值的數據。

使用內置的close函數就可以關閉一個channel:

close(ch)

以最簡單方式調用make函數創建的是一個無緩存的channel,但是我們也可以指定第二個整型參數,對應channel的容量。如果channel的容量大於零,那麼該channel就是帶緩存的channel。

ch = make(chan int)    // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3

我們將先討論無緩存的channel,然後在8.4.4節討論帶緩存的channel。

8.4.1. 不帶緩存的Channels

一個基於無緩存Channels的發送操作將導致發送者goroutine阻塞,直到另一個goroutine在相同的Channels上執行接收操作,當發送的值通過Channels成功傳輸之後,兩個goroutine可以繼續執行後面的語句。反之,如果接收操作先發生,那麼接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執行發送操作。

基於無緩存Channels的發送和接收操作將導致兩個goroutine做一次同步操作。因為這個原因,無緩存Channels有時候也被稱為同步Channels。當通過一個無緩存Channels發送數據時,接收者收到數據發生在再次喚醒喚醒發送者goroutine之前(譯註:happens before,這是Go語言併發內存模型的一個關鍵術語!)。

在討論併發編程時,當我們說x事件在y事件之前發生(happens before),我們並不是說x事件在時間上比y時間更早;我們要表達的意思是要保證在此之前的事件都已經完成了,例如在此之前的更新某些變量的操作已經完成,你可以放心依賴這些已完成的事件了。

當我們說x事件既不是在y事件之前發生也不是在y事件之後發生,我們就說x事件和y事件是併發的。這並不是意味著x事件和y事件就一定是同時發生的,我們只是不能確定這兩個事件發生的先後順序。在下一章中我們將看到,當兩個goroutine併發訪問了相同的變量時,我們有必要保證某些事件的執行順序,以避免出現某些併發問題。

在8.3節的客戶端程序,它在主goroutine中(譯註:就是執行main函數的goroutine)將標準輸入複製到server,因此當客戶端程序關閉標準輸入時,後臺goroutine可能依然在工作。我們需要讓主goroutine等待後臺goroutine完成工作後再退出,我們使用了一個channel來同步兩個goroutine:

gopl.io/ch8/netcat3

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

當用戶關閉了標準輸入,主goroutine中的mustCopy函數調用將返回,然後調用conn.Close()關閉讀和寫方向的網絡連接。關閉網絡連接中的寫方向的連接將導致server程序收到一個文件(end-of-file)結束的信號。關閉網絡連接中讀方向的連接將導致後臺goroutine的io.Copy函數調用返回一個“read from closed connection”(“從關閉的連接讀”)類似的錯誤,因此我們臨時移除了錯誤日誌語句;在練習8.3將會提供一個更好的解決方案。(需要注意的是go語句調用了一個函數字面量,這是Go語言中啟動goroutine常用的形式。)

在後臺goroutine返回之前,它先打印一個日誌信息,然後向done對應的channel發送一個值。主goroutine在退出前先等待從done對應的channel接收一個值。因此,總是可以在程序退出前正確輸出“done”消息。

基於channels發送消息有兩個重要方面。首先每個消息都有一個值,但是有時候通訊的事實和發生的時刻也同樣重要。當我們更希望強調通訊發生的時刻時,我們將它稱為消息事件。有些消息事件並不攜帶額外的信息,它僅僅是用作兩個goroutine之間的同步,這時候我們可以用struct{}空結構體作為channels元素的類型,雖然也可以使用bool或int類型實現同樣的功能,done <- 1語句也比done <- struct{}{}更短。

練習 8.3: 在netcat3例子中,conn雖然是一個interface類型的值,但是其底層真實類型是*net.TCPConn,代表一個TCP連接。一個TCP連接有讀和寫兩個部分,可以使用CloseRead和CloseWrite方法分別關閉它們。修改netcat3的主goroutine代碼,只關閉網絡連接中寫的部分,這樣的話後臺goroutine可以在標準輸入被關閉後繼續打印從reverb1服務器傳回的數據。(要在reverb2服務器也完成同樣的功能是比較困難的;參考練習 8.4。)

8.4.2. 串聯的Channels(Pipeline)

Channels也可以用於將多個goroutine連接在一起,一個Channel的輸出作為下一個Channel的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來,如圖8.1所示。

第一個goroutine是一個計數器,用於生成0、1、2、……形式的整數序列,然後通過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每個整數求平方,然後將平方後的結果通過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每個整數。為了保持例子清晰,我們有意選擇了非常簡單的函數,當然三個goroutine的計算很簡單,在現實中確實沒有必要為如此簡單的運算構建三個goroutine。

gopl.io/ch8/pipeline1

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; ; x++ {
            naturals <- x
        }
    }()

    // Squarer
    go func() {
        for {
            x := <-naturals
            squares <- x * x
        }
    }()

    // Printer (in main goroutine)
    for {
        fmt.Println(<-squares)
    }
}

如您所料,上面的程序將生成0、1、4、9、……形式的無窮數列。像這樣的串聯Channels的管道(Pipelines)可以用在需要長時間運行的服務中,每個長時間運行的goroutine可能會包含一個死循環,在不同goroutine的死循環內部使用串聯的Channels來通信。但是,如果我們希望通過Channels只發送有限的數列該如何處理呢?

如果發送者知道,沒有更多的值需要發送到channel的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現:

close(naturals)

當一個channel被關閉後,再向該channel發送數據將導致panic異常。當一個被關閉的channel中已經發送的數據都被成功接收後,後續的接收操作將不再阻塞,它們會立即返回一個零值。關閉上面例子中的naturals變量對應的channel並不能終止循環,它依然會收到一個永無休止的零值序列,然後將它們發送給打印者goroutine。

沒有辦法直接測試一個channel是否被關閉,但是接收操作有一個變體形式:它多接收一個結果,多接收的第二個結果是一個布爾值ok,ture表示成功從channels接收到值,false表示channels已經被關閉並且裡面沒有值可接收。使用這個特性,我們可以修改squarer函數中的循環代碼,當naturals對應的channel被關閉並沒有值可接收時跳出循環,並且也關閉squares對應的channel.

// Squarer
go func() {
    for {
        x, ok := <-naturals
        if !ok {
            break // channel was closed and drained
        }
        squares <- x * x
    }
    close(squares)
}()

因為上面的語法是笨拙的,而且這種處理模式很常見,因此Go語言的range循環可直接在channels上面迭代。使用range循環是上面處理模式的簡潔語法,它依次從channel接收數據,當channel被關閉並且沒有值可接收時跳出循環。

在下面的改進中,我們的計數器goroutine只生成100個含數字的序列,然後關閉naturals對應的channel,這將導致計算平方數的squarer對應的goroutine可以正常終止循環並關閉squares對應的channel。(在一個更復雜的程序中,可以通過defer語句關閉對應的channel。)最後,主goroutine也可以正常終止循環並退出程序。

gopl.io/ch8/pipeline2

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}

其實你並不需要關閉每一個channel。只有當需要告訴接收者goroutine,所有的數據已經全部發送時才需要關閉channel。不管一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動回收器回收。(不要將關閉一個打開文件的操作和關閉一個channel操作混淆。對於每個打開的文件,都需要在不使用的時候調用對應的Close方法來關閉文件。)

試圖重複關閉一個channel將導致panic異常,試圖關閉一個nil值的channel也將導致panic異常。關閉一個channels還會觸發一個廣播機制,我們將在8.9節討論。

8.4.3. 單方向的Channel

隨著程序的增長,人們習慣於將大的函數拆分為小的函數。我們前面的例子中使用了三個goroutine,然後用兩個channels來連接它們,它們都是main函數的局部變量。將三個goroutine拆分為以下三個函數是自然的想法:

func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)

其中計算平方的squarer函數在兩個串聯Channels的中間,因此擁有兩個channel類型的參數,一個用於輸入一個用於輸出。兩個channel都擁有相同的類型,但是它們的使用方式相反:一個只用於接收,另一個只用於發送。參數的名字in和out已經明確表示了這個意圖,但是並無法保證squarer函數向一個in參數對應的channel發送數據或者從一個out參數對應的channel接收數據。

這種場景是典型的。當一個channel作為一個函數參數時,它一般總是被專門用於只發送或者只接收。

為了表明這種意圖並防止被濫用,Go語言的類型系統提供了單方向的channel類型,分別用於只發送或只接收的channel。類型chan<- int表示一個只發送int的channel,只能發送不能接收。相反,類型<-chan int表示一個只接收int的channel,只能接收不能發送。(箭頭<-和關鍵字chan的相對位置表明了channel的方向。)這種限制將在編譯期檢測。

因為關閉操作只用於斷言不再向channel發送新的數據,所以只有在發送者所在的goroutine才會調用close函數,因此對一個只接收的channel調用close將是一個編譯錯誤。

這是改進的版本,這一次參數使用了單方向channel類型:

gopl.io/ch8/pipeline3

func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

調用counter(naturals)時,naturals的類型將隱式地從chan int轉換成chan<- int。調用printer(squares)也會導致相似的隱式轉換,這一次是轉換為<-chan int類型只接收型的channel。任何雙向channel向單向channel變量的賦值操作都將導致該隱式轉換。這裡並沒有反向轉換的語法:也就是不能將一個類似chan<- int類型的單向型的channel轉換為chan int類型的雙向型的channel。

8.4.4. 帶緩存的Channels

帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數創建channel時通過第二個參數指定的。下面的語句創建了一個可以持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。

ch = make(chan string, 3)

向緩存Channel的發送操作就是向內部緩存隊列的尾部插入元素,接收操作則是從隊列的頭部刪除元素。如果內部緩存隊列是滿的,那麼發送操作將阻塞直到因另一個goroutine執行接收操作而釋放了新的隊列空間。相反,如果channel是空的,接收操作將阻塞直到有另一個goroutine執行發送操作而向隊列插入元素。

我們可以在無阻塞的情況下連續向新創建的channel發送三個值:

ch <- "A"
ch <- "B"
ch <- "C"

此刻,channel的內部緩存隊列將是滿的(圖8.3),如果有第四個發送操作將發生阻塞。

如果我們接收一個值,

fmt.Println(<-ch) // "A"

那麼channel的緩存隊列將不是滿的也不是空的(圖8.4),因此對該channel執行的發送或接收操作都不會發生阻塞。通過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。

在某些特殊情況下,程序可能需要知道channel內部緩存的容量,可以用內置的cap函數獲取:

fmt.Println(cap(ch)) // "3"

同樣,對於內置的len函數,如果傳入的是channel,那麼將返回channel內部緩存隊列中有效元素的個數。因為在併發程序中該信息會隨著接收操作而失效,但是它對某些故障診斷和性能優化會有幫助。

fmt.Println(len(ch)) // "2"

在繼續執行兩次接收操作後channel內部的緩存隊列將又成為空的,如果有第四個接收操作將發生阻塞:

fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"

在這個例子中,發送和接收操作都發生在同一個goroutine中,但是在真實的程序中它們一般由不同的goroutine執行。Go語言新手有時候會將一個帶緩存的channel當作同一個goroutine中的隊列使用,雖然語法看似簡單,但實際上這是一個錯誤。Channel和goroutine的調度器機制是緊密相連的,如果沒有其他goroutine從channel接收,發送者——或許是整個程序——將會面臨永遠阻塞的風險。如果你只是需要一個簡單的隊列,使用slice就可以了。

下面的例子展示了一個使用了帶緩存channel的應用。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不同的地理位置。它們分別將收到的響應發送到帶緩存channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。因此mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應之前就返回了結果。(順便說一下,多個goroutines併發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

如果我們使用了無緩存的channel,那麼兩個慢的goroutines將會因為沒有人接收而被永遠卡住。這種情況,稱為goroutines洩漏,這將是一個BUG。和垃圾變量不同,洩漏的goroutines並不會被自動回收,因此確保每個不再需要的goroutine能正常退出是重要的。

關於無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,都可能影響程序的正確性。無緩存channel更強地保證了每個發送操作與相應的同步接收操作;但是對於帶緩存channel,這些操作是解耦的。同樣,即使我們知道將要發送到一個channel的信息的數量上限,創建一個對應容量大小的帶緩存channel也是不現實的,因為這要求在執行任何接收操作之前緩存所有已經發送的值。如果未能分配足夠的緩存將導致程序死鎖。

Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上糖衣,還有一個將每個蛋糕傳遞到它下一個廚師的生產線。在狹小的廚房空間環境,每個廚師在完成蛋糕後必須等待下一個廚師已經準備好接受它;這類似於在一個無緩存的channel上進行溝通。

如果在每個廚師之間有一個放置一個蛋糕的額外空間,那麼每個廚師就可以將一個完成的蛋糕臨時放在那裡而馬上進入下一個蛋糕的製作中;這類似於將channel的緩存隊列的容量設置為1。只要每個廚師的平均工作效率相近,那麼其中大部分的傳輸工作將是迅速的,個體之間細小的效率差異將在交接過程中彌補。如果廚師之間有更大的額外空間——也是就更大容量的緩存隊列——將可以在不停止生產線的前提下消除更大的效率波動,例如一個廚師可以短暫地休息,然後再加快趕上進度而不影響其他人。

另一方面,如果生產線的前期階段一直快於後續階段,那麼它們之間的緩存在大部分時間都將是滿的。相反,如果後續階段比前期階段更快,那麼它們之間的緩存在大部分時間都將是空的。對於這類場景,額外的緩存並沒有帶來任何好處。

生產線的隱喻對於理解channels和goroutines的工作機制是很有幫助的。例如,如果第二階段是需要精心製作的複雜操作,一個廚師可能無法跟上第一個廚師的進度,或者是無法滿足第三階段廚師的需求。要解決這個問題,我們可以再僱傭另一個廚師來幫助完成第二階段的工作,他執行相同的任務但是獨立工作。這類似於基於相同的channels創建另一個獨立的goroutine。

我們沒有太多的空間展示全部細節,但是gopl.io/ch8/cake包模擬了這個蛋糕店,可以通過不同的參數調整。它還對上面提到的幾種場景提供對應的基準測試(§11.4) 。