Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

16|Semaphore:一篇文章搞懂訊號量

你好,我是鳥窩。

本章導讀

                  Semaphore 控制併發數量圖

可用許可數 = 3

任務1 ─Acquire─┐
任務2 ─Acquire─┼──> [執行中](最多 3 個)
任務3 ─Acquire─┘
任務4 ─Acquire────────> [等待許可]

任務完成 -> Release -> 等待中的任務取得許可繼續

在前面的課程裡,我們學習了標準庫的併發原語、原子操作和 Channel,掌握了這些,你就可以解決 80% 的併發程式設計問題了。但是,如果你要想進一步提升你的併發程式設計能力,就需要學習一些第三方庫。

所以,在接下來的幾節課裡,我會給你分享 Go 官方或者其他人提供的第三方庫,這節課我們先來學習訊號量,訊號量(Semaphore)是用來控制多個 goroutine 同時訪問多個資源的併發原語。

訊號量是什麼?都有什麼操作?

訊號量的概念是荷蘭電腦科學家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應用在不同的作業系統中。在系統中,會給每一個程式一個訊號量,代表每個程式目前的狀態。未得到控制權的程式,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。

最簡單的訊號量就是一個變數加一些併發控制的能力,這個變數是 0 到 n 之間的一個數值。當 goroutine 完成對此訊號量的等待(wait)時,該計數值就減 1,當 goroutine 完成對此訊號量的釋放(release)時,該計數值就加 1。當計數值為 0 的時候,goroutine 呼叫 wait 等待該訊號量是不會成功的,除非計數器又大於 0,等待的 goroutine 才有可能成功返回。

更復雜的訊號量型別,就是使用抽象資料型別代替變數,用來代表複雜的資源型別。實際上,大部分的訊號量都使用一個整型變數來表示一組資源,並沒有實作太複雜的抽象資料型別,所以你只要知道有更復雜的訊號量就行了,我們這節課主要是學習最簡單的訊號量。

說到這兒呢,我想借助一個生活中的例子,來幫你進一步理解訊號量。

舉個例子,圖書館新購買了 10 本《Go 併發程式設計的獨家秘籍》,有 1 萬個學生都想讀這本書,“僧多粥少”。所以,圖書館管理員先會讓這 1 萬個同學進行登記,按照登記的順序,借閱此書。如果書全部被借走,那麼,其他想看此書的同學就需要等待,如果有人還書了,圖書館管理員就會通知下一位同學來借閱這本書。這裡的資源是《Go 併發程式設計的獨家秘籍》這十本書,想讀此書的同學就是 goroutine,圖書管理員就是訊號量。

怎麼樣,現在是不是很好理解了?那麼,接下來,我們來學習下訊號量的 P/V 操作。

P/V 操作

Dijkstra 在他的論文中為訊號量定義了兩個操作 P 和 V。P 操作(descrease、wait、acquire)是減少訊號量的計數值,而 V 操作(increase、signal、release)是增加訊號量的計數值。

使用偽程式碼表示如下(中括號代表原子操作):

function V(semaphore S, integer I):
    [S ← S + I]

function P(semaphore S, integer I):
    repeat:
        [if S ≥ I:
        S ← S − I
        break]

可以看到,初始化訊號量 S 有一個指定數量(n)的資源,它就像是一個有 n 個資源的池子。P 操作相當於請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那麼,它可以不斷嘗試或者阻塞等待。V 操作會釋放自己持有的資源,把資源返還給訊號量。訊號量的值除了初始化的操作以外,只能由 P/V 操作改變。

現在,我們來總結下訊號量的實作。

  1. 初始化訊號量:設定初始的資源的數量。
  2. P 操作:將訊號量的計數值減去 1,如果新值已經為負,那麼呼叫者會被阻塞並加入到等待佇列中。否則,呼叫者會繼續執行,並且獲得一個資源。
  3. V 操作:將訊號量的計數值加 1,如果先前的計數值為負,就說明有等待的 P 操作的呼叫者。它會從等待佇列中取出一個等待的呼叫者,喚醒它,讓它繼續執行。

講到這裡,我想再稍微說一個題外話,我們在第 2 講提到過飢餓,就是說在高併發的極端場景下,會有些 goroutine 始終搶不到鎖。為了處理飢餓的問題,你可以在等待佇列中做一些“文章”。比如實作一個優先順序的佇列,或者先入先出的佇列,等等,保持公平性,並且照顧到優先順序。

在正式進入實作訊號量的具體實作原理之前,我想先講一個知識點,就是訊號量和互斥鎖的區別與聯絡,這有助於我們掌握接下來的內容。

其實,訊號量可以分為計數訊號量(counting semaphre)和二進位訊號量(binary semaphore)。剛剛所說的圖書館借書的例子就是一個計數訊號量,它的計數可以是任意一個整數。在特殊的情況下,如果計數值只能是 0 或者 1,那麼,這個訊號量就是二進位訊號量,提供了互斥的功能(要麼是 0,要麼是 1),所以,有時候互斥鎖也會使用二進位訊號量來實作。

我們一般用訊號量保護一組資源,比如資料庫連線池、一組客戶端的連線、幾個印表機資源,等等。如果訊號量蛻變成二進位訊號量,那麼,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。

有人會很細緻地區分二進位訊號量和互斥鎖。比如說,有人提出,在 Windows 系統中,互斥鎖只能由持有鎖的執行緒釋放鎖,而二進位訊號量則沒有這個限制(Stack Overflow上也有相關的討論)。實際上,雖然在 Windows 系統中,它們的確有些區別,但是對 Go 語言來說,互斥鎖也可以由非持有的 goroutine 來釋放,所以,從行為上來說,它們並沒有嚴格的區別。

我個人認為,沒必要進行細緻的區分,因為互斥鎖並不是一個很嚴格的定義。實際在遇到互斥併發的問題時,我們一般選用互斥鎖。

好了,言歸正傳,剛剛我們掌握了訊號量的含義和具體操作方式,下面,我們就來具體瞭解下官方擴充套件庫的實作。

Go 官方擴充套件庫的實作

在執行時,Go 內部使用訊號量來控制 goroutine 的阻塞和喚醒。我們在學習基本併發原語的實作時也看到了,比如互斥鎖的第二個欄位:

type Mutex struct {
    state int32
    sema  uint32
}

訊號量的 P/V 操作是透過函式實作的:

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

遺憾的是,它是 Go 執行時內部使用的,並沒有封裝暴露成一個對外的訊號量併發原語,原則上我們沒有辦法使用。不過沒關係,Go 在它的擴充套件包中提供了訊號量semaphore,不過這個訊號量的型別名並不叫 Semaphore,而是叫 Weighted。

之所以叫做 Weighted,我想,應該是因為可以在初始化建立這個訊號量的時候設定權重(初始化的資源數),其實我覺得叫 Semaphore 或許會更好。

我們來分析下這個訊號量的幾個實作方法。

  1. Acquire 方法:相當於 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,呼叫者就會被阻塞。它的第一個引數是 Context,這就意味著,你可以透過 Context 增加超時或者 cancel 的機制。如果是正常獲取了資源,就返回 nil;否則,就返回 ctx.Err(),訊號量不改變。
  2. Release 方法:相當於 V 操作,可以將 n 個資源釋放,返還給訊號量。
  3. TryAcquire 方法:嘗試獲取 n 個資源,但是它不會阻塞,要麼成功獲取 n 個資源,返回 true,要麼一個也不獲取,返回 false。

知道了訊號量的實作方法,在實際的場景中,我們應該怎麼用呢?我來舉個 Worker Pool 的例子,來幫助你理解。

我們建立和 CPU 核數一樣多的 Worker,讓它們去處理一個 4 倍數量的整數 slice。每個 Worker 一次只能處理一個整數,處理完之後,才能處理下一個。

當然,這個問題的解決方案有很多種,這一次我們使用訊號量,程式碼如下:

var (
    maxWorkers = runtime.GOMAXPROCS(0)                    // worker數量
    sema       = semaphore.NewWeighted(int64(maxWorkers)) //訊號量
    task       = make([]int, maxWorkers*4)                // 任務數,是worker的四倍
)

func main() {
    ctx := context.Background()

    for i := range task {
        // 如果沒有worker可用,會阻塞在這裡,直到某個worker被釋放
        if err := sema.Acquire(ctx, 1); err != nil {
            break
        }

        // 啟動worker goroutine
        go func(i int) {
            defer sema.Release(1)
            time.Sleep(100 * time.Millisecond) // 模擬一個耗時操作
            task[i] = i + 1
        }(i)
    }

    // 請求所有的worker,這樣能確保前面的worker都執行完
    if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
        log.Printf("獲取所有的worker失敗: %v", err)
    }

    fmt.Println(task)
}

在這段程式碼中,main goroutine 相當於一個 dispacher,負責任務的分發。它先請求訊號量,如果獲取成功,就會啟動一個 goroutine 去處理計算,然後,這個 goroutine 會釋放這個訊號量(有意思的是,訊號量的獲取是在 main goroutine,訊號量的釋放是在 worker goroutine 中),如果獲取不成功,就等到有訊號量可以使用的時候,再去獲取。

需要提醒你的是,其實,在這個例子中,還有一個值得我們學習的知識點,就是最後的那一段處理(第 25 行)。如果在實際應用中,你想等所有的 Worker 都執行完,就可以獲取最大計數值的訊號量

Go 擴充套件庫中的訊號量是使用互斥鎖 +List 實作的。互斥鎖實作其它欄位的保護,而 List 實作了一個等待佇列,等待者的通知是透過 Channel 的通知機制實作的。

我們來看一下訊號量 Weighted 的資料結構:

type Weighted struct {
    size    int64         // 最大資源數
    cur     int64         // 當前已被使用的資源
    mu      sync.Mutex    // 互斥鎖,對欄位的保護
    waiters list.List     // 等待佇列
}

在訊號量的幾個實作方法裡,Acquire 是程式碼最複雜的一個方法,它不僅僅要監控資源是否可用,而且還要檢測 Context 的 Done 是否已關閉。我們來看下它的實作程式碼。

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
        // fast path, 如果有足夠的資源,都不考慮ctx.Done的狀態,將cur加上n就返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }
  
        // 如果是不可能完成的任務,請求的資源數大於能提供的最大的資源數
    if n > s.size {
      s.mu.Unlock()
            // 依賴ctx的狀態返回,否則一直等待
      <-ctx.Done()
      return ctx.Err()
    }
  
        // 否則就需要把呼叫者加入到等待佇列中
        // 建立了一個ready chan,以便被通知喚醒
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()
  

        // 等待
    select {
    case <-ctx.Done(): // context的Done被關閉
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready: // 如果被喚醒了,忽略ctx的狀態
        err = nil
      default: 通知waiter
        isFront := s.waiters.Front() == elem
        s.waiters.Remove(elem)
        // 通知其它的waiters,檢查是否有足夠的資源
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 被喚醒了
      return nil
    }
  }

其實,為了提高效能,這個方法中的 fast path 之外的程式碼,可以抽取成 acquireSlow 方法,以便其它 Acquire 被內聯。

Release 方法將當前計數值減去釋放的資源數 n,並喚醒等待佇列中的呼叫者,看是否有足夠的資源被獲取。

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    if s.cur < 0 {
      s.mu.Unlock()
      panic("semaphore: released more than held")
    }
    s.notifyWaiters()
    s.mu.Unlock()
}

notifyWaiters 方法就是逐個檢查等待的呼叫者,如果資源不夠,或者是沒有等待者了,就返回:

func (s *Weighted) notifyWaiters() {
    for {
      next := s.waiters.Front()
      if next == nil {
        break // No more waiters blocked.
      }
  

      w := next.Value.(waiter)
      if s.size-s.cur < w.n {
        //避免飢餓,這裡還是按照先入先出的方式處理
        break
      }

      s.cur += w.n
      s.waiters.Remove(next)
      close(w.ready)
    }
  }

notifyWaiters 方法是按照先入先出的方式喚醒呼叫者。當釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那麼,佇列中的所有等待者都會繼續等待,即使有的等待者只需要 1 個資源。這樣做的目的是避免飢餓,否則的話,資源可能總是被那些請求資源數小的呼叫者獲取,這樣一來,請求資源數巨大的呼叫者,就沒有機會獲得資源了。

好了,到這裡,你就知道了官方擴充套件庫的訊號量實作方法,接下來你就可以使用訊號量了。不過,在此之前呢,我想給你講幾個使用時的常見錯誤。這部分內容可是幫助你避坑的,我建議你好好學習。

使用訊號量的常見錯誤

保證訊號量不出錯的前提是正確地使用它,否則,公平性和安全性就會受到損害,導致程式 panic。

在使用訊號量時,最常見的幾個錯誤如下:

  1. 請求了資源,但是忘記釋放它;
  2. 釋放了從未請求的資源;
  3. 長時間持有一個資源,即使不需要它;
  4. 不持有一個資源,卻直接使用它。

不過,即使你規避了這些坑,在同時使用多種資源,不同的訊號量控制不同的資源的時候,也可能會出現死鎖現象,比如哲學家就餐問題。

就 Go 擴充套件庫實作的訊號量來說,在呼叫 Release 方法的時候,你可以傳遞任意的整數。但是,如果你傳遞一個比請求到的數量大的錯誤的數值,程式就會 panic。如果傳遞一個負數,會導致資源永久被持有。如果你請求的資源數比最大的資源數還大,那麼,呼叫者可能永遠被阻塞。

所以,使用訊號量遵循的原則就是請求多少資源,就釋放多少資源。你一定要注意,必須使用正確的方法傳遞整數,不要“耍小聰明”,而且,請求的資源數一定不要超過最大資源數。

其它訊號量的實作

除了官方擴充套件庫的實作,實際上,我們還有很多方法實作訊號量,比較典型的就是使用 Channel 來實作。

根據之前的 Channel 型別的介紹以及 Go 記憶體模型的定義,你應該能想到,使用一個 buffer 為 n 的 Channel 很容易實作訊號量,比如下面的程式碼,我們就是使用 chan struct{}型別來實作的。

在初始化這個訊號量的時候,我們設定它的初始容量,代表有多少個資源可以使用。它使用 Lock 和 Unlock 方法實作請求資源和釋放資源,正好實作了 Locker 介面。

  // Semaphore 資料結構,並且還實作了Locker介面
  type semaphore struct {
    sync.Locker
    ch chan struct{}
  }
  
  // 建立一個新的訊號量
  func NewSemaphore(capacity int) sync.Locker {
    if capacity <= 0 {
      capacity = 1 // 容量為1就變成了一個互斥鎖
    }
    return &semaphore{ch: make(chan struct{}, capacity)}
  }
  
  // 請求一個資源
  func (s *semaphore) Lock() {
    s.ch <- struct{}{}
  }
  
  // 釋放資源
  func (s *semaphore) Unlock() {
    <-s.ch
  }

當然,你還可以自己擴充套件一些方法,比如在請求資源的時候使用 Context 引數(Acquire(ctx))、實作 TryLock 等功能。

看到這裡,你可能會問,這個訊號量的實作看起來非常簡單,而且也能應對大部分的訊號量的場景,為什麼官方擴充套件庫的訊號量的實作不採用這種方法呢?其實,具體是什麼原因,我也不知道,但是我必須要強調的是,官方的實作方式有這樣一個功能:它可以一次請求多個資源,這是透過 Channel 實作的訊號量所不具備的

除了 Channel,marusama/semaphore也實作了一個可以動態更改資源容量的訊號量,也是一個非常有特色的實作。如果你的資源數量並不是固定的,而是動態變化的,我建議你考慮一下這個訊號量庫。

總結

這是一個很奇怪的現象:標準庫中實作基本併發原語(比如 Mutex)的時候,強烈依賴訊號量實作等待佇列和通知喚醒,但是,標準庫中卻沒有把這個實作直接暴露出來放到標準庫,而是透過第三庫提供。

不管怎樣,訊號量這個併發原語在多資源共享的併發控制的場景中被廣泛使用,有時候也會被 Channel 型別所取代,因為一個 buffered chan 也可以代表 n 個資源。

但是,官方擴充套件的訊號量也有它的優勢,就是可以一次獲取多個資源。在批次獲取資源的場景中,我建議你嘗試使用官方擴充套件的訊號量

思考題

  1. 你能用 Channel 實作訊號量併發原語嗎?你能想到幾種實作方式?
  2. 為什麼訊號量的資源數設計成 int64 而不是 uint64 呢?

歡迎在留言區寫下你的思考和答案,我們一起交流討論。如果你覺得有所收穫,也歡迎你把今天的內容分享給你的朋友或同事。