tokio task的通信和同步(3): 同步
tokio::sync
模塊提供了幾種狀態同步的機制:
- Mutex: 互斥鎖
- RwLock: 讀寫鎖
- Notify: 通知喚醒機制
- Barrier: 屏障
- Semaphore: 信號量
因為tokio是跨線程執行任務的,因此通常會使用Arc
來封裝這些同步原語,以使其能夠跨線程。例如:
#![allow(unused)] fn main() { let mutex = Arc::new(Mutex::new()); let rwlock = Arc::new(Mutex::RwLock()); }
Mutex互斥鎖
當多個併發任務(tokio task或線程)可能會修改同一個數據時,就會出現數據競爭現象(競態),具體表現為:某個任務對該數據的修改不生效或被覆蓋。
互斥鎖的作用,就是保護併發情況下可能會出現競態的代碼,這部分代碼稱為臨界區。當某個任務要執行臨界區中的代碼時,必須先申請鎖,申請成功,則可以執行這部分代碼,執行完成這部分代碼後釋放鎖。釋放鎖之前,其它任務無法再申請鎖,它們必須等待鎖被釋放。
假如某個任務一直持有鎖,其它任務將一直等待。因此,互斥鎖應當儘量快地釋放,這樣可以提高併發量。
簡單介紹完互斥鎖之後,再看tokio提供的互斥鎖。
tokio::sync::Mutex
使用new()來創建互斥鎖,使用lock()來申請鎖,申請鎖成功時將返回MutexGuard,並通過drop的方式來釋放鎖。
例如:
use std::sync::Arc; use tokio::{self, sync, runtime::Runtime, time::{self, Duration}}; fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let mutex = Arc::new(sync::Mutex::new(0)); for i in 0..10 { let lock = Arc::clone(&mutex); tokio::spawn(async move { let mut data = lock.lock().await; *data += 1; println!("task: {}, data: {}", i, data); }); } time::sleep(Duration::from_secs(1)).await; }); }
輸出結果:
task: 0, data: 1
task: 2, data: 2
task: 3, data: 3
task: 4, data: 4
task: 1, data: 5
task: 7, data: 6
task: 9, data: 7
task: 6, data: 8
task: 5, data: 9
task: 8, data: 10
可以看到,任務的調度順序是隨機的,但是數據加1的操作是依次完成的。
需特別說明的是,tokio::sync::Mutex
其內部使用了標準庫的互斥鎖,即std::sync::Mutex
,而標準庫的互斥鎖是針對線程的,因此,使用tokio的互斥鎖時也會鎖住整個線程。此外,tokio::sync::Mutex
是對標準庫的Mutex的封裝,性能相對要更差一些。也因此,官方文檔中建議,如非必須,應使用標準庫的Mutex或性能更高的parking_lot
提供的互斥鎖,而不是tokio的Mutex。
例如,將上面的示例該成標準庫的Mutex鎖。
fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let mutex = Arc::new(std::sync::Mutex::new(0)); for i in 0..10 { let lock = mutex.clone(); tokio::spawn(async move { let mut data = lock.lock().unwrap(); *data += 1; println!("task: {}, data: {}", i, data); }); } time::sleep(Duration::from_secs(1)).await; }); }
什麼情況下可以選擇使用tokio的Mutex?當跨await的時候,可以考慮使用tokio Mutex,因為這時使用標準庫的Mutex將編譯錯誤。當然,也有相應的解決方案。
什麼是跨await?每個await都代表一個異步任務,跨await即表示該異步任務中出現了至少一個子任務。而每個異步任務都可能會被tokio內部偷到不同的線程上執行,因此跨await時要求其父任務實現Send Trait,這是因為子任務中可能會引用父任務中的數據。
例如,下面定義的async函數中使用了標準庫的Mutex,且有子任務,這會編譯錯誤:
use std::sync::{Arc, Mutex, MutexGuard}; use tokio::{self, runtime::Runtime, time::{self, Duration}}; async fn add_1(mutex: &Mutex<u64>) { let mut lock = mutex.lock().unwrap(); *lock += 1; // 子任務,跨await,且引用了父任務中的數據 time::sleep(Duration::from_millis(*lock)).await; } fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let mutex = Arc::new(Mutex::new(0)); for i in 0..10 { let lock = mutex.clone(); tokio::spawn(async move { add_1(&lock).await; }); } time::sleep(Duration::from_secs(1)).await; }); }
std::sync::MutexGuard
未實現Send,因此父任務async move{}
語句塊是非Send的,於是編譯報錯。但如果上面的示例中沒有子任務sleep().await
子任務,則編譯無錯,因為已經可以明確知道該Mutex所在的任務是在當前線程執行的。
對於上面的錯誤,可簡單地使用tokio::sync::Mutex
來修復。
use std::sync::Arc; use tokio::{ self, runtime::Runtime, sync::{Mutex, MutexGuard}, time::{self, Duration} }; async fn add_1(mutex: &Mutex<u64>) { let mut lock = mutex.lock().await; *lock += 1; time::sleep(Duration::from_millis(*lock)).await; } fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let mutex = Arc::new(Mutex::new(0)); for i in 0..10 { let lock = mutex.clone(); tokio::spawn(async move { add_1(&lock).await; }); } time::sleep(Duration::from_secs(1)).await; }); }
前面已經說過,tokio的Mutex性能相對較差一些,因此可以不使用tokio Mutex的情況下,儘量不使用它。對於上面的需求,仍然可以繼續使用標準庫的Mutex,但需要做一些調整。
例如,可以在子任務await之前,把所有未實現Send的數據都drop掉,保證子任務無法引用父任務中的任何非Send數據。
#![allow(unused)] fn main() { use std::sync::{Arc, Mutex, MutexGuard}; async fn add_1(mutex: &Mutex<u64>) { { let mut lock = mutex.lock().unwrap(); *lock += 1; } // 子任務,跨await,不引用父任務中的數據 time::sleep(Duration::from_millis(10)).await; } }
這種方案的主要思想是讓子任務和父任務不要出現不安全的數據交叉。如果可以的話,應儘量隔離子任務和非Send數據所在的任務。上面的例子已經實現了這一點,但更好的方式是將子任務sleep().await
從這個函數中移走。
use std::sync::{Arc, Mutex}; #[allow(unused_imports)] use tokio::{ self, runtime::Runtime, sync, time::{self, Duration}}; async fn add_1(mutex: &Mutex<u64>) -> u64 { let mut lock = mutex.lock().unwrap(); *lock += 1; *lock } // 申請的互斥鎖在此被釋放 fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let mutex = Arc::new(Mutex::new(0)); for i in 0..100 { let lock = mutex.clone(); tokio::spawn(async move { let n = add_1(&lock).await; time::sleep(Duration::from_millis(n)).await; }); } time::sleep(Duration::from_secs(1)).await; println!("data: {}", mutex.lock().unwrap()); }); }
另外注意,標準庫的Mutex存在毒鎖問題。所謂毒鎖,即某個持有互斥鎖的線程panic了,那麼這個鎖有可能永遠得不到釋放(除非線程panic之前已經釋放),也稱為被汙染的鎖。毒鎖問題可能很嚴重,因為出現毒鎖有可能意味著數據將從此開始不再準確,所以多數時候是直接讓毒鎖的panic向上傳播或單獨處理。但出現毒鎖並不總是危險的,所以標準庫也提供了對應的方案。
但tokio Mutex不存在毒鎖問題,在持有Mutex的線程panic時,tokio的做法是直接釋放鎖。
RwLock讀寫鎖
相比Mutex互斥鎖,讀寫鎖區分讀操作和寫操作,讀寫鎖允許多個讀鎖共存,但寫鎖獨佔。因此,在併發能力上它比Mutex要更好一些。
下面是官方文檔中的一個示例:
use tokio::sync::RwLock; #[tokio::main] async fn main() { let lock = RwLock::new(5); // 多個讀鎖共存 { // read()返回RwLockReadGuard let r1 = lock.read().await; let r2 = lock.read().await; assert_eq!(*r1, 5); // 對Guard解引用,即可得到其內部的值 assert_eq!(*r2, 5); } // 讀鎖(r1, r2)在此釋放 // 只允許一個寫鎖存在 { // write()返回RwLockWriteGuard let mut w = lock.write().await; *w += 1; assert_eq!(*w, 6); } // 寫鎖(w)被釋放 }
需注意,讀寫鎖有幾種不同的設計方式:
- 讀鎖優先: 只要有讀操作申請鎖,優先將鎖分配給讀操作。這種方式可以提供非常好的併發能力,但是大量的讀操作可能會長時間阻擋寫操作
- 寫鎖優先: 只要有寫操作申請鎖,優先將鎖分配給寫操作。這種方式可以保證寫操作不會被餓死,但會嚴重影響併發能力
tokio RwLock實現的是寫鎖優先,它的具體規則如下:
- 每次申請鎖時都將等待,申請鎖的異步任務被切換,CPU交還給調度器
- 如果申請的是讀鎖,並且此時沒有寫鎖存在,則申請成功,對應的任務被喚醒
- 如果申請的是讀鎖,但此時有寫鎖(包括寫鎖申請)的存在,那麼將等待所有的寫鎖釋放(因為寫鎖總是優先)
- 如果申請的是寫鎖,如果此時沒有讀鎖的存在,則申請成功
- 如果申請的是寫鎖,但此時有讀鎖的存在,那麼將等待當前正在持有的讀鎖釋放
注意,RwLock的寫鎖優先會很容易產生死鎖。例如,下面的代碼會產生死鎖:
use std::sync::Arc; use tokio::{self, runtime::Runtime, sync::RwLock, time::{self, Duration}}; fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let lock = Arc::new(RwLock::new(0)); let lock1 = lock.clone(); tokio::spawn(async move { let n = lock1.read().await; time::sleep(Duration::from_secs(2)).await; let nn = lock1.read().await; }); time::sleep(Duration::from_secs(1)).await; let mut wn = lock.write().await; *wn = 2; }); }
上面示例中,按照時間的流程,首先會在子任務中申請讀鎖,1秒後在當前任務中申請寫鎖,再1秒後子任務申請讀鎖。
申請第一把讀鎖時,因為此時無鎖,所以讀鎖(即變量n)申請成功。1秒後申請寫鎖時,由於此時讀鎖n尚未釋放,因此寫鎖申請失敗,將等待。再1秒之後,繼續在子任務中申請讀鎖,但是此時有寫鎖申請存在,因此第二次申請讀鎖將等待,於是讀鎖寫鎖互相等待,死鎖出現了。
當要使用寫鎖時,如果要避免死鎖,一定要保證同一個任務中的任意兩次鎖申請之間,前面已經無鎖,並且寫鎖儘早釋放。
對於上面的示例,同一個子任務中申請兩次讀鎖,但是第二次申請讀鎖時,第一把讀鎖仍未釋放,這就產生了死鎖的可能。只需在第二次申請讀鎖前,將第一把讀鎖釋放即可。更完整一點,在寫鎖寫完數據後也手動釋放寫鎖(上面的示例中寫完就退出,寫鎖會自動釋放,因此無需手動釋放)。
use std::sync::Arc; use tokio::{self, runtime::Runtime, sync::RwLock, time::{self, Duration}}; fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let lock = Arc::new(RwLock::new(0)); let lock1 = lock.clone(); tokio::spawn(async move { let n = lock1.read().await; drop(n); // 在申請第二把讀鎖前,先釋放第一把讀鎖 time::sleep(Duration::from_secs(2)).await; let nn = lock1.read().await; drop(nn); }); time::sleep(Duration::from_secs(1)).await; let mut wn = lock.write().await; *wn = 2; drop(wn); }); }
RwLock還有一些其它的方法,在理解了RwLock申請鎖的規則之後,這些方法都很容易理解,可以自行去查看官方手冊。
Notify通知喚醒
Notify提供了一種簡單的通知喚醒功能,它類似於只有一個信號燈的信號量。
下面是官方文檔中的示例:
use tokio::sync::Notify; use std::sync::Arc; #[tokio::main] async fn main() { let notify = Arc::new(Notify::new()); let notify2 = notify.clone(); tokio::spawn(async move { notify2.notified().await; println!("received notification"); }); println!("sending notification"); notify.notify_one(); }
Notify::new()
創建Notify實例,Notify實例初始時沒有permit位,permit可認為是執行權。
每當調用notified().await
時,將判斷此時是否有執行權,如果有,則可直接執行,否則將進入等待。因此,初始化之後立即調用notified().await
將會等待。
每當調用notify_one()
時,將產生一個執行權,但多次調用也最多隻有一個執行權。因此,調用notify_one()
之後再調用notified().await
則並無需等待。
如果同時有多個等待執行權的等候者,釋放一個執行權,在其它環境中可能會產生驚群現象,即大量等候者被一次性同時喚醒去爭搶一個資源,搶到的可以繼續執行,而未搶到的等候者又重新被阻塞。好在,tokio Notify沒有這種問題,tokio使用隊列方式讓等候者進行排隊,先等待的總是先獲取到執行權,因此不會一次性喚醒所有等候者,而是隻喚醒隊列頭部的那個等候者。
Notify還有一個notify_waiters()
方法,它不會釋放執行權,但是它會一次性喚醒所有正在等待的等候者。嚴格來說,是讓當前已經註冊的等候者(即已經調用notified()
,但是還未await)在下次等待的時候,可以直接通過。
官方手冊給了一個示例:
use tokio::sync::Notify; use std::sync::Arc; #[tokio::main] async fn main() { let notify = Arc::new(Notify::new()); let notify2 = notify.clone(); // 註冊兩個等候者 let notified1 = notify.notified(); let notified2 = notify.notified(); let handle = tokio::spawn(async move { println!("sending notifications"); notify2.notify_waiters(); }); // 兩個等候者的await都會直接通過 notified1.await; notified2.await; println!("received notifications"); }
Barrier屏障
Barrier是一種讓多個併發任務在某種程度上保持進度同步的手段。
例如,一個任務分兩步,有很多個這種任務併發執行,但每個任務中的第二步都要求所有任務的第一步已經完成。這時可以在第二步之前使用屏障,這樣可以保證所有任務在開始第二步之前的進度是同步的。
當然,也不一定要等待所有任務的進度都同步,可以設置等待一部分任務的進度同步。也就是說,讓併發任務的進度按批次進行同步。第一批的任務進度都同步後,這一批任務將通過屏障,但是該屏障依然會阻擋下一批任務,直到下一批任務的進度都同步之後才放行。
官方文檔給了一個示例,不算經典,但有助於理解:
#![allow(unused)] fn main() { use tokio::sync::Barrier; use std::sync::Arc; let mut handles = Vec::with_capacity(10); // 參數10表示屏障寬度為10,只等待10個任務達到屏障點就放行這一批任務 // 也就是說,某時刻已經有9個任務在等待,當第10個任務調用wait的時候,屏障將放行這一批 let barrier = Arc::new(Barrier::new(10)); for _ in 0..10 { let c = barrier.clone(); handles.push(tokio::spawn(async move { println!("before wait"); // 在此設置屏障,保證10個任務都已輸出before wait才繼續向下執行 let wait_result = c.wait().await; println!("after wait"); wait_result })); } let mut num_leaders = 0; for handle in handles { let wait_result = handle.await.unwrap(); if wait_result.is_leader() { num_leaders += 1; } } assert_eq!(num_leaders, 1); }
Barrier調用wait()
方法時,返回BarrierWaitResult
,該結構有一個is_leader()
方法,可以用來判斷某個任務是否是該批次任務中的第一個任務。每一批通過屏障的任務都只有一個leader,其餘非leader任務調用is_leader()
都將返回false。
使用屏障時,一定要保證可以到達屏障點的併發任務數量是屏障寬度的整數倍,否則多出來的任務將一直等待。例如,將屏障的寬度設置為10(即10個任務一批),但是有15個併發任務,多出來的5個任務無法湊成完整的一批,這5個任務將一直等待。
use std::sync::Arc; use tokio::sync::Barrier; use tokio::{ self, runtime::Runtime, time::{self, Duration} }; fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let barrier = Arc::new(Barrier::new(10)); for i in 1..=15 { let b = barrier.clone(); tokio::spawn(async move { println!("data before: {}", i); b.wait().await; // 15個任務中,多出5個任務將一直在此等待 time::sleep(Duration::from_millis(10)).await; println!("data after: {}", i); }); } time::sleep(Duration::from_secs(5)).await; }); }
Semaphore信號量
信號量可以保證在某一時刻最多運行指定數量的併發任務。
使用信號量時,需在初始化時指定信號燈(tokio中的SemaphorePermit)的數量,每當任務要執行時,將從中取走一個信號燈,當任務完成時(信號燈被drop)會歸還信號燈。當某個任務要執行時,如果此時信號燈數量為0,則該任務將等待,直到有信號燈被歸還。因此,信號量通常用來提供類似於限量的功能。
例如,信號燈數量為1,表示所有併發任務必須串行運行,這種模式和互斥鎖是類似的。再例如,信號燈數量設置為2,表示最多隻有兩個任務可以併發執行,如果有第三個任務,則必須等前兩個任務中的某一個先完成。
例如:
use chrono::Local; use std::sync::Arc; use tokio::{ self, runtime::Runtime, sync::Semaphore, time::{self, Duration}}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { // 只有3個信號燈的信號量 let semaphore = Arc::new(Semaphore::new(3)); // 5個併發任務,每個任務執行前都先獲取信號燈 // 因此,同一時刻最多隻有3個任務進行併發 for i in 1..=5 { let semaphore = semaphore.clone(); tokio::spawn(async move { let _permit = semaphore.acquire().await.unwrap(); println!("{}, {}", i, now()); time::sleep(Duration::from_secs(1)).await; }); } time::sleep(Duration::from_secs(3)).await; }); }
輸出結果:
3, 2021-11-17 17:06:38
1, 2021-11-17 17:06:38
2, 2021-11-17 17:06:38
4, 2021-11-17 17:06:39
5, 2021-11-17 17:06:39
tokio::sync::Semaphore
提供了以下一些方法:
- close(): 關閉信號量,關閉信號量時,將喚醒所有的信號燈等待者
- is_closed(): 檢查信號量是否已經被關閉
- acquire(): 獲取一個信號燈,如果信號量已經被關閉,則返回錯誤AcquireError
- acquire_many(): 獲取指定數量的信號燈,如果信號燈數量不夠則等待,如果信號量已經被關閉,則返回AcquireError
- add_permits(): 向信號量中額外添加N個信號燈
- available_permits(): 當前信號量中剩餘的信號燈數量
- try_acquire(): 不等待地嘗試獲取一個信號燈,如果信號量已經關閉,則返回TryAcquireError::Closed,如果目前信號燈數量為0,則返回TryAcquireError::NoPermits
- try_acquire_many(): 嘗試獲取指定數量的信號燈
- acquire_owned(): 獲取一個信號燈並消費掉信號量
- acquire_many_owned(): 獲取指定數量的信號燈並消費掉信號量
- try_acquire_owned(): 嘗試獲取信號燈並消費掉信號量
- try_acquire_many_owned(): 嘗試獲取指定數量的信號燈並消費掉信號量
對於獲取到的信號燈SemaphorePermit,有一個forget()
方法,該方法可以將信號燈不歸還給信號量,因此信號量中的信號燈將永久性地減少(當然,可使用add_permits()
添加額外的信號燈)。
信號量的限量功能,也可以通過sync::mpsc
通道來實現。大致邏輯為:設置通道寬度為允許的最大併發任務數量,並先填滿通道,當執行一個任務時,先從通道取走一個消息,再執行任務,每次執行完任務後都重新向通道中回補一個消息。