使用tokio Timer

本篇介紹tokio的計時器功能:Timer。

每一個異步框架都應該具備計時器功能,tokio的計時器功能在開啟了time特性後可用。

tokio = {version = "1.13", features = ["rt", "rt-multi-thread", "time"]}

tokio的time模塊包含幾個功能:

  • Duration類型:是對std::time::Duration的重新導出,兩者等價。它用於描述持續時長,例如睡眠3秒的3秒是一個時長,每隔3秒的3秒也是一個時長
  • Instant類型:從程序運行開始就單調遞增的時間點,僅結合Duration一起使用。例如,此刻是處在某個時間點A,下一次(例如某個時長過後),處在另一個時間點B,時間點B一定不會早於時間點A,即便修改了操作系統的時鐘或硬件時鐘,它也不會時光倒流的現象
  • Sleep類型:是一個Future,通過調用sleep()sleep_until()返回,該Future本身不做任何事,它只在到達某個時間點(Instant)時完成
  • Interval類型:是一個流式的間隔計時器,通過調用interval()interval_at()返回。Interval使用Duration來初始化,表示每隔一段時間(即指定的Duration時長)後就產生一個值
  • Timeout類型:封裝異步任務,併為異步任務設置超時時長,通過調用timeout()timeout_at()返回。如果異步任務在指定時長內仍未完成,則異步任務被強制取消並返回Error

時長: tokio::time::Duration

tokio::time::Duration是對std::time::Duration的Re-exports,它兩完全等價,因此可在tokio上下文中使用任何一種Duration。

Duration類型描述了一種時長,該結構包含兩部分:秒和納秒。


#![allow(unused)]
fn main() {
pub struct Duration {
    secs: u64,
    nanos: u32,
}
}

可使用Duration::new(Sec, Nano_sec)來構建Duration。例如,Duration::new(5, 30)構建了一個5秒30納秒的時長,即總共5_000_000_030納秒。

如果Nano_sec部分超出了納秒範圍(1秒等於10億納秒),將進位到秒單位上,例如第二個參數指定為500億納秒,那麼會向秒部分加50秒。

注意,構建時長時,這兩部分的值可能會超出範圍,例如計算後的秒部分的值超出了u64的範圍,或者計算得到了負數。對此,Duration提供了幾種不同的處理方式。

特殊地,如果兩個參數都指定為0,那麼表示時長為0,可用is_zero()來檢測某個Duration是否是0時長。0時長可用於上下文切換(yield),例如sleep睡眠0秒,表示不用睡眠,但會交出CPU使得發生上下文切換。

還可以使用如下幾種簡便的方式構建各種單位的時長:

  • Duration::from_secs(3):3秒時長
  • Duration::from_millis(300):300毫秒時長
  • Duration::from_micros(300):300微秒時長
  • Duration::from_nanos(300):300納秒時長
  • Duration::from_secs_f32(2.3):2.3秒時長
  • Duration::from_secs_f64(2.3):2.3秒時長

對於構建好的Duration實例dur = Duration::from_secs_f32(2.3),可以使用如下幾種方法方便地提取、轉換它的秒、毫秒、微秒、納秒。

  • dur.as_secs():轉換為秒的表示方式,2
  • dur.as_millis(): 轉換為毫秒錶示方式,2300
  • dur.as_micros(): 轉換為微秒錶示方式,2_300_000
  • dur.as_nanos(): 轉換為納秒錶示方式,2_300_000_000
  • dur.as_secs_f32(): 小數秒錶示方式,2.3
  • dur.as_secs_f64(): 小數秒錶示方式,2.3
  • dur.subsec_millis(): 小數部分轉換為毫秒精度的表示方式,300
  • dur.subsec_micros(): 小數部分轉換為微秒精度的表示方式,300_000
  • dur.subsec_nanos(): 小數部分轉換為納秒精度的表示方式,300_000_000

Duration實例可以直接進行大小比較以及加減乘除運算:

  • checked_add(): 時長的加法運算,超出Duration範圍時返回None
  • checked_sub(): 時長的減法運算,超出Duration範圍時返回None
  • checked_mul(): 時長的乘法運算,超出Duration範圍時返回None
  • checked_div(): 時長的除法運算,超出Duration範圍時(即分母為0)返回None
  • saturating_add():飽和式的加法運算,超出範圍時返回Duration支持的最大時長
  • saturating_mul():飽和式的乘法運算,超出範圍時返回Duration支持的最大時長
  • saturating_sub():飽和式的減法運算,超出範圍時返回0時長
  • mul_f32():時長乘以小數,得到的結果如果超出範圍或無效,則panic
  • mul_f64():時長乘以小數,得到的結果如果超出範圍或無效,則panic
  • div_f32():時長除以小數,得到的結果如果超出範圍或無效,則panic
  • div_f64():時長除以小數,得到的結果如果超出範圍或無效,則panic

時間點: tokio::time::Instant

Instant用於表示時間點,主要用於兩個時間點的比較和相關運算。

tokio::time::Instant是對std::time::Instant的封裝,添加了一些對齊功能,使其能夠適用於tokio runtime。

Instant是嚴格單調遞增的,絕不會出現時光倒流的現象,即之後的時間點一定晚於之前創建的時間點。但是,tokio time提供了pause()函數可暫停時間點,還提供了advance()函數用於向後跳轉到某個時間點。

tokio::time::Instant::now()用於創建代表此時此刻的時間點。Instant可以直接進行大小比較,還能執行+-操作。

use tokio;
use tokio::time::Instant;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    // 創建代表此時此刻的時間點
    let now = Instant::now();
    
    // Instant 加一個Duration,得到另一個Instant
    let next_3_sec = now + Duration::from_secs(3);
    // Instant之間的大小比較
    println!("{}", now < next_3_sec);  // true
    
    // Instant減Duration,得到另一個Instant
    let new_instant = next_3_sec - Duration::from_secs(2);
    
    // Instant減另一個Instant,得到Duration
    // 注意,Duration有它的有效範圍,因此必須是大的Instant減小的Instant,反之將panic
    let duration = next_3_sec - new_instant;
}

此外tokio::time::Instant還有以下幾個方法:

  • from_std(): 將std::time::Instant轉換為tokio::time::Instant
  • into_std(): 將tokio::time::Instant轉換為std::time::Instant
  • elapsed(): 指定的時間點實例,距離此時此刻的時間點,已經過去了多久(返回Duration)
  • duration_since(): 兩個Instant實例之間相差的時長,要求B.duration_since(A)中的B必須晚於A,否則panic
  • checked_duration_since(): 兩個時間點之間的時長差,如果計算返回的Duration無效,則返回None
  • saturating_duration_since(): 兩個時間點之間的時長差,如果計算返回的Duration無效,則返回0時長的Duration實例
  • checked_add(): 為時間點加上某個時長,如果加上時長後是無效的Instant,則返回None
  • checked_sub(): 為時間點減去某個時長,如果減去時長後是無效的Instant,則返回None

tokio頂層也提供了一個tokio::resume()方法,功能類似於tokio::time::from_std(),都是將std::time::Instant::now()保存為tokio::time::Instant。不同的是,後者用於創建tokio time Instant時間點,而resume()是讓tokio的Instant的計時系統與系統的計時系統進行一次同步更新。

睡眠: tokio::time::Sleep

tokio::time::sleep()tokio::time::sleep_until()提供tokio版本的睡眠任務:

use tokio::{self, runtime::Runtime, time};

fn main(){
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        // 睡眠2秒
        time::sleep(time::Duration::from_secs(2)).await;

        // 一直睡眠,睡到2秒後醒來
        time::sleep_until(time::Instant::now() + time::Duration::from_secs(2)).await;
    });
}

注意,std::thread::sleep()會阻塞當前線程,而tokio的睡眠不會阻塞當前線程,實際上tokio的睡眠在進入睡眠後不做任何事,僅僅只是立即放棄CPU,並進入任務輪詢隊列,等待睡眠時間終點到了之後被Reactor喚醒,然後進入就緒隊列等待被調度。

可以簡單理解異步睡眠:調用睡眠後,記錄睡眠的終點時間點,之後在輪詢到該任務時,比較當前時間點是否已經超過睡眠終點,如果超過了,則喚醒該睡眠任務,如果未超過終點,則不管。

注意,tokio的sleep的睡眠精度是毫秒,因此無法保證也不應睡眠更低精度的時間。例如不要睡眠100微秒或100納秒,這時無法保證睡眠的時長。

下面是一個睡眠10微秒的例子,多次執行,會發現基本上都要1毫秒多,去掉執行指令的時間,實際的睡眠時長大概是1毫秒。另外,將睡眠10微秒改成睡眠100微秒或1納秒,結果也是接近的。

use tokio::{self, runtime::Runtime, time};

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let start = time::Instant::now();
        // time::sleep(time::Duration::from_nanos(100)).await;
        // time::sleep(time::Duration::from_micros(100)).await;
        time::sleep(time::Duration::from_micros(10)).await;
        println!("sleep {}", time::Instant::now().duration_since(start).as_nanos());
    });
}

執行的多次,輸出結果:

sleep 1174300
sleep 1202900
sleep 1161200
sleep 1393200
sleep 1306400
sleep 1285300

sleep()sleep_until()都返回time::Sleep類型,它有3個方法可調用:

  • deadline(): 返回Instant,表示該睡眠任務的睡眠終點
  • is_elapsed(): 可判斷此時此刻是否已經超過了該sleep任務的睡眠終點
  • reset():可用於重置睡眠任務。如果睡眠任務未完成,則直接修改睡眠終點,如果睡眠任務已經完成,則再次創建睡眠任務,等待新的終點

需要注意的是,reset()要求修改睡眠終點,因此Sleep實例需要是mut的,但這樣會消費掉Sleep實例,更友好的方式是使用tokio::pin!(sleep)將sleep給pin在當前棧中,這樣就可以調用as_mut()方法獲取它的可修改版本。

use chrono::Local;
use tokio::{self, runtime::Runtime, time};

#[allow(dead_code)]
fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("start: {}", now());
        let slp = time::sleep(time::Duration::from_secs(1));
        tokio::pin!(slp);

        slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));

        slp.await;
        println!("end: {}", now());
    });
}

輸出:

start: 2021-11-02 21:57:42
end: 2021-11-02 21:57:44

重置已完成的睡眠實例:

use chrono::Local;
use tokio::{self, runtime::Runtime, time};

#[allow(dead_code)]
fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("start: {}", now());
        let slp = time::sleep(time::Duration::from_secs(1));
        tokio::pin!(slp);
        
        //注意調用slp.as_mut().await,而不是slp.await,後者會move消費掉slp
        slp.as_mut().await;
        println!("end 1: {}", now());

        slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));

        slp.await;
        println!("end 2: {}", now());
    });
}

輸出結果:

start: 2021-11-02 21:59:25
end 1: 2021-11-02 21:59:26
end 2: 2021-11-02 21:59:28

任務超時: tokio::time::Timeout

tokio::time::timeout()tokio::time::timeout_at()可設置一個異步任務的完成超時時間,前者接收一個Duration和一個Future作為參數,後者接收一個Instant和一個Future作為參數。這兩個函數封裝異步任務之後,返回time::Timeout,它也是一個Future。

如果在指定的超時時間內該異步任務已完成,則返回該異步任務的返回值,如果未完成,則異步任務被撤銷並返回Err。

use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let res = time::timeout(time::Duration::from_secs(5), async {
            println!("sleeping: {}", now());
            time::sleep(time::Duration::from_secs(6)).await;
            33
        });

        match res.await {
            Err(_) => println!("task timeout: {}", now()),
            Ok(data) => println!("get the res '{}': {}", data, now()),
        };
    });
}

得到結果:

sleeping: 2021-11-03 17:12:33
task timeout: 2021-11-03 17:12:38

如果將睡眠6秒改為睡眠4秒,那麼將得到結果:

sleeping: 2021-11-03 17:13:11
get the res '33': 2021-11-03 17:13:15

得到time::Timeout實例res後,可以通過res.get_ref()或者res.get_mut()獲得Timeout所封裝的Future的可變和不可變引用,使用res.into_inner()獲得所封裝的Future,這會消費掉該Future。

如果要取消Timeout的計時等待,直接刪除掉Timeout實例即可。

間隔任務: tokio::time::Interval

tokio::time::interval()tokio::time::interval_at()用於設置間隔性的任務。

  • interval_at(): 接收一個Instant參數和一個Duration參數,Instant參數表示間隔計時器的開始計時點,Duration參數表示間隔的時長
  • interval(): 接收一個Duration參數,它在第一次被調用的時候立即開始計時

注意,這兩個函數只是定義了間隔計時器的起始計時點和間隔的時長,要真正開始讓它開始計時,還需要調用它的tick()方法生成一個Future任務,並調用await來執行並等待該任務的完成。

例如,5秒後開始每隔1秒執行一次輸出操作:

use chrono::Local;
use tokio::{self, runtime::Runtime, time::{self, Duration, Instant}};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("before: {}", now());

        // 計時器的起始計時點:此時此刻之後的5秒後
        let start = Instant::now() + Duration::from_secs(5);
        let interval = Duration::from_secs(1);
        let mut intv = time::interval_at(start, interval);

        // 該計時任務"阻塞",直到5秒後被喚醒
        intv.tick().await;
        println!("task 1: {}", now());

        // 該計時任務"阻塞",直到1秒後被喚醒
        intv.tick().await;
        println!("task 2: {}", now());

        // 該計時任務"阻塞",直到1秒後被喚醒
        intv.tick().await;
        println!("task 3: {}", now());
    });
}

輸出結果:

before: 2021-11-03 18:52:14
task 1: 2021-11-03 18:52:19
task 2: 2021-11-03 18:52:20
task 3: 2021-11-03 18:52:21

上面定義的計時器,有幾點需要說明清楚:

  1. interval_at()第一個參數定義的是計時器的開始時間,這樣描述不準確,它表述的是最早都要等到這個時間點才開始計時。例如,定義計時器5秒之後開始計時,但在第一次tick()之前,先睡眠了10秒,那麼該計時器將在10秒後才開始,但如果第一次tick之前只睡眠了3秒,那麼還需再等待2秒該tick計時任務才會完成。
  2. 定義計時器時,要將其句柄(即計時器變量)聲明為mut,因為每次tick時,都需要修改計時器內部的下一次計時起點。
  3. 不像其它語言中的間隔計時器,tokio的間隔計時器需要手動調用tick()方法來生成臨時的異步任務。
  4. 刪除計時器句柄可取消間隔計時器。

再看下面的示例,定義5秒後開始的計時器,但在第一次開始計時前,先睡眠10秒。

use chrono::Local;
use tokio::{
    self,
    runtime::Runtime,
    time::{self, Duration, Instant},
};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("before: {}", now());

        let start = Instant::now() + Duration::from_secs(5);
        let interval = Duration::from_secs(1);
        let mut intv = time::interval_at(start, interval);

        time::sleep(Duration::from_secs(10)).await;
        intv.tick().await;
        println!("task 1: {}", now());
        intv.tick().await;
        println!("task 2: {}", now());
    });
}

輸出結果:

before: 2021-11-03 19:00:10
task 1: 2021-11-03 19:00:20
task 2: 2021-11-03 19:00:20

注意輸出結果中的task 1和task 2的時間點是相同的,說明第一次tick之後,並沒有等待1秒之後再執行緊跟著的tick,而是立即執行之。

簡單解釋一下上面示例中的計時器內部的工作流程,假設定義計時器的時間點是19:00:10:

  • 定義5秒後開始的計時器intv,該計時器內部有一個字段記錄著下一次開始tick的時間點,其值為19:00:15
  • 睡眠10秒後,時間點到了19:00:20,此時第一次執行intv.tick(),它將生成一個異步任務,執行器執行時發現此時此刻的時間點已經超過該計時器內部記錄的值,於是該異步任務立即完成並進入就緒隊列等待調度,同時修改計時器內部的值為19:00:16
  • 下一次執行tick的時候,此時此刻仍然是19:00:20,已經超過了該計時器內部的19:00:16,因此計時任務立即完成

這是tokio Interval在遇到計時延遲時的默認計時策略,叫做Burst。tokio支持三種延遲後的計時策略。可使用set_missed_tick_behavior(MissedTickBehavior)來修改計時策略。

1.Burst策略,衝刺型的計時策略,當出現延遲後,將盡量快地完成接下來的tick,直到某個tick趕上它正常的計時時間點

例如,5秒後開始的每隔1秒的計時器,第一次開始tick前睡眠了10秒,那麼10秒後將立即進行如下幾次tick,或者說瞬間完成如下幾次tick:

  • 第一次tick,它本該在第五秒的時候被執行
  • 第二次tick,它本該在第六秒的時候被執行
  • 第三次tick,它本該在第七秒的時候被執行
  • 第四次tick,它本該在第八秒的時候被執行
  • 第五次tick,它本該在第九秒的時候被執行
  • 第六次tick,它本該在第十秒的時候被執行

而第七次tick的時間點,將回歸正常,即在第十一秒的時候開始被執行。

2.Delay策略,延遲性的計時策略,當出現延遲後,仍然按部就班地每隔指定的時長計時。在內部,這種策略是在每次執行tick之後,都修改下一次計時起點為Instant::now() + Duration。因此,這種策略下的任何相鄰兩次的tick,其中間間隔的時長都至少達到Duration。

例如:

use chrono::Local;
use tokio::{self, runtime::Runtime};
use tokio::time::{self, Duration, Instant, MissedTickBehavior};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("before: {}", now());

        let mut intv = time::interval_at(
            Instant::now() + Duration::from_secs(5),
            Duration::from_secs(2),
        );
        intv.set_missed_tick_behavior(MissedTickBehavior::Delay);

        time::sleep(Duration::from_secs(10)).await;

        println!("start: {}", now());
        intv.tick().await;
        println!("tick 1: {}", now());
        intv.tick().await;
        println!("tick 2: {}", now());
        intv.tick().await;
        println!("tick 3: {}", now());
    });
}

輸出結果:

before: 2021-11-03 19:31:02
start: 2021-11-03 19:31:12
tick 1: 2021-11-03 19:31:12
tick 2: 2021-11-03 19:31:14
tick 3: 2021-11-03 19:31:16

3.Skip策略,忽略型的計時策略,當出現延遲後,仍然所有已經被延遲的計時任務。這種策略總是以定義計時器時的起點為基準,類似等差數量,每一次執行tick的時間點,一定符合Start + N * Duration

use chrono::Local;
use tokio::{self, runtime::Runtime};
use tokio::time::{self, Duration, Instant, MissedTickBehavior};

fn now() -> String {
    Local::now().format("%F %T").to_string()
}

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("before: {}", now());

        let mut intv = time::interval_at(
            Instant::now() + Duration::from_secs(5),
            Duration::from_secs(2),
        );
        intv.set_missed_tick_behavior(MissedTickBehavior::Skip);

        time::sleep(Duration::from_secs(10)).await;

        println!("start: {}", now());
        intv.tick().await;
        println!("tick 1: {}", now());
        intv.tick().await;
        println!("tick 2: {}", now());
        intv.tick().await;
        println!("tick 3: {}", now());
    });
}

輸出結果:

before: 2021-11-03 19:34:53
start: 2021-11-03 19:35:03
tick 1: 2021-11-03 19:35:03
tick 2: 2021-11-03 19:35:04
tick 3: 2021-11-03 19:35:06

注意上面的輸出結果中,第一次tick和第二次tick只相差1秒而不是相差2秒。

上面通過interval_at()解釋清楚了tokio::time::Interval的三種計時策略。但在程序中,更大的可能是使用interval()來定義間隔計時器,它等價於interval_at(Instant::now() + Duration),表示計時起點從現在開始的計時器。

此外,可以使用period()方法獲取計時器的間隔時長,使用missed_tick_behavior()獲取當前的計時策略。