Build with Naz : Rust async in practice tokio::select!, actor pattern & cancel safety
- Introduction
- What can go wrong when racing futures?
- YouTube video for this article
- Examples of cancellation safety in async Rust using tokio::select!
- Build with Naz video series on developerlife.com YouTube channel
Introduction
This tutorial, video, and repo are a deep dive into the concept of cancellation safety in
async code using Tokio and Rust. It matters whenever you use the tokio::select! macro,
because it determines what happens to the racing Futures that don’t win. The examples
provided here, along with the video, go over both code that is cancellation safe and code
that is not. These examples are generalized forms of real-world patterns.
tokio::select! might as well have been called tokio::race! (there’s a The Fast and
Furious: Tokyo Drift joke in there somewhere).
It races the given futures in the branches of the macro, and the first one to resolve wins
(it is Ready when poll()ed). The other futures are dropped. These futures are run
concurrently, not in parallel, on the same worker thread, since we are not using
tokio::spawn or its variants.
Here’s the basic setup:
loop {
    tokio::select! {
        branch_1_result = future_1 => {
            // handle branch_1_result
        },
        branch_2_result = future_2 => {
            // handle branch_2_result
        },
        // and so on
    }
}
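To make "the first one to resolve wins, and the others are dropped" concrete, here is a minimal std-only sketch that hand-polls two futures. It is not how tokio::select! is implemented (the macro polls its branches in random order by default and relies on wakers rather than a busy loop). The names `Countdown`, `race`, and `race_fast_against_slow` are made up for this illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for hand-polling in a sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future: returns `Pending` `remaining` times, then `Ready`.
struct Countdown {
    label: &'static str,
    remaining: u32,
    /// Records how much work was left when this future was dropped.
    left_at_drop: Rc<Cell<Option<u32>>>,
}

impl Future for Countdown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready(self.label)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

impl Drop for Countdown {
    fn drop(&mut self) {
        self.left_at_drop.set(Some(self.remaining));
    }
}

/// Polls both futures in turn. The first one to be `Ready` wins, and both
/// futures (winner and loser) are dropped when this function returns.
fn race(mut a: Countdown, mut b: Countdown) -> &'static str {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(winner) = Pin::new(&mut a).poll(&mut cx) {
            return winner;
        }
        if let Poll::Ready(winner) = Pin::new(&mut b).poll(&mut cx) {
            return winner;
        }
    }
}

/// Returns the winner's label and the work the loser still had left when dropped.
fn race_fast_against_slow() -> (&'static str, Option<u32>) {
    let slow_left = Rc::new(Cell::new(None));
    let fast = Countdown { label: "fast", remaining: 2, left_at_drop: Rc::new(Cell::new(None)) };
    let slow = Countdown { label: "slow", remaining: 5, left_at_drop: slow_left.clone() };
    let winner = race(fast, slow);
    (winner, slow_left.get())
}
```

Calling `race_fast_against_slow()` shows that the slow future is dropped with part of its work still outstanding, which is exactly the situation that cancellation safety is concerned with.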
A classic example is that you’re reading something from an async network or file stream,
and you want to have a timeout that breaks out of the loop if it takes too long. In this
case you might have two branches:
- A tokio::time::sleep() Future in the timeout branch.
- Some code to get the data asynchronously from the stream in the other branch.
Another example is that you might be waiting for the user to type something from the keyboard or mouse (such as in a TUI app) while also listening for signals to shut down the app, or other signals to re-render the TUI. You can see this in r3bl_tui here and in r3bl_terminal_async here.
Note that every branch must provide a Future, but you don’t have to call .await on it
yourself. The code that the macro generates takes care of this.
It might be worth your time (if you haven’t already) to read the official Tokio docs on
the tokio::select! macro and the concept of cancellation safety before diving into the examples below.
What can go wrong when racing futures?
If you recall, in Rust, a Future is just a data structure that doesn’t really do
anything until you .await it.
- The Tokio runtime actually does work on the Futures by polling them to see whether they are Ready or Pending.
- If they’re not Ready they go back to waiting until their Waker is called, and then Tokio will poll() them again.
- They are cheap to create, they are stateful, and they can be nested (easily composed).
Please read our article on effective async Rust to get a better understanding of how async Rust and
Futures work, and how runtimes are implemented.
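The laziness described above can be observed without any runtime. The following is a small std-only sketch that polls an async block by hand with a do-nothing waker; `NoopWake` and `future_is_lazy` are names invented for this illustration, and a real runtime such as Tokio does considerably more than this.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for hand-polling in a sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (ran before first poll, ran after first poll, output of the future).
fn future_is_lazy() -> (bool, bool, Option<i32>) {
    let ran = Cell::new(false);

    // Creating the future runs none of its body.
    let fut = async {
        ran.set(true);
        42
    };
    let ran_before_poll = ran.get();

    // Only polling drives the body forward.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let output = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };

    (ran_before_poll, ran.get(), output)
}
```

The body of the async block only executes once something polls it, which is what .await (or the code generated by tokio::select!) ends up doing on your behalf.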
These are some of the great things about Rust Futures. However, the nature of a Rust
Future is what may cause a problem with “cancellation safety” in the tokio::select!
macro.
So what happens to future_2 (the branch reading or writing from an async stream) if the
timeout branch (for future_1) wins the race?
- Is future_2 in the middle of doing something when this happens?
- And if so, what happens to the work it was doing when it hits the .await point in its code, and then stops?
This is the crux of the issue with cancellation safety in async Rust code. Lots of Tokio
code is built to be cancellation safe, so if you’re using mpsc or broadcast channels,
async streams, etc., you will be fine. However, if you’re maintaining state inside
future_2 and it is then dropped, that state is lost. This article will help you
understand what happens in that case.
YouTube video for this article
This blog post has examples from this live coding video. If you like to learn via video, please watch the companion video on the developerlife.com YouTube channel.
Examples of cancellation safety in async Rust using tokio::select!
Let’s create some examples to illustrate cancellation safety in async Rust. You can run
cargo new --lib async_cancel_safe to create a new library crate.
💡 You can get the code from the rust-scratch repo.
Then add the following to the Cargo.toml file that’s generated. These pull in all the
dependencies that we need for these examples.
[package]
name = "async_cancel_safe"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.38.0", features = ["full"] }
# Async stream testing.
r3bl_test_fixtures = { version = "0.0.2" }
futures-util = "0.3.30"
We are going to add all the examples below as tests to the lib.rs file in this crate.
Example 1: Right and wrong way to sleep, and interval
Add the following code to your lib.rs file. Both these examples show similar ways of using
tokio::time::sleep(..) incorrectly in a tokio::select! block.
/// Equivalent to [test_sleep_right_and_wrong_ways_v2]. This test uses
/// [`tokio::pin`] and [`tokio::time::sleep`].
/// Run the test using:
/// `cargo test -- --nocapture test_sleep_right_and_wrong_ways_v1`
#[tokio::test]
async fn test_sleep_right_and_wrong_ways_v1() {
    let mut count = 5;
    let sleep_time = 100;
    let duration = std::time::Duration::from_millis(sleep_time);
    let sleep = tokio::time::sleep(duration);
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            // Branch 1 (right way)
            // This branch executes a deterministic number of times. The same
            // sleep future is re-used on each iteration. Once the sleep "expires"
            // it stays "expired"! This is the desired behavior:
            // https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
            _ = &mut sleep => {
                println!("branch 1 - tick : {count}");
                count -= 1;
                if count == 0 {
                    break;
                }
            }
            // Branch 2 (wrong way)
            // This branch is executed a non deterministic number of times.
            // This is because the sleep future is not pinned. It is dropped
            // when the other branch is executed. Then on the next iteration,
            // a new sleep future is created.
            _ = tokio::time::sleep(duration) => {
                println!("branch 2 - sleep");
            }
        }
    }
}
/// Equivalent to [test_sleep_right_and_wrong_ways_v1]. This test uses
/// [`tokio::time::interval()`]
/// Run the test using:
/// `cargo test -- --nocapture test_sleep_right_and_wrong_ways_v2`
#[tokio::test]
async fn test_sleep_right_and_wrong_ways_v2() {
    let mut count = 5;
    let sleep_time = 100;
    let duration = std::time::Duration::from_millis(sleep_time);
    let mut interval = tokio::time::interval(duration);
    loop {
        tokio::select! {
            // Branch 1 (right way)
            // This branch executes a deterministic number of times. The same
            // interval is re-used on each iteration.
            _ = interval.tick() => {
                println!("branch 1 - tick : {count}");
                count -= 1;
                if count == 0 {
                    break;
                }
            }
            // Branch 2 (wrong way)
            // This branch is executed a non deterministic number of times.
            // This is because the sleep future is not pinned. It is dropped
            // when the other branch is executed. Then on the next iteration,
            // a new sleep future is created.
            _ = tokio::time::sleep(duration) => {
                println!("branch 2 - sleep");
            }
        }
    }
}
You can run these tests to see what they do by running the following in your terminal:
cargo test -- --nocapture test_sleep_right_and_wrong_ways_v1
cargo test -- --nocapture test_sleep_right_and_wrong_ways_v2
They are flaky, and it’s not really possible to make accurate assertions at the end of each of these tests.
Let’s break down v1 first to see what is happening. Here’s the output:
---- test_sleep_right_and_wrong_ways_v1 stdout ----
branch 2 - sleep : 5, elapsed: 101 ms
branch 1 - tick : 5, elapsed: 101 ms
branch 1 - tick : 4, elapsed: 101 ms
branch 1 - tick : 3, elapsed: 101 ms
branch 1 - tick : 2, elapsed: 101 ms
branch 1 - tick : 1, elapsed: 101 ms
- Branch 1 (right way): This branch executes a deterministic number of times. The same sleep future is re-used on each iteration. This is achieved using the tokio::pin! macro. Here are the docs on how to use Sleep in tokio::select! blocks. Since futures are stateful, ensuring that the same one is re-used between iterations of the loop ensures that state isn’t lost when the other branch is executed, or when this branch finishes and its future is dropped. Notice that the first time in branch 1 the code waits for 100ms, and then the subsequent 4 iterations of the loop do not wait at all! This is because the sleep future is in a Ready state after the first iteration, and effectively, we only wait 100ms in this loop. For those familiar with JavaScript, this is akin to setTimeout and not setInterval semantics.
- Branch 2 (wrong way): This branch is executed a non deterministic number of times. This is because the sleep future is not pinned. It is dropped when the other branch is executed. Then on the next iteration, a new sleep future is created. This means that the state of the future is lost, and its behavior with providing a reliable delay is non deterministic.
Let’s break down v2 next.
- Branch 1 (right way): This branch executes a deterministic number of times. However, we are using tokio::time::interval() this time around. It is re-used between many iterations of the loop. This function returns an Interval struct that has a tick() method that returns a Future that resolves when the interval has elapsed.
- Branch 2 (wrong way): Same as before.
Difference between interval and sleep
This is the mental model that I’ve developed for using these.
- If your intention is to have a single timeout, then sleep might be the way to go. You create and tokio::pin! the sleep future, and then re-use it in the loop. Once this timeout expires, you can handle your timeout condition in that branch.
- If your intention is to have a re-usable timer that ticks on a regular interval, then interval is the way to go. You create the interval outside the loop, and then call tick() on it in the loop. This will give you a Future that resolves when the interval has elapsed. You can safely use this same Interval repeatedly in the loop, and even accumulate how many times it runs to decide when to break.
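The mental model above can be sketched in plain Rust with a fake millisecond clock. This is only a model of the semantics, not Tokio's timer implementation: `OneShot`, `Ticker`, and the two `fire_times_*` functions are hypothetical names. The ticker fires immediately the first time, mirroring the documented behavior that the first tick of a Tokio interval completes immediately.

```rust
/// Models a pinned `sleep`: once the deadline has passed, it is ready on every later check.
struct OneShot {
    deadline_ms: u64,
}

impl OneShot {
    fn is_ready(&self, now_ms: u64) -> bool {
        now_ms >= self.deadline_ms
    }
}

/// Models an `interval`: every time it fires, it schedules its next deadline.
struct Ticker {
    next_ms: u64,
    period_ms: u64,
}

impl Ticker {
    fn try_tick(&mut self, now_ms: u64) -> bool {
        if now_ms >= self.next_ms {
            self.next_ms += self.period_ms;
            true
        } else {
            false
        }
    }
}

/// Fake-clock times at which a re-used one-shot timer is observed as ready.
fn fire_times_one_shot(period_ms: u64, fires: usize) -> Vec<u64> {
    let timer = OneShot { deadline_ms: period_ms };
    let mut now_ms = 0;
    let mut times = Vec::new();
    while times.len() < fires {
        if timer.is_ready(now_ms) {
            times.push(now_ms); // the clock does not need to advance again
        } else {
            now_ms += 1;
        }
    }
    times
}

/// Fake-clock times at which a ticker fires.
fn fire_times_ticker(period_ms: u64, fires: usize) -> Vec<u64> {
    let mut ticker = Ticker { next_ms: 0, period_ms };
    let mut now_ms = 0;
    let mut times = Vec::new();
    while times.len() < fires {
        if ticker.try_tick(now_ms) {
            times.push(now_ms);
        } else {
            now_ms += 1;
        }
    }
    times
}
```

In this model the one-shot timer waits once and is then ready on every check (setTimeout-like), while the ticker spaces its firings one period apart (setInterval-like).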
Example 2: Safe cancel of a future using interval and mpsc channel
Add the following snippet to your lib.rs file.
/// Run the test using:
/// `cargo test -- --nocapture test_safe_cancel_example`
#[tokio::test]
async fn test_safe_cancel_example() {
    let sleep_time = 100;
    let duration = std::time::Duration::from_millis(sleep_time);
    let mut count = 5;
    let mut interval = tokio::time::interval(duration);
    // Shutdown channel.
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    let mut vec: Vec<usize> = vec![];
    loop {
        tokio::select! {
            // Branch 1.
            _ = interval.tick() => {
                println!("branch 1 - tick : count {}", count);
                vec.push(count);
                count = count.saturating_sub(1);
                if count == 0 {
                    _ = tx.try_send(());
                }
            }
            // Branch 2.
            _ = rx.recv() => {
                println!("branch 2 => shut down");
                break;
            }
        }
    }
    assert_eq!(vec, vec![5, 4, 3, 2, 1]);
}
When you run this test using cargo test -- --nocapture test_safe_cancel_example, you should
get this output in your terminal:
running 1 test
branch 1 - tick : count 5
branch 1 - tick : count 4
branch 1 - tick : count 3
branch 1 - tick : count 2
branch 1 - tick : count 1
branch 2 => shut down
Let’s break down what’s happening in this test.
- Branch 1 - The interval is created outside the loop and is used to create a Future that resolves when the interval has elapsed. This happens in Branch 1 and we let this branch run 5 times before sending a message on the tx channel.
- Branch 2 - The tx channel is used to send a message to the rx channel. This is done in Branch 1 when count reaches 0. The rx channel is used to receive a message. This is done in Branch 2 and when a message is received, we break out of the loop.
Branch 1 runs 5 times, and Branch 2 runs 1 time and breaks out of the loop. If you
look at the vec that we accumulate outside of the loop, it contains what we expect.
Example 3: Inducing cancellation safety issues
This is the example we have all been waiting for. Let’s start with copying the
following snippet in your lib.rs file. We will create a new module here.
#[cfg(test)]
pub mod test_unsafe_cancel_example {
    use r3bl_test_fixtures::{gen_input_stream_with_delay, PinnedInputStream};

    pub fn get_input_vec() -> Vec<usize> {
        vec![1, 2, 3, 4]
    }

    pub fn get_stream_delay() -> std::time::Duration {
        std::time::Duration::from_millis(100)
    }

    fn get_input_stream() -> PinnedInputStream<usize> {
        gen_input_stream_with_delay(get_input_vec(), get_stream_delay())
    }

    /// This is just to see how to use the async stream [gen_input_stream_with_delay()].
    #[tokio::test]
    async fn test_generate_event_stream_pinned() {
        use futures_util::StreamExt;
        let mut count = 0;
        // Use the helper defined above to build the slow stream.
        let mut stream = get_input_stream();
        while let Some(item) = stream.next().await {
            let lhs = item;
            let rhs = get_input_vec()[count];
            assert_eq!(lhs, rhs);
            count += 1;
        }
    }

    // <more stuff to add later>
}
Let’s break down what’s happening here.
- get_input_vec() - This function returns a Vec<usize> that we will use to generate events in the gen_input_stream() function. This is meant to simulate the stream of usize values that may be generated from reading a file or a network source. Or even write to a file or network source. We could have just made these u8, but this is a made up test, so we are using usize.
- gen_input_stream() - This is where things get interesting. This function creates an async stream that yields the values from the Vec<usize> returned by get_input_vec(). It waits for 100ms between each value that it yields. This is to simulate the delay that might be present when reading from a file or network source. Note the trait magic and imports that are used to make this work; to get the details on this, check our article on trait pointers and testing.
- These two functions are our test fixture to simulate a slow async stream. Now, let’s test the test fixtures in test_generate_event_stream_pinned(). This test simply reads from the async stream and compares the values that it reads with the values that are expected from the Vec<usize> returned by get_input_vec().
You can get the r3bl_test_fixtures source here. You can get the crate from crates.io.
In lib.rs replace the // <more stuff to add later> with the following code:
/// There is no need to [futures_util::FutureExt::fuse()] the items in each
/// [tokio::select!] branch. This is because Tokio's event loop is designed to handle
/// this efficiently by remembering the state of each future across iterations.
///
/// More info: <https://gemini.google.com/app/e55fd62339b674fb>
#[rustfmt::skip]
async fn read_3_items_not_cancel_safe(stream: &mut PinnedInputStream<usize>)
    -> Vec<usize>
{
    use futures_util::StreamExt;
    let mut vec = vec![];
    println!("branch 2 => entering read_3_items_not_cancel_safe");
    for _ in 0..3 {
        let item = stream.next() /* .fuse() */ .await.unwrap();
        println!("branch 2 => read_3_items_not_cancel_safe got item: {item}");
        vec.push(item);
        println!("branch 2 => vec so far contains: {vec:?}");
    }
    vec
}
/// There is no need to [futures_util::FutureExt::fuse()] the items in each
/// [tokio::select!] branch. This is because Tokio's event loop is designed to handle
/// this efficiently by remembering the state of each future across iterations.
///
/// More info: <https://gemini.google.com/app/e55fd62339b674fb>
#[tokio::test]
async fn test_unsafe_cancel_stream() {
    use futures_util::StreamExt;
    let mut stream = get_input_stream();
    let sleep_time = 300;
    let duration = std::time::Duration::from_millis(sleep_time);
    let sleep = tokio::time::sleep(duration);
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            // Branch 1 - Timeout.
            _ = &mut sleep => {
                println!("branch 1 - time is up - end");
                break;
            }
            // Branch 2 - Read from stream.
            it = read_3_items_not_cancel_safe(&mut stream) /* .fuse() */ => {
                println!("branch 2 - got 3 items: {it:?}");
            }
        }
    }
    println!("loop exited");
    // Only [1, 2] is consumed by Branch 2 before the timeout happens
    // in Branch 1.
    let it = stream.next().await.unwrap();
    assert_eq!(it, 3);
}
When you run this test using cargo test -- --nocapture test_unsafe_cancel_stream, you
can expect the following output in your terminal.
branch 2 => entering read_3_items_not_cancel_safe
yielding item: 1
branch 2 => read_3_items_not_cancel_safe got item: 1
branch 2 => vec so far contains: [1]
yielding item: 2
branch 2 => read_3_items_not_cancel_safe got item: 2
branch 2 => vec so far contains: [1, 2]
branch 1 - time is up - end
loop exited
yielding item: 3
So let’s break down what’s happening in this test.
- Branch 1 - This branch is a timeout branch. It waits for 300ms before breaking out of the loop. This is to simulate a timeout that might happen when reading from a file or network source. With this delay, we ensure that Branch 2 doesn’t get to read all the values from the async stream. And thus we induce a cancellation safety issue, due to the way read_3_items_not_cancel_safe() is implemented.
- Branch 2 - This branch needs to read 3 items from the async stream before resolving. This is done in a loop that reads 3 items in read_3_items_not_cancel_safe(). This is not safe because if the timeout branch wins the race, then the read_3_items_not_cancel_safe() future is dropped, along with the vec that it contains! The stream itself survives, since the future only borrows it, but the items 1 and 2 that were already read from it are lost, and the next read from the stream yields 3. This is the cancellation safety issue that we are inducing in this test.
There are many ways to resolve this. The key is not to hold state inside of a Future
that you don’t want to lose if the Future is dropped. You can use mpsc channels or a
pinned Vec to get around this issue.
Note that in the case of a graceful shutdown, where you might not care that the data in some buffer is dropped, this is not a problem.
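One way to apply the "don't hold state inside the Future" advice is to let the caller own the buffer and have the future only borrow it. The following std-only sketch hand-polls such a future, drops it midway, and then resumes with a fresh one. It is an illustration under assumptions rather than the fix used in the video: `YieldOnce`, `Source`, `read_until_len`, and `cancel_then_resume` are hypothetical names, and the shared queue stands in for the async stream.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for hand-polling in a sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` once and then `Ready`. Stands in for an `.await` that has to wait.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Hypothetical stand-in for an async stream of items.
type Source = Rc<RefCell<VecDeque<u32>>>;

/// The buffer is owned by the caller, so dropping this future loses no items.
async fn read_until_len(source: Source, out: &mut Vec<u32>, target_len: usize) {
    while out.len() < target_len {
        YieldOnce(false).await; // wait for the next item to arrive
        out.push(source.borrow_mut().pop_front().unwrap());
    }
}

/// Returns the buffer right after cancellation, and again after resuming.
fn cancel_then_resume() -> (Vec<u32>, Vec<u32>) {
    let source: Source = Rc::new(RefCell::new(VecDeque::from([1, 2, 3, 4])));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut collected = Vec::new();
    {
        let mut fut = pin!(read_until_len(source.clone(), &mut collected, 3));
        for _ in 0..3 {
            assert!(fut.as_mut().poll(&mut cx).is_pending());
        }
    } // The future is dropped here, but `collected` lives on in the caller.
    let after_cancel = collected.clone();
    {
        // A fresh future picks up where the dropped one left off.
        let mut fut = pin!(read_until_len(source.clone(), &mut collected, 3));
        while fut.as_mut().poll(&mut cx).is_pending() {}
    }
    (after_cancel, collected)
}
```

Because every item is pushed into the caller's buffer as soon as it is read, there is no window in which an item has left the source but exists only inside the future.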
Build with Naz video series on developerlife.com YouTube channel
You can watch a video series on building this crate with Naz on the developerlife.com YouTube channel.
- YT channel
- Playlists