👀 Watch Rust 🦀 live coding videos on our YouTube Channel.

What is Linux io_uring? #

When using async Rust and tokio, you don’t get async file IO at the OS level. Here are links from the official docs that discourage using tokio for file IO:

This is because tokio uses the mio crate, which uses epoll on Linux. These are not the most efficient ways to do async IO on Linux. The most efficient way to do async IO on Linux is to use the io_uring syscall. This is a new syscall that was added to the Linux kernel in version 5.1. It is a more efficient way to do async IO on Linux, and is used by the tokio-uring crate. Here are some great links to learn more about io_uring:

In this article, we will explore how to use tokio-uring to do async file IO at the OS level, and how to use it to build a simple echo TCP server, for use with netcat.

YouTube video for this article #

This blog post has short examples on how to use the tokio-uring crate. If you like to learn via video, please watch the companion video on the developerlife.com YouTube channel.


Examples of using tokio-uring in Rust #

Let’s create some examples to illustrate how to use tokio-uring. You can run cargo new --bin tokio-uring to create a new binary crate.

The code in the video and this tutorial are all in this GitHub repo.

Then add the following to the Cargo.toml file that’s generated. These pull in all the dependencies that we need for these examples.

[package]
name = "tokio-uring"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "readfile"
path = "src/readfile.rs"

[[bin]]
name = "socketserver"
path = "src/socketserver.rs"

[dependencies]
tokio-uring = "0.4.0"
tokio = { version = "1.37.0", features = ["full", "tracing"] }
tokio-util = "0.7.11"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

ctrlc = "3.4.4"
miette = { version = "7.2.0", features = ["fancy"] }

crossterm = { version = "0.27.0", features = ["event-stream"] }

r3bl_terminal_async = { version = "0.5.3" }

Example 1: Read a file using tokio-uring and async, non-blocking IO #

Then you can add the following code to the src/readfile.rs file.

use crossterm::style::Stylize;
use miette::IntoDiagnostic;
use std::path::Path;

fn main() -> miette::Result<()> {
    tokio_uring::start(read_file("Cargo.toml"))?;
    Ok(())
}

async fn read_file(name: impl AsRef<Path>) -> miette::Result<()> {
    let file = tokio_uring::fs::File::open(name).await.into_diagnostic()?;

    let buf_move = vec![0; 4096];

    // Read some data, the buffer is passed by ownership and submitted
    // to the kernel. When the operation completes, we get the buffer
    // back.
    let (result, buf_from_kernel) = file.read_at(buf_move, 0).await;
    let bytes_read = result.into_diagnostic()?;

    println!(
        "{}",
        format!("Read {} bytes", bytes_read)
            .yellow()
            .underlined()
            .bold()
    );

    println!(
        "{}\n{}",
        "Data (bytes):".yellow().bold().underlined(),
        format!("{:?}", &buf_from_kernel[..bytes_read])
            .blue()
            .bold()
    );

    println!(
        "{}\n{}",
        "Data (string):".yellow().bold().underlined(),
        String::from_utf8_lossy(&buf_from_kernel[..bytes_read])
            .cyan()
            .bold()
    );

    Ok(())
}

The code for this example is here.

The main things to note about this code.

  • We use the tokio_uring::fs::File struct to open a file.
  • We use the read_at method to read from the file at a specific offset. The buffer is passed by ownership to the kernel, and when the operation completes, we get the buffer back. This is different than how it works with tokio and std.
  • We print out the bytes that were read from the file, and the string representation of those bytes.

When you run this code (using cargo run --bin readfile), it should produce the following output:

read file using tokio_uring: Cargo.toml
read 604 bytes from file
file contents: [package]
name = "tokio-uring"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "readfile"
path = "src/readfile.rs"

[[bin]]
name = "socketserver"
path = "src/socketserver.rs"

[dependencies]
tokio-uring = "0.4.0"
tokio = { version = "1.37.0", features = ["full", "tracing"] }
tokio-util = "0.7.11"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

ctrlc = "3.4.4"

miette = { version = "7.2.0", features = ["fancy"] }

crossterm = { version = "0.27.0", features = ["event-stream"] }

r3bl_terminal_async = { version = "0.5.3" }
# r3bl_terminal_async = { path = "../../r3bl-open-core/terminal_async" }

Example 2: Building a TCP echo server using tokio-uring that also uses tokio #

For this example, let’s add the following code to the src/socketserver.rs file.

  • This will simply add the required imports to tokio_uring for TcpListener and TcpStream.
  • And we will also configure the tracing_subscriber to use the formatted subscriber, so that we get pretty printed log output to stdout and we have information about what thread generated that log event.
  • We use the tokio_uring::start function to spawn the runtime. This runtime isn’t the same as the one that we get from using #[tokio::main] and later in this example, we will see how we can handle both.
use crossterm::style::Stylize;
use miette::IntoDiagnostic;
use r3bl_terminal_async::port_availability;
use std::net::SocketAddr;
use tokio::task::AbortHandle;
use tokio_uring::{
    buf::IoBuf,
    net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;

/// Run `netcat localhost:8080` to test this server (once you run this main function).
fn main() -> miette::Result<()> {
    // Register tracing subscriber.
    tracing_subscriber::fmt()
        .without_time()
        .compact()
        .with_target(false)
        .with_line_number(false)
        .with_thread_ids(true)
        .with_thread_names(true)
        .init();

    let cancellation_token = CancellationToken::new();

    // TODO: Add ctrlc handler.

    // TODO: Add code to use the `tokio` runtime and run some futures on it.

    tokio_uring::start(start_server(cancellation_token))?;
}

You can get the source code for this example here.

Next, we will add the code to handle the server logic. The following code handles the incoming connections (using tokio_uring structs). This code is very similar to what we would write if we were using tokio directly.

  • The main difference is that we are checking for port availability before binding to the address, and we are using tokio_uring::spawn to spawn the futures, to handle incoming connections.
  • We will also use tokio::select! to create the main event loop. Since tokio_uring is in the same family as tokio, we can do that!
  • The port_availability module comes from r3bl_terminal_async crate, which is a dependency in the Cargo.toml file. It allows us to check whether a port is available or not, and find a free port in a given port range.
async fn start_server(cancellation_token: CancellationToken) -> miette::Result<()> {
    let tcp_listener = {
        let addr: SocketAddr = "0.0.0.0:8080".parse().into_diagnostic()?;
        // You can bind to the same address repeatedly, and it won't return
        // an error! Might have to check to see whether the port is open or
        // not before binding to it!
        match port_availability::check(addr).await? {
            port_availability::Status::Free => {
                tracing::info!("Port {} is available", addr.port());
            }
            port_availability::Status::Occupied => {
                tracing::info!(
                    "Port {} is NOT available, can't bind to it",
                    addr.port()
                );
                return Err(miette::miette!(
                    "Port {} is NOT available, can't bind to it",
                    addr.port()
                ));
            }
        }
        TcpListener::bind(addr).into_diagnostic()?
    };

    tracing::info!("{}", "server - started".to_string().red().bold());

    let mut abort_handles: Vec<AbortHandle> = vec![];

    loop {
        tokio::select! {
            _ = cancellation_token.cancelled() => {
                abort_handles.iter().for_each(|handle| handle.abort());
                break;
            }
            it = tcp_listener.accept() => {
                let (tcp_stream, _addr) = it.into_diagnostic()?;
                let join_handle = tokio_uring::spawn(
                    handle_connection(tcp_stream)
                );
                abort_handles.push(join_handle.abort_handle());
            }
        }
    }

    tracing::info!("{}", "server - stopped".to_string().red().bold());
    Ok(())
}

Add the following code to handle the echo logic. This code reads from the stream using tokio_uring and its function signature is quite different from what we would write if we were using tokio directly. It is similar to what happens with read_at in the previous example, and it moves ownership to read. Which returns a tuple:

  1. Result containing the number of bytes read.
  2. Buffer that was passed from the kernel.

The write_all function also returns a tuple that is similar.

async fn handle_connection(stream: TcpStream) -> miette::Result<()> {
    tracing::info!("handle_connection - start");

    let mut total_bytes_read = 0;
    let mut buf = vec![0u8; 10];

    loop {
        // Read from the stream.
        // Read some data, the buffer is passed by ownership and submitted
        // to the kernel. When the operation completes, we get the buffer
        // back.
        let (result_num_bytes_read, return_buf) = stream.read(buf).await;
        buf = return_buf;
        let num_bytes_read = result_num_bytes_read.into_diagnostic()?;

        // Check for EOF.
        if num_bytes_read == 0 {
            break;
        }

        // Write to the stream.
        let (result_num_bytes_written, slice) =
            stream.write_all(buf.slice(..num_bytes_read)).await;
        result_num_bytes_written.into_diagnostic()?; // Make sure no errors.

        // Update the buffer.
        buf = slice.into_inner();
        total_bytes_read += num_bytes_read;

        tracing::info!(
            "{}: {}",
            "handle_connection - num_bytes_read".to_string().red(),
            num_bytes_read
        );
    }

    tracing::info!(
        "handle_connection - end, total_bytes_read: {}",
        total_bytes_read
    );
    Ok(())
}

To test this, you can run the server using cargo run --bin socketserver. Then you can connect to the server using netcat (or nc) by running netcat localhost 8080. You can type some text and hit enter, and you should see the text echoed back to you.

This is what the output from netcat might look like:

netcat localhost 8080
echo echo echo
echo echo echo

This is what the output from the server might look like:

 cargo run --bin socketserver
 INFO main ThreadId(01) Port is available
 INFO main ThreadId(01) server - started - 0.0.0.0:8080
 INFO main ThreadId(01) handle_connection - start
 INFO main ThreadId(01) handle_connection - num_bytes_read: 10
 INFO main ThreadId(01) handle_connection - num_bytes_read: 5
 INFO main ThreadId(01) handle_connection - end, total bytes read : 15 bytes

There are two more bonus rounds that we can add to this example:

  1. Add a ctrlc handler to gracefully shutdown the server, when the user types Ctrl+C.
  2. Add code to use the tokio runtime and run some futures on it.

In the socketserver.rs file, you can add the following code to replace the comment //TODO: Add ctrlc handler.. The following code will add a ctrlc handler to gracefully shutdown the server, by cancelling the cancellation_token.

let cancellation_token_clone = cancellation_token.clone();
ctrlc::set_handler(move || {
    tracing::info!("Received Ctrl+C!");
    cancellation_token_clone.cancel();
})
.into_diagnostic()?;

And finally, the following code will replace the comment // TODO: Add code to use the `tokio` runtime and run some futures on it.. This code will spawn a new OS thread (using std) and then create a new multi-threaded tokio runtime on that thread. We will then run some futures on that runtime by passing an async block to the block_on function of the runtime.

// Can't use #[tokio::main] for `main()`, so we have to use the
// `tokio::runtime::Builder` API. However, we have to launch this in a separate
// thread, because we don't want it to collide with the `tokio_uring::start()`
// call.
let cancellation_token_clone = cancellation_token.clone();
std::thread::spawn(move || {
    // If you use `Builder::new_current_thread()`, the runtime will
    // use the single / current thread scheduler.
    // `Builder::new_multi_thread()` will use a thread pool.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .build()
        .into_diagnostic()
        .unwrap()
        .block_on(async_main(cancellation_token_clone))
});

Here’s the async_main function that we are calling in the code above. This function simply runs some futures on the tokio runtime that we created in the code above. You can see from the log output that the tasks are run in parallel (sometimes on the same thread and sometimes on different threads), and are scheduled in a non-deterministic order.

async fn async_main(cancellation_token: CancellationToken) {
    tracing::info!("{}", "async_main - start".to_string().magenta().bold());

    let mut interval =
        tokio::time::interval(std::time::Duration::from_millis(2_500));

    loop {
        tokio::select! {
            _ = interval.tick() => {
                tracing::info!(
                    "{}",
                    "async_main - tick".to_string().magenta().bold()
                    );

                // Notice in the output, that these tasks are NOT spawned
                // in the same order repeatedly. They are run in parallel
                // on different threads. And these are scheduled in a
                // non-deterministic order.
                let task_1 = tokio::spawn(async {
                    tokio::time::sleep(
                        std::time::Duration::from_millis(10)
                    ).await;
                    tracing::info!("async_main - tick {} - spawn", "#1"
                        .to_string().on_green().black().bold()
                    );
                });
                let task_2 = tokio::spawn(async {
                    tokio::time::sleep(
                        std::time::Duration::from_millis(10)
                    ).await;
                    tracing::info!("async_main - tick {} - spawn", "#2"
                        .to_string().on_red().black().bold()
                    );
                });
                let task_3 = tokio::spawn(async {
                    tokio::time::sleep(
                        std::time::Duration::from_millis(10)
                    ).await;
                    tracing::info!("async_main - tick {} - spawn", "#3"
                        .to_string().on_blue().black().bold()
                    );
                });
                let _ = tokio::join!(task_1, task_2, task_3);
            }
            _ = cancellation_token.cancelled() => {
                tracing::info!("async_main - cancelled");
                break;
            }
        }
    }

    tracing::info!("{}", "async_main - end".to_string().magenta().bold());
}

Here’s what the output from the server might look like, after adding the ctrlc handler and the tokio runtime code and running it for about 10 seconds.

 cargo run --bin socketserver
 INFO main ThreadId(01) Port is available
 INFO ThreadId(03) async_main - start
 INFO main ThreadId(01) server - started - 0.0.0.0:8080
 INFO ThreadId(03) async_main - tick
 INFO tokio-runtime-worker ThreadId(04) async_main - tick #3 - spawn
 INFO tokio-runtime-worker ThreadId(06) async_main - tick #2 - spawn
 INFO tokio-runtime-worker ThreadId(05) async_main - tick #1 - spawn
 INFO ThreadId(03) async_main - tick
 INFO tokio-runtime-worker ThreadId(06) async_main - tick #2 - spawn
 INFO tokio-runtime-worker ThreadId(05) async_main - tick #1 - spawn
 INFO tokio-runtime-worker ThreadId(04) async_main - tick #3 - spawn
 INFO ThreadId(03) async_main - tick
 INFO tokio-runtime-worker ThreadId(06) async_main - tick #3 - spawn
 INFO tokio-runtime-worker ThreadId(05) async_main - tick #1 - spawn
 INFO tokio-runtime-worker ThreadId(04) async_main - tick #2 - spawn
 INFO ThreadId(03) async_main - tick

Parting thoughts #

There are areas of improvement in this codebase, such as port binding issues, and connection management issues.

  1. If you run more than one instance of the process cargo run --bin startserver then the log output is pretty strange. The 2nd process that’s started seems to trigger the handle_connection function of the first process.
  2. When you run the server and connect a client to it using netcat, and then kill the server process, using Ctrl+C, the client doesn’t drop the connection.

If you can figure out how to fix these issues, please raise a PR on the GitHub repo. I’d love to see how you solve these problems!

Build with Naz video series on developerlife.com YouTube channel #

You can watch a video series on building this crate with Naz on the developerlife.com YouTube channel.

📦 Install our useful Rust command line apps using cargo install r3bl-cmdr (they are from the r3bl-open-core project):
  • 🐱giti: run interactive git commands with confidence in your terminal
  • 🦜edi: edit Markdown with style in your terminal

giti in action

edi in action

Related Posts