理解tokio核心(1): runtime

在使用tokio之前,應當先理解tokio的核心概念:runtime和task。只有理解了這兩個核心概念,才能正確地、合理地使用tokio。本文先詳細介紹runtime這個核心概念,還會介紹一些基本的調度知識,這些都是理解異步理解tokio的必要知識,後面再專門介紹task。

創建tokio Runtime

要使用tokio,需要先創建它提供的異步運行時環境(Runtime),然後在這個Runtime中執行異步任務。

使用tokio::runtime創建Runtime:

use tokio;

fn main() {
  // 創建runtime
  let rt = tokio::runtime::Runtime::new().unwrap();
}

也可以使用Runtime Builder來配置並創建runtime:

use tokio;

fn main() {
  // 創建帶有線程池的runtime
  let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(8)  // 8個工作線程
    .enable_io()        // 可在runtime中使用異步IO
    .enable_time()      // 可在runtime中使用異步計時器(timer)
    .build()            // 創建runtime
    .unwrap();
}

tokio提供了兩種工作模式的runtime:

  • 1.單一線程的runtime(single thread runtime,也稱為current thread runtime)
  • 2.多線程(線程池)的runtime(multi thread runtime)

注: 這裡的所說的線程是Rust線程,而每一個Rust線程都是一個OS線程。

IO併發類任務較多時,單一線程的runtime性能不如多線程的runtime,但因為多線程runtime使用了多線程,使得線程間的通信變得更為複雜,也加重了線程間切換的開銷,使得有些情況下的性能不如使用單線程runtime。因此,在要求極限性能的時候,建議測試兩種工作模式的性能差距來選擇更優解。在後面深入了一些tokio後,我會再花一個小節來解釋單一線程的runtime和多線程的runtime的調度區別以及如何選擇合適的runtime。

默認情況下(比如以上兩種方式),創建出來的runtime都是多線程runtime,且沒有指定工作線程數量時,默認的工作線程數量將和CPU核數(虛擬核,即CPU線程數)相同。

只有明確指定,才能創建出單一線程的runtime。例如:


#![allow(unused)]
fn main() {
// 創建單一線程的runtime
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
}

例如,創建一個多線程的runtime,查看其線程數:

use tokio;

fn main(){
  let rt = tokio::runtime::Runtime::new().unwrap();
  std::thread::sleep(std::time::Duration::from_secs(10));
}

在另一個終端查看線程數:

$ ps -eLf | grep 'targe[t]'
longshu+ 15759    62 15759  6  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15761  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15762  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15763  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15764  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15765  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15766  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15767  0  9 20:42 pts/0  00:00:00 target/debug/async main
longshu+ 15759    62 15768  0  9 20:42 pts/0  00:00:00 target/debug/async main

總共9個OS線程,其中8個worker thread(我的電腦是4核8線程的),外加一個main thread。

async main

對於main函數,tokio提供了簡化的異步運行時創建方式:

use tokio;

#[tokio::main]
async fn main() {}

通過#[tokio::main]註解(annotation),使得async main自身成為一個async runtime。

#[tokio::main]創建的是多線程runtime,還有以下幾種方式創建多線程runtime:


#![allow(unused)]
fn main() {
#[tokio::main(flavor = "multi_thread"] // 等價於#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10))]
#[tokio::main(worker_threads = 10))]
}

它們等價於如下沒有使用#[tokio::main]的代碼:

fn main(){
  tokio::runtime::Builder::new_multi_thread()
        .worker_threads(N)  
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { ... });
}

#[tokio::main]也可以創建單一線程的main runtime:


#![allow(unused)]
fn main() {
#[tokio::main(flavor = "current_thread")]
}

等價於:

fn main() {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { ... })
}

多個runtime共存

可手動創建線程,並在不同線程內創建互相獨立的runtime。

例如:

use std::thread;
use std::time::Duration;
use tokio::runtime::Runtime;

fn main() {
  // 在第一個線程內創建一個多線程的runtime
  let t1 = thread::spawn(||{
    let rt = Runtime::new().unwrap();
    thread::sleep(Duration::from_secs(10));
  });

  // 在第二個線程內創建一個多線程的runtime
  let t2 = thread::spawn(||{
    let rt = Runtime::new().unwrap();
    thread::sleep(Duration::from_secs(10));
  });

  t1.join().unwrap();
  t2.join().unwrap();
}

對於4核8線程的電腦,此時總共有19個OS線程:16個worker-thread,2個spawn-thread,一個main-thread。

runtime實現了SendSync這兩個Trait,因此可以將runtime包在Arc裡,然後跨線程使用同一個runtime。

進入runtime: 在異步runtime中執行異步任務

提供了Runtime後,可在Runtime中執行異步任務。

多數時候,異步任務是一些帶有網絡IO操作的任務,比如異步的http請求。但是介紹tokio用法時,不需要那麼複雜,只需使用tokio的異步timer即可解釋清楚,如tokio::time::sleep()

注:std::time也提供了sleep(),但它會阻塞整個線程,而tokio::time中的sleep()則只是讓它所在的任務放棄CPU並進入調度隊列等待被喚醒,它不會阻塞任何線程,它所在的線程仍然可被用來執行其它異步任務。因此,在tokio runtime中,應當使用tokio::time中的sleep()。

例如:

use tokio::runtime::Runtime;
use chrono::Local;

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("before sleep: {}", Local::now().format("%F %T.%3f"));
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("after sleep: {}", Local::now().format("%F %T.%3f"));
    });
}

輸出:

before sleep: 2021-10-24 11:53:38.496
after sleep: 2021-10-24 11:53:40.497

上面調用了runtime的block_on()方法,block_on要求一個Future作為參數,可以像上面一樣直接使用一個async {}來定義一個Future。每一個Future都是一個已經定義好但尚未執行的異步任務,每一個異步任務中可能會包含其它子任務。

這些異步任務不會直接執行,需要先將它們放入到runtime環境,然後在合適的地方通過Future的await來執行它們。await可以將已經定義好的異步任務立即加入到runtime的任務隊列中等待調度執行,於此同時,await會等待該異步任務完成才返回。例如:


#![allow(unused)]
fn main() {
rt.block_on(async {
    // 只是定義了Future,此時尚未執行
    let task = tokio::time::sleep(tokio::time::Duration::from_secs(2));
    // ...不會執行...
    // ...
    // 開始執行task任務,並等待它執行完成
    task.await;

    // 上面的任務完成之後,才會繼續執行下面的代碼
});
}

block_on會阻塞當前線程(例如阻塞住上面的main函數所在的主線程),直到其指定的**異步任務樹(可能有子任務)**全部完成。

注:block_on是等待異步任務完成,而不是等待runtime中的所有任務都完成,後面介紹blocking thread時會再次說明block_on的阻塞問題。

block_on也有返回值,其返回值為其所執行異步任務的返回值。例如:

use tokio::{time, runtime::Runtime};

fn main() {
    let rt = Runtime::new().unwrap();
    let res: i32 = rt.block_on(async{
      time::sleep(time::Duration::from_secs(2)).await;
      3
    });
    println!("{}", res);  // 3
}

spawn: 向runtime中添加新的異步任務

在上面的例子中,直接將async {}作為block_on()的參數,這個async {}本質上是一個Future,即一個異步任務。在這個最外層的異步任務內部,還可以創建新的異步任務,它們都將在同一個runtime中執行。

有時候,定義要執行的異步任務時,並未身處runtime內部。例如定義一個異步函數,此時可以使用tokio::spawn()來生成異步任務。

use std::thread;

use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

// 在runtime外部定義一個異步任務,且該函數返回值不是Future類型
fn async_task() {
  println!("create an async task: {}", now());
  tokio::spawn(async {
    time::sleep(time::Duration::from_secs(10)).await;
    println!("async task over: {}", now());
  });
}

fn main() {
    let rt1 = Runtime::new().unwrap();
    rt1.block_on(async {
      // 調用函數,該函數內創建了一個異步任務,將在當前runtime內執行
      async_task();
    });
}

除了tokio::spawn(),runtime自身也能spawn,因此,也可以傳遞runtime(注意,要傳遞runtime的引用),然後使用runtime的spawn()

use tokio::{Runtime, time}
fn async_task(rt: &Runtime) {
  rt.spawn(async {
    time::sleep(time::Duration::from_secs(10)).await;
  });
}

fn main(){
  let rt = Runtime::new().unwrap();
  rt.block_on(async {
    async_task(&rt);
  });
}

進入runtime: 非阻塞的enter()

block_on()是進入runtime的主要方式。但還有另一種進入runtime的方式:enter()

block_on()進入runtime時,會阻塞當前線程,enter()進入runtime時,不會阻塞當前線程,它會返回一個EnterGuard。EnterGuard沒有其它作用,它僅僅只是聲明從它開始的所有異步任務都將在runtime上下文中執行,直到刪除該EnterGuard。

刪除EnterGuard並不會刪除runtime,只是釋放之前的runtime上下文聲明。因此,刪除EnterGuard之後,可以聲明另一個EnterGuard,這可以再次進入runtime的上下文環境。

use tokio::{self, runtime::Runtime, time};
use chrono::Local;
use std::thread;

fn now() -> String {
  Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();

    // 進入runtime,但不阻塞當前線程
    let guard1 = rt.enter();

    // 生成的異步任務將放入當前的runtime上下文中執行
    tokio::spawn(async {
      time::sleep(time::Duration::from_secs(5)).await;
      println!("task1 sleep over: {}", now());
    });

    // 釋放runtime上下文,這並不會刪除runtime
    drop(guard1);

    // 可以再次進入runtime
    let guard2 = rt.enter();
    tokio::spawn(async {
      time::sleep(time::Duration::from_secs(4)).await;
      println!("task2 sleep over: {}", now());
    });

    drop(guard2);

    // 阻塞當前線程,等待異步任務的完成
    thread::sleep(std::time::Duration::from_secs(10));
}

理解runtime和異步調度

異步Runtime提供了異步IO驅動、異步計時器等異步API,還提供了任務的調度器(scheduler)和Reactor事件循環(Event Loop)。

每當創建一個Runtime時,就在這個Runtime中創建好了一個Reactor和一個Scheduler,同時還創建了一個任務隊列。

從這一點看來,異步運行時和操作系統的進程調度方式是類似的,只不過現代操作系統的進程調度邏輯要比異步運行時的調度邏輯複雜的多。

當一個異步任務需要運行,這個任務要被放入到可運行的任務隊列(就緒隊列),然後等待被調度,當一個異步任務需要阻塞時(對應那些在同步環境下會阻塞的操作),它被放進阻塞隊列。

阻塞隊列中的每一個被阻塞的任務,都需要等待Reactor收到對應的事件通知(比如IO完成的通知、睡眠完成的通知等)來喚醒它。當該任務被喚醒後,它將被放入就緒隊列,等待調度器的調度。

就緒隊列中的每一個任務都是可運行的任務,可隨時被調度器調度選中。調度時會選擇哪一個任務,是調度器根據調度算法去決定的。某個任務被調度選中後,調度器將分配一個線程去執行該任務。

大方向上來說,有兩種調度策略:搶佔式調度和協作式調度。搶佔式調度策略,調度器會在合適的時候(調度規則決定什麼是合適的時候)強行切換當前正在執行的調度單元(例如進程、線程),避免某個任務長時間霸佔CPU從而導致其它任務出現飢餓。協作式調度策略則不會強行切斷當前正在執行的單元,只有執行單元執行完任務或主動放棄CPU,才會將該執行單元重新排隊等待下次調度,這可能會導致某個長時間計算的任務霸佔CPU,但是可以讓任務充分執行儘早完成,而不會被中斷。

對於面向大眾使用的操作系統(如Linux)通常採用搶佔式調度策略來保證系統安全,避免惡意程序霸佔CPU。而對於語言層面來說,通常採用協作式調度策略,這樣既有底層OS的搶佔式保底,又有協作式的高效。tokio的調度策略是協作式調度策略。

也可以簡單粗暴地去理解異步調度:任務剛出生時,放進任務隊列尾部,調度器總是從任務隊列的頭部選擇任務去執行,執行任務時,如果任務有阻塞操作,則該任務總是會被放入到任務隊列的尾部。如果任務隊列的第一個任務都是阻塞的(即任務之前被阻塞但目前尚未完成),則調度器直接將它重新放回隊列的尾部。因此,調度器總是從前向後一次又一次地輪詢這個任務隊列。當然,調度算法通常會比這種簡單的方式要複雜的多,它可能會採用多個任務隊列,多種挑選標準,且隊列不是簡單的隊列,而是更高效的數據結構。

以上是通用知識,用於理解何為異步調度系統,每個調度系統都有自己的特性。例如,Rust tokio並不完全按照上面所描述的方式進行調度。tokio的作者,非常友好地提供了一篇他實現tokio調度器的思路,裡面詳細介紹了調度器的基本知識和tokio調度器的調度策略,參考Making the Tokio scheduler 10x faster

tokio的兩種線程:worker thread和blocking thread

需要注意,tokio提供了兩種功能的線程:

  • 用於異步任務的工作線程(worker thread)
  • 用於同步任務的阻塞線程(blocking thread)

單個線程或多個線程的runtime,指的都是工作線程,即只用於執行異步任務的線程,這些任務主要是IO密集型的任務。tokio默認會將每一個工作線程均勻地綁定到每一個CPU核心上。

但是,有些必要的任務可能會長時間計算而佔用線程,甚至任務可能是同步的,它會直接阻塞整個線程(比如thread::time::sleep()),這類任務如果計算時間或阻塞時間較短,勉強可以考慮留在異步隊列中,但如果任務計算時間或阻塞時間可能會較長,它們將不適合放在異步隊列中,因為它們會破壞異步調度,使得同線程中的其它異步任務處於長時間等待狀態,也就是說,這些異步任務可能會被餓很長一段時間。

例如,直接在runtime中執行阻塞線程的操作,由於這類阻塞操作不在tokio系統內,tokio無法識別這類線程阻塞的操作,tokio只能等待該線程阻塞操作的結束,才能重新獲得那個線程的管理權。換句話說,worker thread被線程阻塞的時候,它已經脫離了tokio的控制,在一定程度上破壞了tokio的調度系統。


#![allow(unused)]
fn main() {
rt.block_on(async{
  // 在runtime中,讓整個線程進入睡眠,注意不是tokio::time::sleep()
  std::thread::sleep(std::time::Duration::from_secs(10));
});
}

因此,tokio提供了這兩類不同的線程。worker thread只用於執行那些異步任務,異步任務指的是不會阻塞線程的任務。而一旦遇到本該阻塞但卻不會阻塞的操作(如使用tokio::time::sleep()而不是std::thread::sleep()),會直接放棄CPU,將線程交還給調度器,使該線程能夠再次被調度器分配到其它異步任務。blocking thread則用於那些長時間計算的或阻塞整個線程的任務。

blocking thread默認是不存在的,只有在調用了spawn_blocking()時才會創建一個對應的blocking thread。

blocking thread不用於執行異步任務,因此runtime不會去調度管理這類線程,它們在本質上相當於一個獨立的thread::spawn()創建的線程,它也不會像block_on()一樣會阻塞當前線程。它和獨立線程的唯一區別,是blocking thread是在runtime內的,可以在runtime內對它們使用一些異步操作,例如await。

use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt1 = Runtime::new().unwrap();
    // 創建一個blocking thread,可立即執行(由操作系統調度系統決定何時執行)
    // 注意,不阻塞當前線程
    let task = rt1.spawn_blocking(|| {
      println!("in task: {}", now());
      // 注意,是線程的睡眠,不是tokio的睡眠,因此會阻塞整個線程
      thread::sleep(std::time::Duration::from_secs(10))
    });

    // 小睡1毫秒,讓上面的blocking thread先運行起來
    std::thread::sleep(std::time::Duration::from_millis(1));
    println!("not blocking: {}", now());

    // 可在runtime內等待blocking_thread的完成
    rt1.block_on(async {
      task.await.unwrap();
      println!("after blocking task: {}", now());
    });
}

輸出:

in task: 2021-10-25 19:01:00
not blocking: 2021-10-25 19:01:00
after blocking task: 2021-10-25 19:01:10

需注意,blocking thread生成的任務雖然綁定了runtime,但是它不是異步任務,不受tokio調度系統控制。因此,如果在block_on()中生成了blocking thread或普通的線程,block_on()不會等待這些線程的完成。


#![allow(unused)]
fn main() {
rt.block_on(async{
  // 生成一個blocking thread和一個獨立的thread
  // block on不會阻塞等待兩個線程終止,因此block_on在這裡會立即返回
  rt.spawn_blocking(|| std::thread::sleep(std::time::Duration::from_secs(10)));
  thread::spawn(||std::thread::sleep(std::time::Duration::from_secs(10)));
});
}

tokio允許的blocking thread隊列很長(默認512個),且可以在runtime build時通過max_blocking_threads()配置最大長度。如果超出了最大隊列長度,新的任務將放在一個等待隊列中進行等待(比如當前已經有512個正在運行的任務,下一個任務將等待,直到有某個blocking thread空閒)。

blocking thread執行完對應任務後,並不會立即釋放,而是繼續保持活動狀態一段時間,此時它們的狀態是空閒狀態。當空閒時長超出一定時間後(可在runtime build時通過thread_keep_alive()配置空閒的超時時長),該空閒線程將被釋放。

blocking thread有時候是非常友好的,它像獨立線程一樣,但又和runtime綁定,它不受tokio的調度系統調度,tokio不會把其它任務放進該線程,也不會把該線程內的任務轉移到其它線程。換言之,它有機會完完整整地發揮單個線程的全部能力,而不像worker線程一樣,可能會被調度器打斷。

關閉Runtime

由於異步任務完全依賴於Runtime,而Runtime又是程序的一部分,它可以輕易地被刪除(drop),這時Runtime會被關閉(shutdown)。


#![allow(unused)]
fn main() {
let rt = Runtime::new().unwrap();
...
drop(rt);
}

這裡的變量rt,官方手冊將其稱為runtime的句柄(runtime handle)。

關閉Runtime時,將使得該Runtime中的所有異步任務被移除。完整的關閉過程如下:

  • 1.先移除整個任務隊列,保證不再產生也不再調度新任務
  • 2.移除當前正在執行但尚未完成的異步任務,即終止所有的worker thread
  • 3.移除Reactor,禁止接收事件通知

注意,這種刪除runtime句柄的方式只會立即關閉未被阻塞的worker thread,那些已經運行起來的blocking thread以及已經阻塞整個線程的worker thread仍然會執行。但是,刪除runtime又要等待runtime中的所有異步和非異步任務(會阻塞線程的任務)都完成,因此刪除操作會阻塞當前線程。

use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    // 一個運行5秒的blocking thread
    // 刪除rt時,該任務將繼續運行,直到自己終止
    rt.spawn_blocking(|| {
      thread::sleep(std::time::Duration::from_secs(5));
      println!("blocking thread task over: {}", now());
    });
    
    // 進入runtime,並生成一個運行3秒的異步任務,
    // 刪除rt時,該任務直接被終止
    let _guard = rt.enter();
    rt.spawn(async {
      time::sleep(time::Duration::from_secs(3)).await;
      println!("worker thread task over 1: {}", now());
    });

    // 進入runtime,並生成一個運行4秒的阻塞整個線程的任務
    // 刪除rt時,該任務繼續運行,直到自己終止
    rt.spawn(async {
      std::thread::sleep(std::time::Duration::from_secs(4));
      println!("worker thread task over 2: {}", now());
    });
    
    // 先讓所有任務運行起來
    std::thread::sleep(std::time::Duration::from_millis(3));

    // 刪除runtime句柄,將直接移除那個3秒的異步任務,
    // 且阻塞5秒,直到所有已經阻塞的thread完成
    drop(rt);
    println!("runtime droped: {}", now());
}

輸出結果(注意結果中沒有異步任務中println!()輸出的內容):

worker thread task over 2: 2021-10-25 20:08:35
blocking thread task over: 2021-10-25 20:08:36
runtime droped: 2021-10-25 20:08:36

關閉runtime可能會被阻塞,因此,如果是在某個runtime中關閉另一個runtime,將會導致當前的runtime的某個worker thread被阻塞,甚至可能會阻塞很長時間,這是異步環境不允許的。

tokio提供了另外兩個關閉runtime的方式:shutdown_timeout()shutdown_background()。前者會等待指定的時間,如果正在超時時間內還未完成關閉,將強行終止runtime中的所有線程。後者是立即強行關閉runtime。

use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();

    rt.spawn_blocking(|| {
      thread::sleep(std::time::Duration::from_secs(5));
      println!("blocking thread task over: {}", now());
    });
    
    let _guard = rt.enter();
    rt.spawn(async {
      time::sleep(time::Duration::from_secs(3)).await;
      println!("worker thread task over 1: {}", now());
    });

    rt.spawn(async {
      std::thread::sleep(std::time::Duration::from_secs(4));
      println!("worker thread task over 2: {}", now());
    });
    
    // 先讓所有任務運行起來
    std::thread::sleep(std::time::Duration::from_millis(3));

    // 1秒後強行關閉Runtime
    rt.shutdown_timeout(std::time::Duration::from_secs(1));
    println!("runtime droped: {}", now());
}

輸出:

runtime droped: 2021-10-25 20:16:02

需要注意的是,強行關閉Runtime,可能會使得尚未完成的任務的資源洩露。因此,應小心使用強行關閉Runtime的操作。

runtime Handle

tokio提供了一個稱為runtime Handle的東西,它實際上是runtime的一個引用,可以隨意被clone。它可以spawn()生成異步任務,這些異步任務將綁定在其所引用的runtime中,還可以block_on()enter()進入其所引用的runtime,此外,還可以生成blocking thread。


#![allow(unused)]
fn main() {
let rt = Runtime::new().unwrap();
let handle = rt.handle();
handle.spawn(...)
handle.spawn_blocking(...)
handle.block_on(...)
handle.enter()
}

需注意,如果runtime已被關閉,handle也將失效,此後再使用handle,將panic。

理解多進程、多線程、多協程的併發能力

大家都說,多進程效率不如多線程,多線程效率又不如多協程。但這裡面並不是如此簡單的一句話就能描述準確的,還需要理解其中的真相。

如果有很多IO任務要執行,為了讓這些IO操作不阻塞程序,可以使用多進程的方式將這些IO操作丟到【後臺】去等待,然後通過各種進程間通信的方式來傳遞數據。但是進程間的上下文切換會帶來較大的開銷。因此,當程序使用多進程方式,且工作進程數量較多時,因為不斷地進行進程間切換和內存拷貝,效率會明顯下降。

比多進程更好一些的是多線程方式,線程是進程內部的執行單元,線程間的上下文切換的開銷要遠小於進程間切換的開銷。因此,大概可以認為,多線程要優於多進程,如果單個進程內的線程數量較多,可以考慮引入多進程,然後在某些進程內使用多線程。

比多線程更好一些的是多協程方式,協程是線程內部的執行單元,協程的上下文切換開銷,又要遠小於線程間切換的開銷。因此,大概可以認為,多協程要優於多線程,如果單個線程內的協程數量較多,可以考慮引入多線程,然後在某些線程內使用多協程。

但是,多進程效率並不真的差,多線程的效率也並不真的比多協程效率差。高併發能力的高低,完全取決於程序是否出現了等待、是否浪費了可調度單元(即進程、線程、協程)、是否浪費了更多的CPU。

一個簡單的例子,假如要發送10W個HTTP請求,用多協程是最好的。為什麼呢?因為HTTP請求是一個非常簡單的IO任務,它只需要發送請求,然後等待。如果用多線程的併發模式,每個線程負責發送一個HTTP請求,那麼每一個線程都將長時間處於等待狀態,什麼也不做,這是對線程的浪費,加之線程數量太多,在這麼多的線程之間進行切換也會浪費大量CPU。因此,在這種情況下,多協程優於多線程。

另一方面,如果是要計算10W個密鑰,應當去使用一定數量的多進程或多線程(少於或等於CPU核數),以保證能儘量多地利用多核CPU。用多協程可能會很不好,因為協程調度會打斷計算進度,當然這取決於協程調度器的調度邏輯。

從這兩個簡單又極端的示例可以大概理解,如果要執行的任務越簡單(這裡的簡單表示的是計算密集程度低),越IO密集,越應該使用粒度更小的可調度單元(即協程)。計算任務越重,越應該利用多核CPU。

更多時候,一個任務裡會同時帶有IO和計算,無法簡單地判斷它是IO密集還是CPU密集的任務。這時候需要進行測試。

選擇單一線程runtime還是多線程runtime?

tokio提供了單一線程的runtime和多線程的runtime,雖然官方文檔裡時不時地提到【多數時候是多線程的runtime】,但並不意味著多線程的runtime優於單一線程的runtime,這取決於異步任務的工作類型。

簡單來說,每一個異步任務都是一個線程內的【協程】,單一線程的runtime是在單個線程內調度管理這些任務,多線程runtime則是在多個線程內不斷地分配和跨線程傳遞這些任務

單一線程的runtime的優勢在於它的任務調度開銷低,因為它不需要進行開銷更重的線程間切換,更不需要不斷地在線程間傳遞數據。因此,對於計算程度不重的需求,它的高併發性能會很好。

單一線程的runtime的劣勢在於這個runtime只能利用單核CPU,它無法利用多核CPU,也就無法發揮多核CPU的優勢。

注:也可以認為,單一線程的runtime,和Python、Ruby等語言的併發是類似的,都是充分利用單核CPU。但卻比它們更高效一些,一方面是語言本身的性能,另一方面是tokio的worker thread都是綁定CPU的,不會在不同的CPU核心之間進行切換,降低了切換開銷。

但是,可以手動在多個線程內創建互相獨立的單一線程runtime,這樣也可以利用多核CPU。

use tokio;
use std::thread;

fn main(){
  let t1 = thread::spawn(||{
    let rt = tokio::runtime::Builder::new().new_current_thread()
               .enable_all()
               .build()
               .unwrap();
    rt.block_on(...)
  });

  let t2 = thread::spawn(||{
    let rt = tokio::runtime::Builder::new().new_current_thread()
               .enable_all()
               .build()
               .unwrap();
    rt.block_on(...)
  });

  t1.join().unwrap();
  t2.join().unwrap();
}

這種手動創建多個單線程runtime的方式,可以利用多核CPU,但是這幾個線程是不受控制的,完全由操作系統決定如何調度它們。這種方式是多線程runtime的雛形。它和多線程runtime的區別在於,多線程runtime會調度管理這些線程,會盡量以高效的方式來分配任務(比如從其它線程中偷任務)。但是有了多線程,就有了額外的切換開銷,就有了CPU利用率的浪費。

因此,可以這樣認為,單線程runtime對單個線程(單核CPU)的利用率,是高於多線程runtime的

如果併發任務不重,單核CPU都無法跑滿,顯然單線程runtime要更優。如果併發任務中有較重的計算任務,則還需要再測試何種方式更優。