Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

5.3 協程

這篇文章的大部分專注於將複雜程序解構為小型、模塊化組件的技巧。當一個帶有複雜行為的函數邏輯劃分為幾個獨立的、本身為函數的步驟時,這些函數叫做輔助函數或者子過程。子過程由主函數調用,主函數負責協調子函數的使用。

這一節中,我們使用協程,引入了一種不同的方式來解構複雜的計算。它是一種針對有序數據的任務處理方式。就像子過程那樣,協程會計算複雜計算的一小步。但是,在使用協程時,沒有主函數來協調結果。反之,協程會自發鏈接到一起來組成流水線。可能有一些協程消耗輸入數據,並把它發送到其它協程。也可能有一些協程,每個都對發送給它的數據執行簡單的處理步驟。最後可能有另外一些協程輸出最終結果。

協程和子過程的差異是概念上的:子過程在主函數中位於下級,但是協程都是平等的,它們協作組成流水線,不帶有任何上級函數來負責以特定順序調用它們。

這一節中,我們會學到 Python 如何通過yieldsend()語句來支持協程的構建。之後,我們會看到協程在流水線中的不同作用,以及協程如何支持多任務。

5.3.1 Python 協程

在之前一節中,我們介紹了生成器函數,它使用yield來返回一個值。Python 的生成器函數也可以使用(yield)語句來接受一個值。生成器對象上有兩個額外的方法:send()close(),創建了一個模型使對象可以消耗或產出值。定義了這些對象的生成器函數叫做協程。

協程可以通過(yield)語句來消耗值,向像下面這樣:

value = (yield)

使用這個語法,在帶參數調用對象的send方法之前,執行流會停留在這條語句上。

coroutine.send(data)

之後,執行會恢復,value會被賦為data的值。為了發射計算終止的信號,我們需要使用close()方法來關閉協程。這會在協程內部產生GeneratorExit異常,它可以由try/except子句來捕獲。

下面的例子展示了這些概念。它是一個協程,用於打印匹配所提供的模式串的字符串。

>>> def match(pattern):
        print('Looking for ' + pattern)
        try:
            while True:
                s = (yield)
                if pattern in s:
                    print(s)
        except GeneratorExit:
            print("=== Done ===")

我們可以使用一個模式串來初始化它,之後調用__next__()來開始執行:

>>> m = match("Jabberwock")
>>> m.__next__()
Looking for Jabberwock

__next__()的調用會執行函數體,所以"Looking for jabberwock"會被打印。語句會一直持續執行,直到遇到line = (yield)語句。之後,執行會暫停,並且等待一個發送給m的值。我們可以使用send來將值發送給它。

>>> m.send("the Jabberwock with eyes of flame")
the Jabberwock with eyes of flame
>>> m.send("came whiffling through the tulgey wood")
>>> m.send("and burbled as it came")
>>> m.close()
=== Done ===

當我們以一個值調用m.send時,協程m內部的求值會在line = (yield)語句處恢復,這裡會把發送的值賦給line變量。m中的語句會繼續求值,如果匹配的話會打印出那一行,並繼續執行循環,直到再次進入line = (yield)。之後,m中的求值會暫停,並在m.send調用後恢復。

我們可以將使用send()yield的函數鏈到一起來完成複雜的行為。例如,下面的函數將名為text的字符串分割為單詞,並把每個單詞發送給另一個協程。

每個單詞都發送給了綁定到next_coroutine的協程,使next_coroutine開始執行,而且這個函數暫停並等待。它在next_coroutine暫停之前會一直等待,隨後這個函數恢復執行,發送下一個單詞或執行完畢。

如果我們將上面定義的match和這個函數鏈到一起,我們就可以創建出一個程序,只打印出匹配特定單詞的單詞。

>>> text = 'Commending spending is offending to people pending lending!'
>>> matcher = match('ending')
>>> matcher.__next__()
Looking for ending
>>> read(text, matcher)
Commending
spending
offending
pending
lending!
=== Done ===

read函數向協程matcher發送每個單詞,協程打印出任何匹配pattern的輸入。在matcher協程中,s = (yield)一行等待每個發送進來的單詞,並且在執行到這一行之後將控制流交還給read

5.3.2 生產、過濾和消耗

協程基於如何使用yieldsend()而具有不同的作用:

  • 生產者創建序列中的物品,並使用send(),而不是(yield)
  • 過濾器使用(yield)來消耗物品並將結果使用send()發送給下一個步驟。
  • 消費者使用(yield)來消耗物品,但是從不發送。

上面的read函數是一個生產者的例子。它不使用(yield),但是使用send來生產數據。函數match是個消費者的例子。它不使用send發送任何東西,但是使用(yield)來消耗數據。我們可以將match拆分為過濾器和消費者。過濾器是一個協程,只發送與它的模式相匹配的字符串。

>>> def match_filter(pattern, next_coroutine):
        print('Looking for ' + pattern)
        try:
            while True:
                s = (yield)
                if pattern in s:
                    next_coroutine.send(s)
        except GeneratorExit:
            next_coroutine.close()

消費者是一個函數,只打印出發送給它的行:

>>> def print_consumer():
        print('Preparing to print')
        try:
            while True:
                line = (yield)
                print(line)
        except GeneratorExit:
            print("=== Done ===")

當過濾器或消費者被構建時,必須調用它的__next__方法來開始執行:

>>> printer = print_consumer()
>>> printer.__next__()
Preparing to print
>>> matcher = match_filter('pend', printer)
>>> matcher.__next__()
Looking for pend
>>> read(text, matcher)
spending
pending
=== Done ===

即使名稱filter暗示移除元素,過濾器也可以轉換元素。下面的函數是個轉換元素的過濾器的示例。它消耗字符串併發送一個字典,包含了每個不同的字母在字符串中的出現次數。

>>> def count_letters(next_coroutine):
        try:
            while True:
                s = (yield)
                counts = {letter:s.count(letter) for letter in set(s)}
                next_coroutine.send(counts)
        except GeneratorExit as e:
            next_coroutine.close()

我們可以使用它來計算文本中最常出現的字母,並使用一個消費者,將字典合併來找出最常出現的鍵。

>>> def sum_dictionaries():
        total = {}
        try:
            while True:
                counts = (yield)
                for letter, count in counts.items():
                    total[letter] = count + total.get(letter, 0)
        except GeneratorExit:
            max_letter = max(total.items(), key=lambda t: t[1])[0]
            print("Most frequent letter: " + max_letter)

為了在文件上運行這個流水線,我們必須首先按行讀取文件。之後,將結果發送給count_letters,最後發送給sum_dictionaries。我們可以服用read協程來讀取文件中的行。

>>> s = sum_dictionaries()
>>> s.__next__()
>>> c = count_letters(s)
>>> c.__next__()
>>> read(text, c)
Most frequent letter: n

5.3.3 多任務

生產者或過濾器並不受限於唯一的下游。它可以擁有多個協程作為它的下游,並使用send()向它們發送數據。例如,下面是read的一個版本,向多個下游發送字符串中的單詞:

>>> def read_to_many(text, coroutines):
        for word in text.split():
            for coroutine in coroutines:
                coroutine.send(word)
        for coroutine in coroutines:
            coroutine.close()

我們可以使用它來檢測多個單詞中的相同文本:

>>> m = match("mend")
>>> m.__next__()
Looking for mend
>>> p = match("pe")
>>> p.__next__()
Looking for pe
>>> read_to_many(text, [m, p])
Commending
spending
people
pending
=== Done ===
=== Done ===

首先,read_to_manym上調用了send(word)。這個協程正在等待循環中的text = (yield),之後打印出所發現的匹配,並且等待下一個send。之後執行流返回到了read_to_many,它向p發送相同的行。所以,text中的單詞會按照順序打印出來。