Build with Naz : Rust async, non-blocking, concurrent, parallel, event loops, graceful shutdown
- Introduction
- What is async Rust? Sequential vs concurrent code & parallelism as a resource
- What async Rust is not
- YouTube video for this article
- Effective async Rust patterns by example
- Example 1: Build a timer future using Waker
- Example 2: Build an async runtime to run futures to completion
- Example 3: Running async code, concurrently, on a single thread
- Example 4: join!, select, spawn control flow constructors
- Example 5: async streams
- Example 6: Non-blocking event loops, channel safety, and graceful shutdown
- Parting thoughts
- Build with Naz video series on developerlife.com YouTube channel
Introduction #
In this article, video, and repo, learn effective async Rust through real-world patterns that show up consistently when building non-blocking, async event loops that use channels. We implement the Future trait and an async executor manually, and also explore graceful shutdown, when not to use async, and how to think about testing async code.
What is async Rust? Sequential vs concurrent code & parallelism as a resource #
In Rust, you can write sequential code, and concurrent code:
- Sequential code can be run sequentially, or in parallel (using thread::spawn()).
- Concurrent code can be run on a single thread or multiple threads.
Concurrency is a way to structure code into separate tasks. This does not define the resources on a machine that will be used to run or execute tasks.
Parallelism is a way to specify what resources (CPU cores, or threads) will be used on a machine’s operating system to run tasks.
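These two concepts are related, but they are not the same: concurrent code can run on a single thread with no parallelism at all, and sequential code can be run in parallel on several threads.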
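To make the first bullet above concrete, here is a minimal sketch that runs the same sequential routine first on one thread and then in parallel with thread::spawn(). The function names sum and sum_in_parallel are hypothetical and chosen only for illustration; they are not from the article's repo.

```rust
use std::thread;

/// Plain sequential code: sums the values on whichever thread calls it.
fn sum(values: &[u64]) -> u64 {
    values.iter().sum()
}

/// Runs the same sequential routine in parallel by giving each half of the
/// input to its own OS thread via `thread::spawn()`.
fn sum_in_parallel(mut left: Vec<u64>) -> u64 {
    let right = left.split_off(left.len() / 2);
    let left_handle = thread::spawn(move || sum(&left));
    let right_handle = thread::spawn(move || sum(&right));
    left_handle.join().unwrap() + right_handle.join().unwrap()
}

fn main() {
    let values: Vec<u64> = (1..=100).collect();
    println!("sequential: {}", sum(&values));
    println!("parallel:   {}", sum_in_parallel(values));
}
```

Nothing about sum changes between the two calls; only the resources used to run it do, which is the sense in which parallelism is a resource rather than a way of structuring code.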
What async Rust is not #
Generally speaking, using async Rust is not just a matter of attaching async as a prefix
to a function, when you define it, and postfix .await when you call it. In fact, if you
don’t have at least one .await in your async function body, then it might not need to
be async. This article and video are a deep
dive into what async code is, what Rust Futures are, and what async Runtimes are,
along with some common patterns and anti-patterns when thinking in async Rust.
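One way to see why async is more than a keyword: calling an async function runs none of its body. It only builds a Future, and nothing happens until something polls it. The sketch below polls by hand using only the standard library; poll_once, NoopWake, and set_flag are hypothetical helpers made up for this illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling a future by hand once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls `future` exactly once, the way a runtime would.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    future.as_mut().poll(&mut cx)
}

/// Marked `async`, but has no `.await`, so it completes on the first poll.
async fn set_flag(flag: &Cell<bool>) {
    flag.set(true);
}

fn main() {
    let flag = Cell::new(false);
    let future = set_flag(&flag);
    // Calling the async fn only built a future; the body has not run yet.
    assert!(!flag.get());
    // Polling is what drives the body forward.
    assert!(poll_once(future).is_ready());
    assert!(flag.get());
}
```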
YouTube video for this article #
This blog post only has short examples on how to use Rust async effectively. To see how these ideas can be used in production code, with real-world examples, please watch the following video on the developerlife.com YouTube channel.
Effective async Rust patterns by example #
Let’s create some examples to illustrate how to use async Rust effectively. You can run
cargo new --lib effective-async-rust to create a new library crate.
The code in the video and this tutorial are all in this GitHub repo: https://github.com/nazmulidris/rust-scratch/blob/main/async_con_par/
Then add the following to the Cargo.toml file that’s generated. These pull in all the
dependencies that we need for these examples.
[package]
name = "effective-async-rust"
version = "0.1.0"
edition = "2021"
[dependencies]
crossterm = { version = "0.27.0", features = ["event-stream"] }
tokio = { version = "1.37.0", features = ["full", "tracing"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
futures = "0.3.30"
async-stream = "0.3.5"
Example 1: Build a timer future using Waker #
Then you can add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod build_a_timer_future_using_waker;
We will implement the Future trait manually, in this example. Typically any async code
block is converted into a finite state machine which implements the Future trait.
Progress on the future only occurs when it is polled by the runtime or executor (eg:
Tokio).
- When a future is polled and it is Ready, then the future is complete.
- If it is Pending, then the future is not complete. When it becomes ready (at some point in the future, due to some event like network IO becoming available via epoll or io_uring), the runtime expects the future to wake up the task, by calling wake() on the Waker that is passed to this future by the runtime, via the Context object.
Here are more details on this:
The code for this example is here.
Create a new file src/build_a_timer_future_using_waker.rs. In this file, we are going
to:
- Build a timer that wakes up a task after a certain amount of time, to explore how Waker works.
- We'll just spin up a new thread when the timer is created, sleep for the required time, and then signal the timer future when the time window has elapsed.
Add the following code to the file, to define a new struct that will implement the
Future trait. This struct will have a SharedState struct that will contain the state
of the future, and an optional Waker that will be used to wake up the future when the
timer has elapsed. This Waker is not available until the very first time the future is
polled by the runtime.
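use crossterm::style::Stylize;
use std::{future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration};

#[derive(Default)]
pub struct TimerFuture {
    pub shared_state: Arc<Mutex<SharedState>>,
}

#[derive(Default)]
pub struct SharedState {
    pub completed: bool,
    pub waker: Option<Waker>,
}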
Add the following code to implement the Future trait for the TimerFuture struct.
- The runtime uses this code to poll the future and check whether the timer has elapsed.
- If it has, then the future is complete, and the runtime can move on to the next task. If
the timer has not elapsed, then the future is not complete, and the runtime won't do
anything further with this future; it will go on to the next task (top level
Future) that it can make progress on.
Something has to wake up this future to let the runtime know that the timer has elapsed,
and that it needs to call poll() again on this Future. This is where the Waker comes
in.
- The first time poll() is called on this future, the runtime passes in a Waker and we save that to the SharedState struct.
- This will be used by the timer thread to wake up the future, when the timer has elapsed (which we will do next).
impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        match shared_state.completed {
            true => {
                eprintln!("{}", "TimerFuture is completed".to_string().green());
                Poll::Ready(())
            }
            false => {
                eprintln!("{}", "TimerFuture is not completed".to_string().red());
                // Importantly, we have to update the Waker every time the
                // future is polled because the future may have moved to
                // a different task with a different Waker. This will happen
                // when futures are passed around between tasks after being
                // polled.
                shared_state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
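The comment in poll() above explains why the Waker has to be refreshed on every poll. A common refinement, sketched here under the assumption that cloning a Waker is worth avoiding, is to clone only when the stored waker would not already wake the task that is polling now, using the standard library's Waker::will_wake(). The store_waker helper and the CountingWake type are hypothetical names used only for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Stores the waker from `cx` in `slot`, cloning only when the stored waker
/// would not already wake the task that is polling now.
fn store_waker(slot: &mut Option<Waker>, cx: &Context<'_>) {
    let already_current = match slot.as_ref() {
        Some(existing) => existing.will_wake(cx.waker()),
        None => false,
    };
    if !already_current {
        *slot = Some(cx.waker().clone());
    }
}

/// Hypothetical waker that counts how many times it has been woken.
#[derive(Default)]
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let task_a = Arc::new(CountingWake::default());
    let task_b = Arc::new(CountingWake::default());
    let waker_a = Waker::from(task_a.clone());
    let waker_b = Waker::from(task_b.clone());

    let mut slot = None;
    // The future is polled twice by task A, then it moves to task B.
    store_waker(&mut slot, &Context::from_waker(&waker_a));
    store_waker(&mut slot, &Context::from_waker(&waker_a));
    store_waker(&mut slot, &Context::from_waker(&waker_b));

    // Whatever was stored must wake the most recent poller, which is task B.
    slot.take().unwrap().wake();
    assert_eq!(task_a.0.load(Ordering::SeqCst), 0);
    assert_eq!(task_b.0.load(Ordering::SeqCst), 1);
}
```

Unconditionally cloning, as the article's code does, is always correct; will_wake() is only a best-effort check that can save a clone.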
Add the following code to create a new timer Future, and start a new thread that will
sleep for the required time, and then wake up the Future when the timer has elapsed, by
using the optional Waker that was saved in the SharedState struct (when poll() is
called on the Future, by the runtime).
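impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let new_instance = TimerFuture::default();
        let shared_state_clone = new_instance.shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = shared_state_clone.lock().unwrap();
            shared_state.completed = true;
            // The Waker is only present if the future has been polled at
            // least once. If the timer elapses before the first poll, there
            // is nothing to wake, and the first poll will return Ready.
            if let Some(waker) = shared_state.waker.take() {
                waker.wake();
            }
        });
        new_instance
    }
}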
Add the following test to run this code. The #[tokio::test] attribute macro generates
code to start a single threaded executor to run the test code.
#[tokio::test]
async fn run_timer_future_with_tokio() {
let timer_future = TimerFuture::new(Duration::from_millis(10));
let shared_state = timer_future.shared_state.clone();
assert!(!shared_state.lock().unwrap().completed);
timer_future.await;
assert!(shared_state.lock().unwrap().completed);
}
When you run this test, it should produce the following output:
running 1 test
TimerFuture is not completed
TimerFuture is completed
test build_a_timer_future_using_waker::run_timer_future_with_tokio ... ok
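The #[tokio::test] macro hides what the executor does between those two log lines. As a rough sketch of the idea, using only the standard library and not Tokio's actual implementation, the block_on function below polls one future and parks the thread while it is Pending; the Waker unparks the thread again. block_on, ThreadWaker, and the condensed Timer type are hypothetical stand-ins written for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical single-future executor: poll, then park while pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until a waker calls `unpark()`. A spurious wakeup only
            // costs one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Condensed stand-in for `TimerFuture`: (completed, waker) behind a mutex.
struct Timer(Arc<Mutex<(bool, Option<Waker>)>>);

impl Timer {
    fn new(duration: Duration) -> Self {
        let shared = Arc::new(Mutex::new((false, None::<Waker>)));
        let shared_clone = shared.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = shared_clone.lock().unwrap();
            state.0 = true;
            if let Some(waker) = state.1.take() {
                waker.wake();
            }
        });
        Timer(shared)
    }
}

impl Future for Timer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.0 {
            Poll::Ready(())
        } else {
            state.1 = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn main() {
    let start = Instant::now();
    block_on(Timer::new(Duration::from_millis(10)));
    assert!(start.elapsed() >= Duration::from_millis(10));
    assert_eq!(block_on(async { 40 + 2 }), 42);
}
```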
Example 2: Build an async runtime to run futures to completion #
For this example, let’s add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod build_an_executor_to_run_future;
In the example above, we use tokio to run the TimerFuture to completion. But in this
example, we will implement our own simple async runtime.
- This is a very simple runtime that will run futures to completion, by polling them until they are ready.
- It should highlight how the Waker and Context are supplied by the runtime to the Future.
You can get the source code for this example here.
We will need a few things to implement this runtime:
- Task struct that will contain the Future that needs to be run to completion.
- Task queue that will contain all the tasks that need to be run. This will be a std::sync::mpsc::sync_channel.
- Waker that will be used to wake up the runtime when a task is ready to be polled.
- Context that will be used to pass the Waker to the Future that is being polled.
- Spawner struct that will be used to spawn new tasks into the runtime.
- Executor struct that will be used to run the runtime.
Add the following code to the src/build_an_executor_to_run_future.rs file.
pub fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_TASKS: usize = 10_000;
let (task_sender, task_receiver) = std::sync::mpsc::sync_channel(MAX_TASKS);
(Executor { task_receiver }, Spawner { task_sender })
}
pub struct Executor {
pub task_receiver: Receiver<Arc<Task>>,
}
pub struct Spawner {
pub task_sender: SyncSender<Arc<Task>>,
}
pub struct Task {
pub future: Mutex<Option<BoxFuture<'static, ()>>>,
pub task_sender: SyncSender<Arc<Task>>,
}
Add the following code to the Spawner struct to spawn new tasks into the runtime.
impl Spawner {
pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let pinned_boxed_future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(pinned_boxed_future)),
task_sender: self.task_sender.clone(),
});
eprintln!(
"{}",
"sending task to executor, adding to channel"
.to_string()
.blue()
);
self.task_sender
.send(task)
.expect("too many tasks in channel");
}
}
Add the following code to the Executor struct to run the runtime. This code will poll
the task queue, and block until it can get a task to run. Once it has a task, which it has
removed from the task channel or queue, it polls it (with the Context and Waker) to
check whether it is ready.
- If it is ready, then it is done.
- If it is not ready, then it does not do anything further with it. When the task is ready to be polled again (eg: when the duration has passed in the TimerFuture's thread), the Waker is used to wake up the task. The ArcWake implementation for the Task struct is used for this; all it does is send the task back to the task channel, so that it can be polled again by the executor 🎉.
- Here's what a real world implementation of ArcWake might look like using something like Linux epoll or io_uring: https://rust-lang.github.io/async-book/02_execution/05_io.html.
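The "wake means re-queue" idea can also be tried in isolation. The sketch below uses the standard library's std::task::Wake trait instead of ArcWake from the futures crate (a swapped-in equivalent, not the article's code), and a simplified, hypothetical Task that carries an id rather than a boxed future.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Simplified task: an id instead of a boxed future, plus the queue handle.
struct Task {
    id: usize,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    /// Waking a task means re-queueing it so the executor polls it again.
    fn wake(self: Arc<Self>) {
        let task_sender = self.task_sender.clone();
        task_sender.send(self).expect("task channel is closed");
    }
}

fn main() {
    let (task_sender, task_receiver) = sync_channel(8);
    let task = Arc::new(Task { id: 7, task_sender });

    // The runtime builds the `Waker` from the task and hands it to the
    // future inside a `Context`; here we call it directly.
    let waker = Waker::from(task);
    waker.wake_by_ref();

    // The task is back on the queue, ready to be polled again.
    let requeued = task_receiver.try_recv().expect("task should be queued");
    assert_eq!(requeued.id, 7);
}
```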
impl ArcWake for Task {
/// Implement `wake` by sending this task back onto the task
/// channel so that it will be polled again by the executor,
/// since it is now ready.
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks in channel");
eprintln!(
"{}",
"task woken up, added back to channel"
.to_string()
.underlined()
.green()
.bold()
);
}
}
impl Executor {
#[allow(clippy::while_let_loop)]
pub fn run(&self) {
// Remove task from receiver, or block if nothing available.
loop {
eprintln!("{}", "executor loop".to_string().red());
// Remove the task from the receiver.
// If it is pending, then the ArcWake impl
// will add it back to the channel.
match self.task_receiver.recv() {
Ok(arc_task) => {
eprintln!(
"{}",
"running task - start, got task from receiver"
.to_string()
.red()
);
let mut future_in_task = arc_task.future.lock().unwrap();
match future_in_task.take() {
Some(mut future) => {
let waker = waker_ref(&arc_task);
let context = &mut Context::from_waker(&waker);
let poll_result = future.as_mut().poll(context);
eprintln!(
"{}",
format!(
"poll_result: {:?}", poll_result)
.to_string().red()
);
if poll_result.is_pending() {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_in_task = Some(future);
eprintln!("{}",
"putting task back in slot"
.to_string().red()
);
} else {
eprintln!("{}", "task is done".to_string().red());
}
}
None => {
panic!("this never runs");
}
}
eprintln!("{}", "running task - end".to_string().red());
}
Err(_) => {
eprintln!("no more tasks to run, breaking out of loop");
break;
}
}
}
}
}
And finally, add this test to run this code. Notice this code does not use tokio to run
the TimerFuture to completion. Instead, it uses the Executor and Spawner structs
that we implemented above.
#[test]
fn run_executor_and_spawner() {
use super::build_a_timer_future_using_waker::TimerFuture;
let results = Arc::new(std::sync::Mutex::new(Vec::new()));
let (executor, spawner) = new_executor_and_spawner();
let results_clone = results.clone();
spawner.spawn(async move {
results_clone.lock().unwrap().push("hello, start timer!");
TimerFuture::new(std::time::Duration::from_millis(10)).await;
results_clone.lock().unwrap().push("bye, timer finished!");
});
drop(spawner);
executor.run();
assert_eq!(
*results.lock().unwrap(),
vec!["hello, start timer!", "bye, timer finished!"]
);
}
This should produce the following output, which maps to the flow that we described above:
running 1 test
sending task to executor, adding to channel
executor loop
running task - start, got task from receiver
TimerFuture is not completed
poll_result: Pending
putting task back in slot
running task - end
executor loop
task woken up, added back to channel
running task - start, got task from receiver
TimerFuture is completed
poll_result: Ready(())
task is done
running task - end
executor loop
no more tasks to run, breaking out of loop
test build_an_executor_to_run_future::run_executor_and_spawner ... ok
Example 3: Running async code, concurrently, on a single thread #
For this example, let’s add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod local_set;
If you have async code, you can use a LocalSet to run the async code, in different
tasks, on a single thread. This means that any data that you have to pass between
these tasks can be !Send. Instead of wrapping the shared data in an Arc or
Arc<Mutex>, you can just wrap it in an Rc.
In this example, we will explore how to run async code concurrently, on a single thread. This is an important concept to understand, as it is the basis for how async code can be run concurrently, using non-blocking event loops.
The code for this example is here.
Add the following code to the src/local_set.rs file.
- It shows how you can create a Future that uses an Rc to share data concurrently, running on a single thread.
- Because everything runs on one thread, the data can be !Send, and we don't need to use an Arc or Arc<Mutex> to share it between tasks.
- Once the LocalSet is created, and spawn_local() is called, the task doesn't actually run until local_set.run_until(..) is awaited, or local_set.await is called.
#[tokio::test]
async fn run_local_set_and_spawn_local() {
// Can't send this data across threads (not wrapped in `Arc` or `Arc<Mutex>`).
let non_send_data = Rc::new("!SEND DATA");
let local_set = LocalSet::new();
// Spawn a local task (bound to same thread) that uses the non-send data.
let non_send_data_clone = non_send_data.clone();
let async_block_1 = async move {
println!(
// https://doc.rust-lang.org/std/fmt/index.html#fillalignment
"{:<7} {}",
"start",
non_send_data_clone.as_ref().yellow().bold(),
);
};
// Does not run anything.
let join_handle_1 = local_set.spawn_local(async_block_1);
// This is required to run `async_block_1`.
let _it = local_set.run_until(join_handle_1).await;
Add the following code to the src/local_set.rs file. This is just a different variant
(from the first example) of creating a new async block, and running it using the
LocalSet.
// Create a 2nd async block.
let non_send_data_clone = non_send_data.clone();
let async_block_2 = async move {
sleep(std::time::Duration::from_millis(100)).await;
println!(
// https://doc.rust-lang.org/std/fmt/index.html#fillalignment
"{:<7} {}",
"middle",
non_send_data_clone.as_ref().green().bold()
);
};
// This is required to run `async_block_2`.
let _it = local_set.run_until(async_block_2).await;
Finally, add the following code to the src/local_set.rs file. This is yet another way to
create a new async block and run it using the LocalSet. This one uses local_set.await,
which runs all the futures that are associated with the local_set.
// Spawn another local task (bound to same thread) that uses
// the non-send data.
let non_send_data_clone = non_send_data.clone();
let async_block_3 = async move {
sleep(std::time::Duration::from_millis(100)).await;
println!(
// https://doc.rust-lang.org/std/fmt/index.html#fillalignment
"{:<7} {}",
"end",
non_send_data_clone.as_ref().cyan().bold()
);
};
// Does not run anything.
let _join_handle_3 = local_set.spawn_local(async_block_3);
// `async_block_3` won't run until this is called.
local_set.await;
}
Here’s the output when you run this test:
running 1 test
start   !SEND DATA
middle  !SEND DATA
end     !SEND DATA
test local_set::run_local_set_and_spawn_local ... ok
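To see what "concurrent on a single thread" means without a runtime in the way, here is a minimal sketch that interleaves two futures sharing an Rc<RefCell<..>> by polling them round-robin on the current thread. It uses only the standard library and busy-polls instead of waiting for wakeups, so it is an illustration of the idea and not how LocalSet is implemented. YieldOnce, worker, and run_interleaved are hypothetical names.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper future: pending on the first poll, ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// `Rc` is `!Send`, which is fine because every task stays on this thread.
type Log = Rc<RefCell<Vec<String>>>;

async fn worker(name: &'static str, log: Log) {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce(false).await; // Gives the other task a chance to run.
    log.borrow_mut().push(format!("{name} end"));
}

fn run_interleaved() -> Vec<String> {
    let log: Log = Rc::default();
    let mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
        Box::pin(worker("a", log.clone())),
        Box::pin(worker("b", log.clone())),
    ];
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Poll every unfinished task in turn; drop the ones that are ready.
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
    let entries = log.borrow().clone();
    entries
}

fn main() {
    for line in run_interleaved() {
        println!("{line}");
    }
}
```

Both workers start before either one finishes, even though only one thread is involved and no Arc or Mutex is needed.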
Example 4: join!, select, spawn control flow constructors #
For this example, let’s add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod demo_join_select_spawn;
You can use join!, select!, and spawn to control the flow of async code. join! and
select! are macros, and spawn is a function; all three are provided by the tokio crate.
join! and select! run multiple futures concurrently within the current task, while spawn
creates new tasks that can run in parallel on a multi-threaded runtime.
The code for this example is here.
Add the following code to the src/demo_join_select_spawn.rs file. This code shows how
you can use join! to run multiple futures concurrently, and wait for them to complete.
pub async fn task_1(time: u64) {
sleep(Duration::from_millis(time)).await;
println!("task_1");
}
pub async fn task_2(time: u64) {
sleep(Duration::from_millis(time)).await;
println!("task_2");
}
pub async fn task_3(time: u64) {
sleep(Duration::from_millis(time)).await;
println!("task_3");
}
#[tokio::test]
async fn test_join() {
tokio::join!(task_1(100), task_2(200), task_3(300));
println!("all tasks done");
}
Here’s the output when you run this test:
running 1 test
task_1
task_2
task_3
all tasks done
test demo_join_select_spawn::test_join ... ok
Add the following code to the src/demo_join_select_spawn.rs file. This code shows how
you can use select! to run multiple futures concurrently, and wait for the first one to
complete.
#[tokio::test]
async fn test_select() {
tokio::select! {
_ = task_1(100) => println!("task_1 done"),
_ = task_2(200) => println!("task_2 done"),
_ = task_3(300) => println!("task_3 done"),
}
println!("one task done");
}
Here’s the output when you run this test:
running 1 test
task_1
task_1 done
one task done
test demo_join_select_spawn::test_select ... ok
Add the following code to the src/demo_join_select_spawn.rs file. This code shows how
you can use spawn to run multiple futures in parallel, and wait for them to complete. We
pass the following to the #[tokio::test] attribute macro: flavor = "multi_thread",
worker_threads = 5, which tells it to run the test on a multi-threaded runtime with 5 worker threads.
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_spawn() {
let handle_1 = tokio::spawn(task_1(100));
let handle_2 = tokio::spawn(task_2(100));
let handle_3 = tokio::spawn(task_3(100));
handle_1.await.unwrap();
handle_2.await.unwrap();
handle_3.await.unwrap();
println!("all tasks done");
}
When you run this test, it should produce the following output (the ordering of the tasks which run first, second, and third, will vary):
running 1 test
task_3
task_1
task_2
all tasks done
test demo_join_select_spawn::test_spawn ... ok
Example 5: async streams #
For this example, let’s add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod async_stream;
You can use async streams to create a stream of values that are produced asynchronously.
This is useful for testing, for example in the test_streams module in readline.rs in the
r3bl_terminal_async crate.
The code for this example is here.
Add the following code to the src/async_stream.rs file.
- This code shows how you can use the async_stream crate's stream! macro to create a stream of values that are generated from a vector of strings.
- This stream is then converted into a PinnedInputStream, which is a Pin<Box<dyn Stream<Item = Result<String, String>>>>.
use futures::{Stream, StreamExt};
use std::pin::Pin;

pub type PinnedInputStream = Pin<Box<dyn Stream<Item = Result<String, String>>>>;
pub fn gen_input_stream() -> PinnedInputStream {
let it = async_stream::stream! {
for event in get_input_vec() {
yield Ok(event);
}
};
Box::pin(it)
}
pub fn get_input_vec() -> Vec<String> {
vec![
"a".to_string(),
"b".to_string(),
"c".to_string(),
"d".to_string(),
]
}
#[tokio::test]
async fn test_stream() {
    let mut count = 0;
    let mut it = gen_input_stream();
    while let Some(event) = it.next().await {
        let lhs = event.unwrap();
        let rhs = get_input_vec()[count].clone();
        assert_eq!(lhs, rhs);
        count += 1;
    }
    // Make sure the stream yielded every item, not just a matching prefix.
    assert_eq!(count, get_input_vec().len());
}
Example 6: Non-blocking event loops, channel safety, and graceful shutdown #
Let’s add the following code to the src/lib.rs file.
#[cfg(test)]
pub mod non_blocking_async_event_loops;
You can use non-blocking event loops to create a loop that runs async code, and waits for events to occur. This is useful for creating servers, clients, and other networked applications. You can even use the same pattern to create CLI and TUI applications that are non-blocking, and can handle multiple events concurrently, such as when you’re creating an interactive async REPL.
The source code for this example is here.
Add the following code to the src/non_blocking_async_event_loops.rs file.
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_main_loop() {
// Register tracing subscriber.
tracing_subscriber::fmt()
.without_time()
.compact()
.with_target(false)
.with_line_number(false)
.with_thread_ids(true)
.with_thread_names(true)
.init();
// Create channels for events and shutdown signals.
let event_channel = tokio::sync::mpsc::channel::<String>(1_000);
let (event_sender, mut event_receiver) = event_channel;
let shutdown_channel = tokio::sync::broadcast::channel::<()>(1_000);
let (shutdown_sender, _) = shutdown_channel;
// Spawn the main event loop.
let mut shutdown_receiver = shutdown_sender.subscribe();
let safe_count: std::sync::Arc<std::sync::Mutex<usize>> = Default::default();
let safe_count_clone = safe_count.clone();
let join_handle = tokio::spawn(async move {
loop {
tokio::select! {
event = event_receiver.recv() => {
tracing::info!(?event, "task got event: event");
let mut count = safe_count_clone.lock().unwrap();
*count += 1;
}
_ = shutdown_receiver.recv() => {
tracing::info!("task got shutdown signal");
break;
}
}
}
});
// Send events, in parallel.
let mut handles = vec![];
for i in 0..10 {
let event_sender_clone = event_sender.clone();
let join_handle = tokio::spawn(async move {
tracing::info!(i, "sending event");
let event = format!("event {}", i);
let _ = event_sender_clone.send(event).await;
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
});
handles.push(join_handle);
}
// Wait for all events to be sent using tokio.
futures::future::join_all(handles).await;
// Shutdown the event loops.
shutdown_sender.send(()).unwrap();
// Wait for the event loop to shutdown.
join_handle.await.unwrap();
// Assertions.
assert_eq!(shutdown_sender.receiver_count(), 1);
assert_eq!(*safe_count.lock().unwrap(), 10);
}
Here are key points to note about this code:
- We use tokio::sync::mpsc::channel to create a channel for events, and tokio::sync::broadcast::channel to create a channel for shutdown signals.
- We spawn the main event loop, which listens for events and shutdown signals, and updates a shared counter.
- We spawn multiple tasks that send events to the event channel, in parallel.
  - The #[tokio::test(flavor = "multi_thread", worker_threads = 5)] attribute macro tells tokio to run the test on a multi-threaded runtime with 5 worker threads.
  - You can see this in the output when you run the test. By configuring the tracing subscriber, we can see the thread IDs and names in the output (.with_thread_ids(true), .with_thread_names(true)).
- We wait for all events to be sent using futures::future::join_all(handles).await.
- We shut down the event loop (using shutdown_sender.send(())), and wait for it to shut down using join_handle.await.
When you run this test, it will produce the following output:
running 1 test
INFO tokio-runtime-worker ThreadId(05) sending event i=2
INFO tokio-runtime-worker ThreadId(04) sending event i=6
INFO tokio-runtime-worker ThreadId(06) sending event i=0
INFO tokio-runtime-worker ThreadId(07) sending event i=4
INFO tokio-runtime-worker ThreadId(03) sending event i=7
INFO tokio-runtime-worker ThreadId(04) sending event i=8
INFO tokio-runtime-worker ThreadId(06) sending event i=1
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 2")
INFO tokio-runtime-worker ThreadId(07) sending event i=5
INFO tokio-runtime-worker ThreadId(03) sending event i=9
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 6")
INFO tokio-runtime-worker ThreadId(04) sending event i=3
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 0")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 4")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 7")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 8")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 1")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 5")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 9")
INFO tokio-runtime-worker ThreadId(05) task got event: event event=Some("event 3")
INFO tokio-runtime-worker ThreadId(05) task got shutdown signal
test non_blocking_async_event_loops::test_main_loop ... ok
Interesting code links:
- Testing async code: https://github.com/r3bl-org/r3bl-open-core/blob/main/terminal_async/src/readline_impl/readline.rs#L612
- Using dependency injection and dealing with dyn T (trait objects): https://github.com/r3bl-org/r3bl-open-core/blob/main/terminal_async/src/readline_impl/readline.rs#L344.
- Event loops and breaking out of them (lifecycle control mechanisms): https://github.com/nazmulidris/rust-scratch/blob/main/tcp-api-server/src/server_task.rs#L43 and https://github.com/nazmulidris/rust-scratch/blob/main/tcp-api-server/src/client_task.rs#L108.
Parting thoughts #
- Try not to use cancellation token: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html, instead do this: https://github.com/nazmulidris/rust-scratch/pull/32 and https://github.com/nazmulidris/rust-scratch/commit/e129b0f681dd1eea1bcdd3372cd08a05081922ff
- Do not use async or Tokio for underlying sync OS file copy: https://users.rust-lang.org/t/tokio-copy-slower-than-std-io-copy/111242.
- Using the right Mutex in conjunction with Arc and holding them across await points from tokio docs.
- Good videos:
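On the Mutex point above, a minimal sketch of one commonly recommended pattern: keep a std::sync::Mutex guard inside a small scope so it is dropped before any await point. The future then stays Send, which is what spawning onto a multi-threaded runtime requires. The helpers require_send, checkpoint, and increment are hypothetical, and the future is polled by hand here so the example needs only the standard library.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Compiles only for `Send` values, mirroring the `Send` bound on `tokio::spawn`.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// Stand-in for any real await point (IO, a timer, a channel receive).
async fn checkpoint() {}

async fn increment(counter: Arc<Mutex<usize>>) -> usize {
    let snapshot = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // The guard is dropped here, before the await below.
    checkpoint().await;
    snapshot
}

fn main() {
    let counter = Arc::new(Mutex::new(0));
    // This line would not compile if the guard were held across the await,
    // because `std::sync::MutexGuard` is not `Send`.
    let mut future = Box::pin(require_send(increment(counter.clone())));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert_eq!(future.as_mut().poll(&mut cx), Poll::Ready(1));
    assert_eq!(*counter.lock().unwrap(), 1);
}
```

The event loop in Example 6 follows the same shape: the guard on safe_count_clone is taken and released inside one select! branch, with no await in between.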
Build with Naz video series on developerlife.com YouTube channel #
You can watch a video series on building this crate with Naz on the developerlife.com YouTube channel.
- YT channel
- Playlists