tokio task的通信和同步(2): 通信

tokio使用通道在task之間進行通信,有四種類型通道:oneshot、mpsc、broadcast和watch。

oneshot通道

oneshot通道的特性是:單Sender、單Receiver以及單消息,簡單來說就是一次性的通道。

oneshot通道的創建方式是使用oneshot::channel()方法:


#![allow(unused)]
fn main() {
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
}

它返回該通道的寫端sender和讀端receiver,其中泛型T表示的是讀寫兩端所傳遞的消息類型。

例如,創建一個可發送i32數據的一次性通道:


#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel::<i32>();
}

返回的結果中,tx是發送者(sender)、rx是接收者(receiver)。

多數時候不需要去聲明通道的類型,編譯器可以根據發送數據時的類型自動推斷出類型。


#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel();
}

Sender

Sender通過send()方法發送數據,因為oneshot通道只能發送一次數據,所以send()發送數據的時候,tx直接被消費掉。Sender並不一定總能成功發送消息,比如,Sender發送消息之前,Receiver端就已經關閉了讀端。因此send()返回Result結果:如果發送成功,則返回Ok(()),如果發送失敗,則返回Err(T)

因此,發送數據的時候,通常會做如下檢測:


#![allow(unused)]
fn main() {
// 或 if tx.send(33).is_err() {}
// 或直接忽略錯誤 let _ = tx.send();
if let Err(_) = tx.send(33) {
  println!("receiver closed");
}
}

另外需注意,send()是非異步但卻不阻塞的,它總是立即返回,如果能發送數據,則發送數據,如果不能發送數據,就返回錯誤,它不會等待Receiver啟動讀取操作。也因此,send()可以應用在同步代碼中,也可以應用在異步代碼中。

Sender可以通過is_closed()方法來判斷Receiver端是否已經關閉。

Sender可以通過close()方法來等待Receiver端關閉。它可以結合select!宏使用:其中一個分支計算要發送的數據,另一個分支為closed()等待分支,如果先計算完成,則發送計算結果,而如果是先等到了對端closed的異步任務完成,則無需再計算浪費CPU去計算結果。例如:


#![allow(unused)]
fn main() {
tokio::spawn(async move {
  tokio::select! {
    _ = tx.closed() => {
      // 先等待到了對端關閉,不做任何事,select!會自動取消其它分支的任務
    }
    value = compute() => {
      // 先計算得到結果,則發送給對端
      // 但有可能剛計算完成,尚未發送時,對端剛好關閉,因此可能發送失敗
      // 此處丟棄發送失敗的錯誤
      let _ = tx.send(value);
    }
  }
});
}

Receiver

Receiver沒有recv()方法,rx本身實現了Future Trait,它執行時對應的異步任務就是接收數據,因此只需await即可用來接收數據。

但是,接收數據並不一定會接收成功。例如,Sender端尚未發送任何數據就已經關閉了(被drop),此時Receiver端會接收到error::RecvError錯誤。因此,接收數據的時候通常也會進行判斷:


#![allow(unused)]
fn main() {
match rx.await {
  Ok(v) => println!("got = {:?}", v),
  Err(_) => println!("the sender dropped"),
  // Err(e: RecvError) => xxx,
}
}

既然通過rx.await來接收數據,那麼已經隱含了一個信息,異步任務中接收數據時會進行等待。

Receiver端可以通過close()方法關閉自己這一端,當然也可以直接drop來關閉。關閉操作是冪等的,即,如果關閉的是已經關閉的Recv,則不產生任何影響。

關閉Recv端之後,可以保證Sender端無法再發送消息。但需要注意,有可能Recv端關閉完成之前,Sender端正好在這時發送了一個數據過來。因此,在關閉Recv端之後,儘可能地再調用一下try_recv()方法嘗試接收一次數據。

try_recv()方法返回三種可能值:

  • Ok(T): 表示成功接收到通道中的數據
  • Err(TryRecvError::Empty): 表示通道為空
  • Err(TryRecvError::Closed): 表示通道為空,且Sender端已關閉,即Sender未發送任何數據就關閉了

例如:


#![allow(unused)]
fn main() {
let (tx, mut rx) = oneshot::channel::<()>();

drop(tx);

match rx.try_recv() {
    // The channel will never receive a value.
    Err(TryRecvError::Closed) => {}
    _ => unreachable!(),
}
}

使用示例

一個完整但簡單的示例:

use tokio::{self, runtime::Runtime, sync};

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, rx) = sync::oneshot::channel();

        tokio::spawn(async move {
            if tx.send(33).is_err() {
                println!("receiver dropped");
            }
        });

        match rx.await {
            Ok(value) => println!("received: {:?}", value),
            Err(_) => println!("sender dropped"),
        };
    });
}

另一個比較常見的使用場景是結合select!宏,此時應在recv前面加上&mut。例如:


#![allow(unused)]
fn main() {
let interval = tokio::interval(tokio::time::Duration::from_millis(100));

// 注意mut
let (tx, mut rx) = oneshot::channel();
loop {
    // 注意,select!中無需await,因為select!會自動輪詢推進每一個分支的任務進度
    tokio::select! {
        _ = interval.tick() => println!("Another 100ms"),
        msg = &mut recv => {
            println!("Got message: {}", msg.unwrap());
            break;
        }
    }
}
}

mpsc通道

mpsc通道的特性是可以有多個發送者發送多個消息,且只有一個接收者。mpsc通道是使用最頻繁的通道類型。

mpsc通道分為兩種:

  • bounded channel: 有界通道,通道有容量限制,即通道中最多可以存放指定數量(至少為1)的消息,通過mpsc::channel()創建
  • unbounded channel: 無界通道,通道中可以無限存放消息,直到內存耗盡,通過mpsc::unbounded_channel()創建

有界通道

通過mpsc::channel()創建有界通道,需傳遞一個大於1的usize值作為其參數。

例如,創建一個最多可以存放100個消息的有界通道。


#![allow(unused)]
fn main() {
// tx是Sender端,rx是Receiver端
// 接收端接收數據時需修改狀態,因此聲明為mut
let (tx, mut rx) = mpsc::channel(100);
}

mpsc通道只能有一個Receiver端,但可以tx.clone()得到多個Sender端,clone得到的Sender都可以使用send()方法向該通道發送消息。

發送消息時,如果通道已滿,發送消息的任務將等待直到通道中有空閒的位置。

發送消息時,如果Receiver端已經關閉,則發送消息的操作將返回SendError

如果所有的Sender端都已經關閉,則Receiver端接收消息的方法recv()將返回None。

一個簡單的示例:

use tokio::{ self, runtime::Runtime, sync };

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, mut rx) = sync::mpsc::channel::<i32>(10);

        tokio::spawn(async move {
            for i in 1..=10 {
                // if let Err(_) = tx.send(i).await {}
                if tx.send(i).await.is_err() {
                    println!("receiver closed");
                    return;
                }
            }
        });

        while let Some(i) = rx.recv().await {
            println!("received: {}", i);
        }
    });
}

輸出的結果:

received: 1
received: 2
received: 3
received: 4
received: 5
received: 6
received: 7
received: 8
received: 9
received: 10

上面的示例中,先生成了一個異步任務,該異步任務向通道中發送10個數據,Receiver端則在while循環中不斷從通道中取數據。

將上面的示例改一下,生成10個異步任務分別發送數據:

use tokio::{ self, runtime::Runtime, sync };

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, mut rx) = sync::mpsc::channel::<i32>(10);

        for i in 1..=10 {
            let tx = tx.clone();
            tokio::spawn(async move {
                if tx.send(i).await.is_err() {
                    println!("receiver closed");
                }
            });
        }
        drop(tx);

        while let Some(i) = rx.recv().await {
            println!("received: {}", i);
        }
    });
}

輸出的結果:

received: 2
received: 3
received: 1
received: 4
received: 6
received: 5
received: 10
received: 7
received: 8
received: 9

10個異步任務發送消息的順序是未知的,因此接收到的消息無法保證順序。

另外注意上面示例中的drop(tx),因為生成的10個異步任務中都擁有clone後的Sender,clone出的Sender在每個異步任務完成時自動被drop,但原始任務中還有一個Sender,如果不關閉這個Sender,rx.recv()將不會返回None,而是一直等待。

如果通道已滿,Sender通過send()發送消息時將等待。例如下面的示例中,通道容量為5,但要發送7個數據,前5個數據會立即發送,發送第6個消息的時候將等待,直到1秒後Receiver開始從通道中消費數據。

use chrono::Local;
use tokio::{self, sync, runtime::Runtime, time::{self, Duration}};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, mut rx) = sync::mpsc::channel::<i32>(5);

        tokio::spawn(async move {
            for i in 1..=7 {
              if tx.send(i).await.is_err() {
                println!("receiver closed");
                return;
              }
              println!("sended: {}, {}", i, now());
            }
        });

        time::sleep(Duration::from_secs(1)).await;
        while let Some(i) = rx.recv().await {
            println!("received: {}", i);
        }
    });
}

輸出結果:

sended: 1, 2021-11-12 18:25:28
sended: 2, 2021-11-12 18:25:28
sended: 3, 2021-11-12 18:25:28
sended: 4, 2021-11-12 18:25:28
sended: 5, 2021-11-12 18:25:28
received: 1
received: 2
received: 3
received: 4
received: 5
sended: 6, 2021-11-12 18:25:29
sended: 7, 2021-11-12 18:25:29
received: 6
sended: 8, 2021-11-12 18:25:29
received: 7
received: 8
received: 9
sended: 9, 2021-11-12 18:25:29
sended: 10, 2021-11-12 18:25:29
received: 10

Sender端和Receiver端有一些額外的方法需要解釋一下它們的作用。

對於Sender端:

  • capacity(): 獲取當前通道的剩餘容量(注意,不是初始化容量)
  • closed(): 等待Receiver端關閉,當Receiver端關閉後該等待任務會立即完成
  • is_closed(): 判斷Receiver端是否已經關閉
  • send(): 向通道中發送消息,通道已滿時會等待通道中的空閒位置,如果對端已關閉,則返回錯誤
  • send_timeout(): 向通道中發送消息,通道已滿時只等待指定的時長
  • try_send(): 向通道中發送消息,但不等待,如果發送不成功,則返回錯誤
  • reserve(): 等待並申請一個通道中的空閒位置,返回一個Permit,申請的空閒位置被佔位,且該位置只留給該Permit實例,之後該Permit可以直接向通道中發送消息,並釋放其佔位的位置。申請成功時,通道空閒容量減1,釋放位置時,通道容量會加1
  • try_reserve(): 嘗試申請一個空閒位置且不等待,如果無法申請,則返回錯誤
  • reserve_owned(): 與reserve()類似,它返回OwnedPermit,但會Move Sender
  • try_reserve_owned(): reserve_owned()的不等待版本,嘗試申請空閒位置失敗時會立即返回錯誤
  • blocking_send(): Sender可以在同步代碼環境中使用該方法向異步環境發送消息

對於Receiver端:

  • close(): 關閉Receiver端
  • recv(): 接收消息,如果通道已空,則等待,如果對端已全部關閉,則返回None
  • try_recv(): 嘗試接收消息,不等待,如果無法接收消息(即通道為空或對端已關閉),則返回錯誤
  • blocking_recv(): Receiver可以在同步代碼環境中使用該方法接收來自異步環境的消息

注意,在這些方法中,try_xxx()方法都是立即返回不等待的(可以認為是同步代碼),因此調用它們後無需await,只有調用那些可能需要等待的方法,調用後才需要await。例如rx.recv().awaitrx.try_recv()

下面是一些稍詳細的用法說明和示例。

Sender端可通過send_timeout()來設置一個等待通道空閒位置的超時時間,它和send()返回值一樣,此外還添加一種超時錯誤:超時後仍然沒有發送成功時將返回錯誤。至於返回的是什麼錯誤,對於發送端來說不重要,重要的是發送的消息是否成功。因此,對於Sender端的條件判斷,通常也僅僅只是檢測is_err()


#![allow(unused)]
fn main() {
if tx.send_timeout(33, Duration::from_secs(1)).await.is_err() {
  println!("receiver closed or timeout");
}
}

需要特別注意的是,Receiver端調用close()方法關閉通道後,只是半關閉狀態,Receiver端仍然可以繼續讀取可能已經緩衝在通道中的消息,close()只能保證Sender端無法再發送普通的消息,但Permit或OwnedPermit仍然可以向通道發送消息。只有通道已空且所有Sender端(包括Permit和OwnedPermit)都已經關閉的情況下,recv()才會返回None,此時代表通道完全關閉

Receiver的try_recv()方法在無法立即接收消息時會立即返回錯誤。返回的錯誤分為兩種:

  • TryRecvError::Empty錯誤: 表示通道已空,但Sender端尚未全部關閉
  • TryRecvError::Disconnected錯誤: 表示通道已空,且Sender端(包括Permit和OwnedPermit)已經全部關閉

關於reserve()reserve_owned(),看官方示例即可輕鬆理解:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 創建容量為1的通道
    let (tx, mut rx) = mpsc::channel(1);
    // 申請並佔有唯一的空閒位置
    let permit = tx.reserve().await.unwrap();
    // 唯一的位置已被permit佔有,tx.send()無法發送消息
    assert!(tx.try_send(123).is_err());
    // Permit可以通過send()方法向它佔有的那個位置發送消息
    permit.send(456);
    // Receiver端接收到消息
    assert_eq!(rx.recv().await.unwrap(), 456);


    // 創建容量為1的通道
    let (tx, mut rx) = mpsc::channel(1);
    // tx.reserve_owned()會消費掉tx
    let permit = tx.reserve_owned().await.unwrap();
    // 通過permit.send()發送消息,它又返回一個Sender
    let tx = permit.send(456);
    assert_eq!(rx.recv().await.unwrap(), 456);
    //可以繼續使用返回的Sender發送消息
    tx.send(789).await.unwrap();
}

無界通道

理解了mpsc的有界通道之後,再理解無界通道會非常輕鬆。


#![allow(unused)]
fn main() {
let (tx, mut rx) = mpsc::unbounded_channel();
}

對於無界通道,它的通道中可以緩衝無限數量的消息,直到內存耗盡。這意味著,Sender端可以無需等待地不斷向通道中發送消息,這也意味著無界通道的Sender既可以在同步環境中也可以在異步環境中向通道中發送消息。只有當Receiver端已經關閉,Sender端的發送才會返回錯誤。

使用無界通道的關鍵,在於必須要保證不會無限度地緩衝消息而導致內存耗盡。例如,讓Receiver端消費消息的速度儘量快,或者採用一些複雜的限速機制讓嚴重超前的Sender端等一等。

broadcast通道

broadcast通道是一種廣播通道,可以有多個Sender端以及多個Receiver端,可以發送多個數據,且任何一個Sender發送的每一個數據都能被所有的Receiver端看到。

使用mpsc::broadcast()創建廣播通道,要求指定一個通道容量作為參數。它返回Sender和Receiver。Sender可以克隆得到多個Sender,可以調用Sender的subscribe()方法來創建新的Receiver。

例如,下面是官方文檔提供的一個示例:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // 最多存放16個消息
    // tx是Sender,rx1是Receiver
    let (tx, mut rx1) = broadcast::channel(16);

    // Sender的subscribe()方法可生成新的Receiver
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}

Sender端通過send()發送消息的時候,如果所有的Receiver端都已關閉,則send()方法返回錯誤。

Receiver端可通過recv()去接收消息,如果所有的Sender端都已經關閉,則該方法返回RecvError::Closed錯誤。該方法還可能返回RecvError::Lagged錯誤,該錯誤表示接收端已經落後於發送端。

雖然broadcast通道也指定容量,但是通道已滿的情況下還可以繼續寫入新數據而不會等待(因此上面示例中的send()無需await),此時通道中最舊的(頭部的)數據將被剔除,並且新數據添加在尾部。就像是FIFO隊列一樣。出現這種情況時,就意味著接收端已經落後於發送端。

當接收端已經開始落後於發送端時,下一次的recv()操作將直接返回RecvError::Lagged錯誤。如果緊跟著再執行recv()且落後現象未再次發生,那麼這次的recv()將取得隊列頭部的消息。

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // 通道容量2
    let (tx, mut rx) = broadcast::channel(2);

    // 寫入3個數據,將出現接收端落後於發送端的情況,
    // 此時,第一個數據(10)將被剔除,剔除後,20將位於隊列的頭部
    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // 落後於發送端之後的第一次recv()操作,返回RecvError::Lagged錯誤
    assert!(rx.recv().await.is_err());

    // 之後可正常獲取通道中的數據
    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}

Receiver也可以使用try_recv()方法去無等待地接收消息,如果Sender都已關閉,則返回TryRecvError::Closed錯誤,如果接收端已落後,則返回TryRecvError::Lagged錯誤,如果通道為空,則返回TryRecvError::Empty錯誤。

另外,tokio::broadcast的任何一個Receiver都可以看到每一次發送的消息,且它們都可以去recv()同一個消息,tokio::broadcast對此的處理方式是消息克隆:每一個Receiver調用recv()去接收一個消息的時候,都會克隆通道中的該消息一次,直到所有存活的Receiver都克隆了該消息,該消息才會從通道中被移除,進而釋放一個通道空閒位置。

這可能會導致一種現象:某個ReceiverA已經接收了通道中的第10個消息,但另一個ReceiverB可能尚未接收第一個消息,由於第一個消息還未被全部接收者所克隆,它仍會保留在通道中並佔用通道的位置,假如該通道的最大容量為10,此時Sender再發送一個消息,那麼第一個數據將被踢掉,ReceiverB接收到消息的時候將收到RecvError::Lagged錯誤並永遠地錯過第一個消息。

watch通道

watch通道的特性是:只能有單個Sender,可以有多個Receiver,且通道永遠只保存一個數據。Sender每次向通道中發送數據時,都會修改通道中的那個數據。

通道中的這個數據可以被Receiver進行引用讀取。

一個簡單的官方示例:

use tokio::sync::watch;
#[tokio::main]
async fn main() {
    // 創建watch通道時,需指定一個初始值存放在通道中
    let (tx, mut rx) = watch::channel("hello");

    // Recevier端,通過changed()來等待通道的數據發生變化
    // 通過borrow()引用通道中的數據
    tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("received = {:?}", *rx.borrow());
        }
    });

    // 向通道中發送數據,實際上是修改通道中的那個數據
    tx.send("world")?;
}

watch通道的用法很簡單,但是有些細節需要理解。

Sender端可通過subscribe()創建新的Receiver端。

當所有Receiver端均已關閉時,send()方法將返回錯誤。也就是說,send()必須要在有Receiver存活的情況下才能發送數據。

但是Sender端還有一個send_replace()方法,它可以在沒有Receiver的情況下將數據寫入通道,並且該方法會返回通道中原來保存的值。

無論是Sender端還是Receiver端,都可以通過borrow()方法取得通道中當前的值。由於可以有多個Receiver,為了避免讀寫時的數據不一致,watch內部使用了讀寫鎖:Sender端要發送數據修改通道中的數據時,需要申請寫鎖,論是Sender還是Receiver端,在調用borrow()或其它一些方式訪問通道數據時,都需要申請讀鎖。因此,訪問通道數據時要儘快釋放讀鎖,否則可能會長時間阻塞Sender端的發送操作。

如果Sender端未發送數據,或者隔較長時間才發送一次數據,那麼通道中的數據在一段時間內將一直保持不變。如果Receiver在這段時間內去多次讀取通道,得到的結果將完全相同。但有時候,可能更需要的是等待通道中的數據已經發生變化,然後再根據新的數據做進一步操作,而不是循環不斷地去讀取並判斷當前讀取到的值是否和之前讀取的舊值相同。

watch通道已經提供了這種功能:Receiver端可以標記通道中的數據,記錄該數據是否已經被讀取過。Receiver端的changed()方法用於等待通道中的數據發生變化,其內部判斷過程是:如果通道中的數據已經被標記為已讀取過,那麼changed()將等待數據更新,如果數據未標記過已讀取,那麼changed()認為當前數據就是新數據,changed()會立即返回。

Receiver端的borrow()方法不會標記數據已經讀取,所以borrow()之後調用的changed()會立即返回。但是changed()等待到新值之後,會立即將該值標記為已讀取,使得下次調用changed()時會進行等待。

此外,Receiver端還有一個borrow_and_update()方法,它會讀取數據並標記數據已經被讀取,因此隨後調用chagned()將進入等待。

最後再強調一次,無論是Sender端還是Receiver端,訪問數據的時候都會申請讀鎖,要儘量快地釋放讀鎖,以免Sender長時間無法發送數據。