理解tokio核心(2): task
本篇是介紹tokio核心的第二篇,理解tokio中的task。
何為tokio task?
tokio官方手冊tokio::task中用了一句話介紹task:Asynchronous green-threads(異步的綠色線程)。
Rust中的原生線程(std::thread
)是OS線程,每一個原生線程,都對應一個操作系統的線程。操作系統線程在內核層,由操作系統負責調度,缺點是涉及相關的系統調用,它有更重的線程上下文切換開銷。
green thread則是用戶空間的線程,由程序自身提供的調度器負責調度,由於不涉及系統調用,同一個OS線程內的多個綠色線程之間的上下文切換的開銷非常小,因此非常的輕量級。可以認為,它們就是一種特殊的協程。
解釋了何為綠色線程後,回到tokio的task概念。什麼是task呢?
每定義一個Future
(例如一個async語句塊就是一個Future),就定義了一個靜止的尚未執行的task,當它在runtime中開始運行的時候,它就是真正的task,一個真正的異步任務。
要注意,在tokio runtime中執行的並不都是異步任務,綁定在runtime中的可能是同步任務(例如一個數值計算就是一個同步任務,只是速度非常快,可忽略不計),可能會長時間計算,可能會阻塞整個線程,這一點在前一篇介紹runtime時詳細說明過。tokio嚴格區分異步任務和同步任務,只有異步任務才算是tokio task。tokio推薦的做法是將同步任務放入blocking thread中運行。
從官方手冊將task描述為綠色線程也能理解,tokio::task
只能是完全受tokio調度管理的異步任務,而不是脫離tokio調度控制的同步任務。
tokio::task
tokio::task
模塊本身提供了幾個函數:
- spawn:向runtime中添加新異步任務
- spawn_blocking:生成一個blocking thread並執行指定的任務
- block_in_place:在某個worker thread中執行同步任務,但是會將同線程中的其它異步任務轉移走,使得異步任務不會被同步任務飢餓
- yield_now: 立即放棄CPU,將線程交還給調度器,自己則進入就緒隊列等待下一輪的調度
- unconstrained: 將指定的異步任務聲明未不受限的異步任務,它將不受tokio的協作式調度,它將一直霸佔當前線程直到任務完成,不會受到tokio調度器的管理
- spawn_local: 生成一個在當前線程內運行,一定不會被偷到其它線程中運行的異步任務
這裡的三個spawn類的方法都返回JoinHandle類型,JoinHandle類型可以通過await來等待異步任務的完成,也可以通過abort()來中斷異步任務,異步任務被中斷後返回JoinError類型。
task::spawn()
這個很簡單,就是直接在當前的runtime中生成一個異步任務。
use chrono::Local; use std::thread; use tokio::{self, task, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); let _guard = rt.enter(); task::spawn(async { time::sleep(time::Duration::from_secs(3)).await; println!("task over: {}", now()); }); thread::sleep(time::Duration::from_secs(4)); }
task::spawn_blocking()
生成一個blocking thread來執行指定的任務。在前一篇介紹runtime的文章中已經解釋清楚,這裡不再解釋。
#![allow(unused)] fn main() { let join = task::spawn_blocking(|| { // do some compute-heavy work or call synchronous code "blocking completed" }); let result = join.await?; assert_eq!(result, "blocking completed"); }
task::block_in_place()
block_in_place()
的目的和spawn_blocking()
類似。區別在於spawn_blocking()
會新生成一個blocking thread來執行指定的任務,而block_in_place()
是在當前worker thread中執行指定的可能會長時間運行或長時間阻塞線程的任務,但是它會先將該worker thread中已經存在的異步任務轉移到其它worker thread,使得這些異步任務不會被飢餓。
顯然,block_in_place()
只應該在多線程runtime環境中運行,如果是單線程runtime,block_in_place會阻塞唯一的那個worker thread。
#![allow(unused)] fn main() { use tokio::task; task::block_in_place(move || { // do some compute-heavy work or call synchronous code }); }
在block_in_place內部,可以使用block_on()或enter()重新進入runtime環境。
#![allow(unused)] fn main() { use tokio::task; use tokio::runtime::Handle; task::block_in_place(move || { Handle::current().block_on(async move { // do something async }); }); }
task::yield_now
讓當前任務立即放棄CPU,將worker thread交還給調度器,任務自身則進入調度器的就緒隊列等待下次被輪詢調度。類似於其它異步系統中的next_tick
行為。
需注意,調用yield_now()
後還需await才立即放棄CPU,因為yield_now本身是一個異步任務。
#![allow(unused)] fn main() { use tokio::task; async { task::spawn(async { // ... println!("spawned task done!") }); // Yield, allowing the newly-spawned task to execute first. task::yield_now().await; println!("main task done!"); } }
注意,yield後,任務調度的順序是未知的。有可能任務在發出yield後,緊跟著的下一輪調度會再次調度該任務。
task::unconstrained()
tokio的異步任務都是受tokio調度控制的,tokio採用協作式調度策略來調度它所管理的異步任務。當異步任務中的執行到了某個本該阻塞的操作時(即使用了tokio提供的那些原本會阻塞的API,例如tokio版本的sleep()),將不會阻塞當前線程,而是進入等待隊列,等待Reactor接收事件通知來喚醒該異步任務,這樣當前線程會被釋放給調度器,使得調度器能夠繼續分配其它異步任務到該線程上執行。
task::unconstrained()
則是創建一個不受限制不受調度器管理的異步任務,它將不會參與調度器的協作式調度,可以認為是將這個異步任務暫時脫離了調度管理。這樣一來,即便該任務中遇到了本該阻塞而放棄線程的操作,也不會去放棄,而是直接阻塞該線程。
因此,unconstrained()
創建的異步任務將會使得同線程的其它異步任務被飢餓。如果確實有這樣的需求,建議使用block_in_place()
或spawn_blocking()
。
task::spawn_local()
關於spawn_local()
,後面介紹LocalSet的時候再一起介紹。
取消任務
正在執行的異步任務可以隨時被abort()
取消,取消之後的任務返回JoinError類型。
use tokio::{self, runtime::Runtime, time}; fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { let task = tokio::task::spawn(async { time::sleep(time::Duration::from_secs(10)).await; }); // 讓上面的異步任務跑起來 time::sleep(time::Duration::from_millis(1)).await; task.abort(); // 取消任務 // 取消任務之後,可以取得JoinError let abort_err: JoinError = task.await.unwrap_err(); println!("{}", abort_err.is_cancelled()); }) }
如果異步任務已經完成,再對該任務執行abort()
操作將沒有任何效果。也就是說,沒有JoinError,task.await.unwrap_err()
將報錯,而task.await.unwrap()
則正常。
tokio::join!宏和tokio::try_join!宏
可以使用await去等待某個異步任務的完成,無論這個異步任務是正常完成還是被取消。
tokio提供了兩個宏tokio::join!
和tokio::try_join!
。它們可以用於等待多個異步任務全部完成:
join!
必須等待所有任務完成try_join!
要麼等待所有異步任務正常完成,要麼等待第一個返回Result Err的任務出現
另外,這兩個宏都需要Future參數,它們將提供的各參數代表的任務封裝成為一個大的task。
例如:
use chrono::Local; use tokio::{self, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } async fn do_one() { println!("doing one: {}", now()); time::sleep(time::Duration::from_secs(2)).await; println!("do one done: {}", now()); } async fn do_two() { println!("doing two: {}", now()); time::sleep(time::Duration::from_secs(1)).await; println!("do two done: {}", now()); } fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { tokio::join!(do_one(), do_two());// 等待兩個任務均完成,才繼續向下執行代碼 println!("all done: {}", now()); }); }
輸出:
doing one: 2021-11-02 16:51:36
doing two: 2021-11-02 16:51:36
do two done: 2021-11-02 16:51:37
do one done: 2021-11-02 16:51:38
all done: 2021-11-02 16:51:38
下面是官方文檔中try_join!
的示例:
async fn do_stuff_async() -> Result<(), &'static str> { // async work } async fn more_async_work() -> Result<(), &'static str> { // more here } #[tokio::main] async fn main() { let res = tokio::try_join!(do_stuff_async(), more_async_work()); match res { Ok((first, second)) => { // do something with the values } Err(err) => { println!("processing failed; error = {}", err); } } }
固定在線程內的本地異步任務: tokio::task::LocalSet
當使用多線程runtime時,tokio會協作式調度它管理的所有worker thread上的所有異步任務。例如某個worker thread空閒後可能會從其它worker thread中偷取一些異步任務來執行,或者tokio會主動將某些異步任務轉移到不同的線程上執行。這意味著,異步任務可能會不受預料地被跨線程執行。
有時候並不想要跨線程執行。例如,那些沒有實現Send
的異步任務,它們不能跨線程,只能在一個固定的線程上執行。
tokio提供了讓某些任務固定在某一個線程中運行的功能,叫做LocalSet,這些異步任務被放在一個獨立的本地任務隊列中,它們不會跨線程執行。
要使用tokio::task::LocalSet
,需使用LocalSet::new()
先創建好一個LocalSet實例,它將生成一個獨立的任務隊列用來存放本地異步任務。
之後,便可以使用LocalSet的spawn_local()
向該隊列中添加異步任務。但是,添加的異步任務不會直接執行,只有對LocalSet調用await或調用LocalSet::run_until()
或LocalSet::block_on()
的時候,才會開始運行本地隊列中的異步任務。調用後兩個方法會進入LocalSet的上下文環境。
例如,使用await來運行本地異步任務。
use chrono::Local; use tokio::{self, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); let local_tasks = tokio::task::LocalSet::new(); // 向本地任務隊列中添加新的異步任務,但現在不會執行 local_tasks.spawn_local(async { println!("local task1"); time::sleep(time::Duration::from_secs(5)).await; println!("local task1 done"); }); local_tasks.spawn_local(async { println!("local task2"); time::sleep(time::Duration::from_secs(5)).await; println!("local task2 done"); }); println!("before local tasks running: {}", now()); rt.block_on(async { // 開始執行本地任務隊列中的所有異步任務,並等待它們全部完成 local_tasks.await; }); }
除了LocalSet::spawn_local()
可以生成新的本地異步任務,tokio::task::spawn_local()
也可以生成新的本地異步任務,但是它的使用有個限制,必須在LocalSet上下文內部才能調用。
例如:
use chrono::Local; use tokio::{self, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); let local_tasks = tokio::task::LocalSet::new(); local_tasks.spawn_local(async { println!("local task1"); time::sleep(time::Duration::from_secs(2)).await; println!("local task1 done"); }); local_tasks.spawn_local(async { println!("local task2"); time::sleep(time::Duration::from_secs(3)).await; println!("local task2 done"); }); println!("before local tasks running: {}", now()); // LocalSet::block_on進入LocalSet上下文 local_tasks.block_on(&rt, async { tokio::task::spawn_local(async { println!("local task3"); time::sleep(time::Duration::from_secs(4)).await; println!("local task3 done"); }).await.unwrap(); }); println!("all local tasks done: {}", now()); }
需要注意的是,調用LocalSet::block_on()
和LocalSet::run_until()
時均需指定一個異步任務(Future)作為其參數,它們都會立即開始執行該異步任務以及本地任務隊列中已存在的任務,但是這兩個函數均只等待其參數對應的異步任務執行完成就返回。這意味著,它們返回的時候,可能還有正在執行中的本地異步任務,它們會繼續保留在本地任務隊列中。當再次進入LocalSet上下文或await
LocalSet的時候,它們會等待調度並運行。
use chrono::Local; use tokio::{self, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); let local_tasks = tokio::task::LocalSet::new(); local_tasks.spawn_local(async { println!("local task1"); time::sleep(time::Duration::from_secs(2)).await; println!("local task1 done {}", now()); }); // task2要睡眠10秒,它將被第一次local_tasks.block_on在3秒後中斷 local_tasks.spawn_local(async { println!("local task2"); time::sleep(time::Duration::from_secs(10)).await; println!("local task2 done, {}", now()); }); println!("before local tasks running: {}", now()); local_tasks.block_on(&rt, async { tokio::task::spawn_local(async { println!("local task3"); time::sleep(time::Duration::from_secs(3)).await; println!("local task3 done: {}", now()); }).await.unwrap(); }); // 線程阻塞15秒,此時task2睡眠10秒的時間已經過去了, // 當再次進入LocalSet時,task2將可以直接被喚醒 thread::sleep(std::time::Duration::from_secs(15)); // 再次進入LocalSet local_tasks.block_on(&rt, async { // 先執行該任務,當遇到睡眠1秒的任務時,將出現任務切換, // 此時,調度器將調度task2,而此時task2已經睡眠完成 println!("re enter localset context: {}", now()); time::sleep(time::Duration::from_secs(1)).await; println!("re enter localset context done: {}", now()); }); println!("all local tasks done: {}", now()); }
輸出結果:
#![allow(unused)] fn main() { before local tasks running: 2021-10-26 20:19:11 local task1 local task3 local task2 local task1 done 2021-10-26 20:19:13 local task3 done: 2021-10-26 20:19:14 re enter localset context: 2021-10-26 20:19:29 local task2 done, 2021-10-26 20:19:29 re enter localset context done: 2021-10-26 20:19:30 all local tasks done: 2021-10-26 20:19:30 }
需要注意的是,再次運行本地異步任務時,之前被中斷的異步任務所等待的事件可能已經出現了,因此它們可能會被直接喚醒重新進入就緒隊列等待下次輪詢調度。正如上面需要睡眠10秒的task2,它會被第一次block_on中斷,雖然task2已經不再執行,但是15秒之後它的睡眠完成事件已經出現,它可以在下次調度本地任務時直接被喚醒。但注意,喚醒的任務不是直接就可以被執行的,而是放入就緒隊列等待調度。
這意味著,再次進入上下文時,所指定的Future中必須至少存在一個會引起調度切換的任務,否則該Future以同步的方式運行直到結束都不會給已經被喚醒的任務任何執行的機會。
例如,將上面示例中的第二個block_on中的Future參數換成下面的async代碼塊,task2將不會被調度執行:
#![allow(unused)] fn main() { local_tasks.block_on(&rt, async { println!("re-enter localset context, and exit context"); println!("task2 will not be scheduled"); }) }
下面是使用run_until()
兩次進入LocalSet上下文的示例,和block_on()
類似,區別僅在於它只能在Runtime::block_on()
內或[tokio::main]
註解的main函數內部被調用。
use chrono::Local; use tokio::{self, runtime::Runtime, time}; fn now() -> String { Local::now().format("%F %T").to_string() } fn main() { let rt = Runtime::new().unwrap(); let local_tasks = tokio::task::LocalSet::new(); local_tasks.spawn_local(async { println!("local task1"); time::sleep(time::Duration::from_secs(5)).await; println!("local task1 done {}", now()); }); println!("before local tasks running: {}", now()); rt.block_on(async { local_tasks .run_until(async { println!("local task2"); time::sleep(time::Duration::from_secs(3)).await; println!("local task2 done: {}", now()); }) .await; }); thread::sleep(std::time::Duration::from_secs(10)); rt.block_on(async { local_tasks .run_until(async { println!("local task3"); tokio::task::yield_now().await; println!("local task3 done: {}", now()); }) .await; }); println!("all local tasks done: {}", now()); }
輸出結果:
before local tasks running: 2021-10-26 21:23:18
local task2
local task1
local task2 done: 2021-10-26 21:23:21
local task3
local task1 done 2021-10-26 21:23:31
local task3 done: 2021-10-26 21:23:31
all local tasks done: 2021-10-26 21:23:31
tokio::select!宏
在Golang中有一個select關鍵字,tokio中則類似地提供了一個名為select!
的宏。tokio::select!
宏使用場景非常普遍,因此有必要理解該宏的工作流程。
select!
宏的作用是輪詢指定的多個異步任務,每個異步任務都是select!
的一個分支,當某個分支已完成,則執行該分支對應的代碼,同時取消其它分支。簡單來說,select!
的作用是等待第一個完成的異步任務並執行對應任務完成後的操作。
它的使用語法參考如下:
#![allow(unused)] fn main() { tokio::select! { <pattern1> = <async expression 1> (, if <precondition1>)? => <handler1>, // branch 1 <pattern2> = <async expression 2> (, if <precondition2>)? => <handler2>, // branch 2 ... (else => <handler_else>)? }; }
else分支是可選的,每個分支的if前置條件是可選的。因此,簡化的語法為:
#![allow(unused)] fn main() { tokio::select! { <pattern1> = <async expression 1> => <handler1>, // branch 1 <pattern2> = <async expression 2> => <handler2>, // branch 2 ... }; }
即,每個分支都有一個異步任務,並對異步任務完成後的返回結果進行模式匹配,如果匹配成功,則執行對應的handler。
一個簡單的示例:
use tokio::{self, runtime::Runtime, time::{self, Duration}}; async fn sleep(n: u64) -> u64 { time::sleep(Duration::from_secs(n)).await; n } fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { tokio::select! { v = sleep(5) => println!("sleep 5 secs, branch 1 done: {}", v), v = sleep(3) => println!("sleep 3 secs, branch 2 done: {}", v), }; println!("select! done"); }); }
輸出結果:
sleep 3 secs, branch 2 done: 3
select! done
注意,select!
本身是【阻塞】的,只有select!
執行完,它後面的代碼才會繼續執行。
每個分支可以有一個if前置條件,當if前置條件為false時,對應的分支將被select!
忽略(禁用),但該分支的異步任務仍然會執行,只不過select!
不再輪詢它(即不再推進異步任務的執行)。
下面是官方手冊對select!
工作流程的描述:
- 評估所有分支中存在的if前置條件,如果某個分支的前置條件返回false,則禁用該分支。注意,循環(如loop)時,每一輪執行的select!都會清除分支的禁用標記
- 收集所有分支中的異步表達式(包括已被禁用的分支),並在同一個線程中推進所有未被禁用的異步任務執行,然後等待
- 當某個分支的異步任務完成,將該異步任務的返回值與對應分支的模式進行匹配,如果匹配成功,則執行對應分支的handler,如果匹配失敗,則禁用該分支,本次
select!
調用不會再考慮該分支。如果匹配失敗,則重新等待下一個異步任務的完成 - 如果所有分支最終都被禁用,則執行else分支,如果不存在else分支,則panic
默認情況下,select!
會偽隨機公平地輪詢每一個分支,如果確實需要讓select!
按照任務書寫順序去輪詢,可以在select!
中使用biased
。
例如,官方手冊提供了一個很好的例子:
#[tokio::main] async fn main() { let mut count = 0u8; loop { tokio::select! { // 如果取消biased,挑選的任務順序將隨機,可能會導致分支中的斷言失敗 biased; _ = async {}, if count < 1 => { count += 1; assert_eq!(count, 1); } _ = async {}, if count < 2 => { count += 1; assert_eq!(count, 2); } _ = async {}, if count < 3 => { count += 1; assert_eq!(count, 3); } _ = async {}, if count < 4 => { count += 1; assert_eq!(count, 4); } else => { break; } }; } }
另外,上面的例子中將select!
放進了一個loop循環中,這是很常見的用法。對於上面的例子來說,如果註釋掉biased
,那麼在第一輪循環中,由於select!
中的4個if前置條件均為true,因此按照隨機的順序推進這4個異步任務。由於上面示例中的異步任務表達式不做任何事,因此第一個被推進的異步任務會先完成,selcet!
將取消剩餘3個任務,假如先完成任務的分支的斷言通過,那麼select!
返回後將進入下一輪loop循環,重新調用一次select!
宏,重新評估if條件,這次將只有3個分支通過檢測,不通過的那個分支將被禁用,select!
將按照隨機順序推進這3個分支。