16|Semaphore:一篇文章搞懂訊號量
你好,我是鳥窩。
本章導讀
Semaphore 控制併發數量圖
可用許可數 = 3
任務1 ─Acquire─┐
任務2 ─Acquire─┼──> [執行中](最多 3 個)
任務3 ─Acquire─┘
任務4 ─Acquire────────> [等待許可]
任務完成 -> Release -> 等待中的任務取得許可繼續
在前面的課程裡,我們學習了標準庫的併發原語、原子操作和 Channel,掌握了這些,你就可以解決 80% 的併發程式設計問題了。但是,如果你要想進一步提升你的併發程式設計能力,就需要學習一些第三方庫。
所以,在接下來的幾節課裡,我會給你分享 Go 官方或者其他人提供的第三方庫,這節課我們先來學習訊號量,訊號量(Semaphore)是用來控制多個 goroutine 同時訪問多個資源的併發原語。
訊號量是什麼?都有什麼操作?
訊號量的概念是荷蘭電腦科學家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應用在不同的作業系統中。在系統中,會給每一個程式一個訊號量,代表每個程式目前的狀態。未得到控制權的程式,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。
最簡單的訊號量就是一個變數加一些併發控制的能力,這個變數是 0 到 n 之間的一個數值。當 goroutine 完成對此訊號量的等待(wait)時,該計數值就減 1,當 goroutine 完成對此訊號量的釋放(release)時,該計數值就加 1。當計數值為 0 的時候,goroutine 呼叫 wait 等待該訊號量是不會成功的,除非計數器又大於 0,等待的 goroutine 才有可能成功返回。
更復雜的訊號量型別,就是使用抽象資料型別代替變數,用來代表複雜的資源型別。實際上,大部分的訊號量都使用一個整型變數來表示一組資源,並沒有實作太複雜的抽象資料型別,所以你只要知道有更復雜的訊號量就行了,我們這節課主要是學習最簡單的訊號量。
說到這兒呢,我想借助一個生活中的例子,來幫你進一步理解訊號量。
舉個例子,圖書館新購買了 10 本《Go 併發程式設計的獨家秘籍》,有 1 萬個學生都想讀這本書,“僧多粥少”。所以,圖書館管理員先會讓這 1 萬個同學進行登記,按照登記的順序,借閱此書。如果書全部被借走,那麼,其他想看此書的同學就需要等待,如果有人還書了,圖書館管理員就會通知下一位同學來借閱這本書。這裡的資源是《Go 併發程式設計的獨家秘籍》這十本書,想讀此書的同學就是 goroutine,圖書管理員就是訊號量。
怎麼樣,現在是不是很好理解了?那麼,接下來,我們來學習下訊號量的 P/V 操作。
P/V 操作
Dijkstra 在他的論文中為訊號量定義了兩個操作 P 和 V。P 操作(descrease、wait、acquire)是減少訊號量的計數值,而 V 操作(increase、signal、release)是增加訊號量的計數值。
使用偽程式碼表示如下(中括號代表原子操作):
function V(semaphore S, integer I):
[S ← S + I]
function P(semaphore S, integer I):
repeat:
[if S ≥ I:
S ← S − I
break]
可以看到,初始化訊號量 S 有一個指定數量(n)的資源,它就像是一個有 n 個資源的池子。P 操作相當於請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那麼,它可以不斷嘗試或者阻塞等待。V 操作會釋放自己持有的資源,把資源返還給訊號量。訊號量的值除了初始化的操作以外,只能由 P/V 操作改變。
現在,我們來總結下訊號量的實作。
- 初始化訊號量:設定初始的資源的數量。
- P 操作:將訊號量的計數值減去 1,如果新值已經為負,那麼呼叫者會被阻塞並加入到等待佇列中。否則,呼叫者會繼續執行,並且獲得一個資源。
- V 操作:將訊號量的計數值加 1,如果先前的計數值為負,就說明有等待的 P 操作的呼叫者。它會從等待佇列中取出一個等待的呼叫者,喚醒它,讓它繼續執行。
講到這裡,我想再稍微說一個題外話,我們在第 2 講提到過飢餓,就是說在高併發的極端場景下,會有些 goroutine 始終搶不到鎖。為了處理飢餓的問題,你可以在等待佇列中做一些“文章”。比如實作一個優先順序的佇列,或者先入先出的佇列,等等,保持公平性,並且照顧到優先順序。
在正式進入實作訊號量的具體實作原理之前,我想先講一個知識點,就是訊號量和互斥鎖的區別與聯絡,這有助於我們掌握接下來的內容。
其實,訊號量可以分為計數訊號量(counting semaphre)和二進位訊號量(binary semaphore)。剛剛所說的圖書館借書的例子就是一個計數訊號量,它的計數可以是任意一個整數。在特殊的情況下,如果計數值只能是 0 或者 1,那麼,這個訊號量就是二進位訊號量,提供了互斥的功能(要麼是 0,要麼是 1),所以,有時候互斥鎖也會使用二進位訊號量來實作。
我們一般用訊號量保護一組資源,比如資料庫連線池、一組客戶端的連線、幾個印表機資源,等等。如果訊號量蛻變成二進位訊號量,那麼,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。
有人會很細緻地區分二進位訊號量和互斥鎖。比如說,有人提出,在 Windows 系統中,互斥鎖只能由持有鎖的執行緒釋放鎖,而二進位訊號量則沒有這個限制(Stack Overflow上也有相關的討論)。實際上,雖然在 Windows 系統中,它們的確有些區別,但是對 Go 語言來說,互斥鎖也可以由非持有的 goroutine 來釋放,所以,從行為上來說,它們並沒有嚴格的區別。
我個人認為,沒必要進行細緻的區分,因為互斥鎖並不是一個很嚴格的定義。實際在遇到互斥併發的問題時,我們一般選用互斥鎖。
好了,言歸正傳,剛剛我們掌握了訊號量的含義和具體操作方式,下面,我們就來具體瞭解下官方擴充套件庫的實作。
Go 官方擴充套件庫的實作
在執行時,Go 內部使用訊號量來控制 goroutine 的阻塞和喚醒。我們在學習基本併發原語的實作時也看到了,比如互斥鎖的第二個欄位:
type Mutex struct {
state int32
sema uint32
}
訊號量的 P/V 操作是透過函式實作的:
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
遺憾的是,它是 Go 執行時內部使用的,並沒有封裝暴露成一個對外的訊號量併發原語,原則上我們沒有辦法使用。不過沒關係,Go 在它的擴充套件包中提供了訊號量semaphore,不過這個訊號量的型別名並不叫 Semaphore,而是叫 Weighted。
之所以叫做 Weighted,我想,應該是因為可以在初始化建立這個訊號量的時候設定權重(初始化的資源數),其實我覺得叫 Semaphore 或許會更好。

我們來分析下這個訊號量的幾個實作方法。
- Acquire 方法:相當於 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,呼叫者就會被阻塞。它的第一個引數是 Context,這就意味著,你可以透過 Context 增加超時或者 cancel 的機制。如果是正常獲取了資源,就返回 nil;否則,就返回 ctx.Err(),訊號量不改變。
- Release 方法:相當於 V 操作,可以將 n 個資源釋放,返還給訊號量。
- TryAcquire 方法:嘗試獲取 n 個資源,但是它不會阻塞,要麼成功獲取 n 個資源,返回 true,要麼一個也不獲取,返回 false。
知道了訊號量的實作方法,在實際的場景中,我們應該怎麼用呢?我來舉個 Worker Pool 的例子,來幫助你理解。
我們建立和 CPU 核數一樣多的 Worker,讓它們去處理一個 4 倍數量的整數 slice。每個 Worker 一次只能處理一個整數,處理完之後,才能處理下一個。
當然,這個問題的解決方案有很多種,這一次我們使用訊號量,程式碼如下:
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker數量
sema = semaphore.NewWeighted(int64(maxWorkers)) //訊號量
task = make([]int, maxWorkers*4) // 任務數,是worker的四倍
)
func main() {
ctx := context.Background()
for i := range task {
// 如果沒有worker可用,會阻塞在這裡,直到某個worker被釋放
if err := sema.Acquire(ctx, 1); err != nil {
break
}
// 啟動worker goroutine
go func(i int) {
defer sema.Release(1)
time.Sleep(100 * time.Millisecond) // 模擬一個耗時操作
task[i] = i + 1
}(i)
}
// 請求所有的worker,這樣能確保前面的worker都執行完
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("獲取所有的worker失敗: %v", err)
}
fmt.Println(task)
}
在這段程式碼中,main goroutine 相當於一個 dispacher,負責任務的分發。它先請求訊號量,如果獲取成功,就會啟動一個 goroutine 去處理計算,然後,這個 goroutine 會釋放這個訊號量(有意思的是,訊號量的獲取是在 main goroutine,訊號量的釋放是在 worker goroutine 中),如果獲取不成功,就等到有訊號量可以使用的時候,再去獲取。
需要提醒你的是,其實,在這個例子中,還有一個值得我們學習的知識點,就是最後的那一段處理(第 25 行)。如果在實際應用中,你想等所有的 Worker 都執行完,就可以獲取最大計數值的訊號量。
Go 擴充套件庫中的訊號量是使用互斥鎖 +List 實作的。互斥鎖實作其它欄位的保護,而 List 實作了一個等待佇列,等待者的通知是透過 Channel 的通知機制實作的。
我們來看一下訊號量 Weighted 的資料結構:
type Weighted struct {
size int64 // 最大資源數
cur int64 // 當前已被使用的資源
mu sync.Mutex // 互斥鎖,對欄位的保護
waiters list.List // 等待佇列
}
在訊號量的幾個實作方法裡,Acquire 是程式碼最複雜的一個方法,它不僅僅要監控資源是否可用,而且還要檢測 Context 的 Done 是否已關閉。我們來看下它的實作程式碼。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// fast path, 如果有足夠的資源,都不考慮ctx.Done的狀態,將cur加上n就返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 如果是不可能完成的任務,請求的資源數大於能提供的最大的資源數
if n > s.size {
s.mu.Unlock()
// 依賴ctx的狀態返回,否則一直等待
<-ctx.Done()
return ctx.Err()
}
// 否則就需要把呼叫者加入到等待佇列中
// 建立了一個ready chan,以便被通知喚醒
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
// 等待
select {
case <-ctx.Done(): // context的Done被關閉
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 如果被喚醒了,忽略ctx的狀態
err = nil
default: 通知waiter
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// 通知其它的waiters,檢查是否有足夠的資源
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 被喚醒了
return nil
}
}
其實,為了提高效能,這個方法中的 fast path 之外的程式碼,可以抽取成 acquireSlow 方法,以便其它 Acquire 被內聯。
Release 方法將當前計數值減去釋放的資源數 n,並喚醒等待佇列中的呼叫者,看是否有足夠的資源被獲取。
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
notifyWaiters 方法就是逐個檢查等待的呼叫者,如果資源不夠,或者是沒有等待者了,就返回:
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
//避免飢餓,這裡還是按照先入先出的方式處理
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
notifyWaiters 方法是按照先入先出的方式喚醒呼叫者。當釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那麼,佇列中的所有等待者都會繼續等待,即使有的等待者只需要 1 個資源。這樣做的目的是避免飢餓,否則的話,資源可能總是被那些請求資源數小的呼叫者獲取,這樣一來,請求資源數巨大的呼叫者,就沒有機會獲得資源了。
好了,到這裡,你就知道了官方擴充套件庫的訊號量實作方法,接下來你就可以使用訊號量了。不過,在此之前呢,我想給你講幾個使用時的常見錯誤。這部分內容可是幫助你避坑的,我建議你好好學習。
使用訊號量的常見錯誤
保證訊號量不出錯的前提是正確地使用它,否則,公平性和安全性就會受到損害,導致程式 panic。
在使用訊號量時,最常見的幾個錯誤如下:
- 請求了資源,但是忘記釋放它;
- 釋放了從未請求的資源;
- 長時間持有一個資源,即使不需要它;
- 不持有一個資源,卻直接使用它。
不過,即使你規避了這些坑,在同時使用多種資源,不同的訊號量控制不同的資源的時候,也可能會出現死鎖現象,比如哲學家就餐問題。
就 Go 擴充套件庫實作的訊號量來說,在呼叫 Release 方法的時候,你可以傳遞任意的整數。但是,如果你傳遞一個比請求到的數量大的錯誤的數值,程式就會 panic。如果傳遞一個負數,會導致資源永久被持有。如果你請求的資源數比最大的資源數還大,那麼,呼叫者可能永遠被阻塞。
所以,使用訊號量遵循的原則就是請求多少資源,就釋放多少資源。你一定要注意,必須使用正確的方法傳遞整數,不要“耍小聰明”,而且,請求的資源數一定不要超過最大資源數。
其它訊號量的實作
除了官方擴充套件庫的實作,實際上,我們還有很多方法實作訊號量,比較典型的就是使用 Channel 來實作。
根據之前的 Channel 型別的介紹以及 Go 記憶體模型的定義,你應該能想到,使用一個 buffer 為 n 的 Channel 很容易實作訊號量,比如下面的程式碼,我們就是使用 chan struct{}型別來實作的。
在初始化這個訊號量的時候,我們設定它的初始容量,代表有多少個資源可以使用。它使用 Lock 和 Unlock 方法實作請求資源和釋放資源,正好實作了 Locker 介面。
// Semaphore 資料結構,並且還實作了Locker介面
type semaphore struct {
sync.Locker
ch chan struct{}
}
// 建立一個新的訊號量
func NewSemaphore(capacity int) sync.Locker {
if capacity <= 0 {
capacity = 1 // 容量為1就變成了一個互斥鎖
}
return &semaphore{ch: make(chan struct{}, capacity)}
}
// 請求一個資源
func (s *semaphore) Lock() {
s.ch <- struct{}{}
}
// 釋放資源
func (s *semaphore) Unlock() {
<-s.ch
}
當然,你還可以自己擴充套件一些方法,比如在請求資源的時候使用 Context 引數(Acquire(ctx))、實作 TryLock 等功能。
看到這裡,你可能會問,這個訊號量的實作看起來非常簡單,而且也能應對大部分的訊號量的場景,為什麼官方擴充套件庫的訊號量的實作不採用這種方法呢?其實,具體是什麼原因,我也不知道,但是我必須要強調的是,官方的實作方式有這樣一個功能:它可以一次請求多個資源,這是透過 Channel 實作的訊號量所不具備的。
除了 Channel,marusama/semaphore也實作了一個可以動態更改資源容量的訊號量,也是一個非常有特色的實作。如果你的資源數量並不是固定的,而是動態變化的,我建議你考慮一下這個訊號量庫。
總結
這是一個很奇怪的現象:標準庫中實作基本併發原語(比如 Mutex)的時候,強烈依賴訊號量實作等待佇列和通知喚醒,但是,標準庫中卻沒有把這個實作直接暴露出來放到標準庫,而是透過第三庫提供。
不管怎樣,訊號量這個併發原語在多資源共享的併發控制的場景中被廣泛使用,有時候也會被 Channel 型別所取代,因為一個 buffered chan 也可以代表 n 個資源。
但是,官方擴充套件的訊號量也有它的優勢,就是可以一次獲取多個資源。在批次獲取資源的場景中,我建議你嘗試使用官方擴充套件的訊號量。

思考題
- 你能用 Channel 實作訊號量併發原語嗎?你能想到幾種實作方式?
- 為什麼訊號量的資源數設計成 int64 而不是 uint64 呢?
歡迎在留言區寫下你的思考和答案,我們一起交流討論。如果你覺得有所收穫,也歡迎你把今天的內容分享給你的朋友或同事。