Write a simple TCP chat server in Rust
- Build a chat server using Tokio
- The chat server comprises all these pieces
- Add dependencies to Cargo.toml
- Main function
- Handle client task function
- Build with Naz video series on developerlife.com YouTube channel
Build a chat server using Tokio #
In this tutorial we will build a simple chat server using Tokio. The server will be able to handle multiple clients, and each client will be able to send messages to the server, which will then broadcast the message to all other connected clients.
- We will use Tokio's tokio::net::TcpListener and tokio::net::TcpStream to create a TCP server that listens for incoming connections and handles them concurrently.
- We will also use Tokio's tokio::sync::broadcast to broadcast messages to all connected clients.
Read this tutorial to learn more about the basics of TCP client and server programming in Rust (without using Tokio).
Here's a video of the app that we are going to build in action.
You can find the finished source code for this tutorial here.
The chat server comprises all these pieces #
  +-CLIENT-1---------+  +-CLIENT-2---------+  +-CLIENT-3---------+
  |                  |  |                  |  |                  |
  +---------|--------+  +---------|--------+  +---------|--------+
            |                     |                     |
+-SERVER----|---------------------|---------------------|----------+
|           |                     |                     |          |
| handle_client_task()  handle_client_task()  handle_client_task() |
| +------------------+  +------------------+  +------------------+ |
| |  [TX]      [RX]  |  |  [TX]      [RX]  |  |  [TX]      [RX]  | |
| +----|---------^---+  +----|---------^---+  +----|---------^---+ |
|      |         |           |         |           |         |     |
| +----v---------+-----------v---------+-----------v---------+---+ |
| |               (TX, RX) = channel::broadcast()                | |
| +--------------------------------------------------------------+ |
+------------------------------------------------------------------+
The server has a main function that creates a tokio::net::TcpListener and listens for incoming connections. When a new connection is received, it spawns a new task to handle the connection using tokio::spawn().
Using tokio::select!, the task tries to do the following concurrently, and waits until one of them completes:
- The task reads messages from its client and broadcasts them to all other connected clients. It also echoes the message back to its client.
- The task listens for messages from other clients and sends them to its client.
When one of the two operations above completes, the other is dropped. The code path for the completed operation then executes, and control returns to the top of the infinite loop, unless that code path has already returned or broken out of the loop.
A client can be any TCP client, such as telnet, nc, or PuTTY.
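Since any TCP client works, a client can also be a few lines of Rust using only the standard library. The sketch below is an illustration under assumptions: send_line() and spawn_echo_once() are hypothetical names, and the tiny echo server exists only so the example runs on its own without the chat server.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Connects to `addr`, sends one line, and returns the first line the server sends back.
fn send_line(addr: &str, message: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(message.as_bytes())?;
    stream.write_all(b"\n")?;
    let mut reply = String::new();
    BufReader::new(stream).read_line(&mut reply)?;
    Ok(reply)
}

/// Hypothetical stand-in server: accepts one connection and echoes one line back.
/// Returns the address it is listening on.
fn spawn_echo_once() -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?.to_string();
    thread::spawn(move || {
        if let Ok((stream, _)) = listener.accept() {
            let mut reader = BufReader::new(stream);
            let mut line = String::new();
            if reader.read_line(&mut line).is_ok() {
                let _ = reader.get_mut().write_all(line.as_bytes());
            }
        }
    });
    Ok(addr)
}

fn main() -> std::io::Result<()> {
    let addr = spawn_echo_once()?;
    let reply = send_line(&addr, "hello")?;
    print!("{reply}");
    Ok(())
}
```

Pointed at the chat server from this tutorial instead of the stand-in, the first line read back would be the welcome message that the server sends on connect, not an echo.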
Add dependencies to Cargo.toml #
Let's create a new project by running cargo new --bin tcp-server-netcat-client. Then we will add the following dependencies to our Cargo.toml file.
# tokio.
tokio = { version = "1.35.1", features = ["full"] }
# stdout logging.
femme = { version = "2.2.1" }
log = { version = "0.4.20" }
# r3bl_rs_utils_core - friendly name generator.
r3bl_rs_utils_core = { version = "0.9.12" }
r3bl_tui = { version = "0.5.1" }
Main function #
We will implement the following algorithm for our server in our main function:
- Create a broadcast channel. It will be shared by all the client tasks.
- Create TcpListener and bind to an address & port.
- Loop:
  - Accept socket connection, and get its TcpStream.
  - Use tokio::spawn() to spawn a task to handle this client connection and its TcpStream.
In the task that handles the connection:
- Get BufReader & BufWriter from the TcpStream. The reader and writer allow us to read data from and write data to the client socket.
- Loop:
  - Use tokio::select! to concurrently:
    - Read from broadcast channel (via recv()):
      - Send the message to the client (only if it is from a different client) over the socket (use BufWriter to write the message).
    - Read from socket (via BufReader::read_line()):
      - Read incoming from reader.
      - Call process(incoming) and generate outgoing. This colorizes the incoming message with a lolcat effect to generate the outgoing message.
      - Send incoming message to other connected clients (via the broadcast channel).
Here's the code for the main function, and some supporting type aliases and structs:
use std::net::SocketAddr;
use tokio::{net::TcpListener, sync::broadcast};

pub type IOResult<T> = std::io::Result<T>;

#[derive(Debug, Clone)]
pub struct MsgType {
    pub socket_addr: SocketAddr,
    pub payload: String,
    pub from_id: String,
}

#[tokio::main]
pub async fn main() -> IOResult<()> {
    let addr = "127.0.0.1:3000";

    // Start logging.
    femme::start();

    // Create TCP listener.
    let tcp_listener = TcpListener::bind(addr).await?;
    log::info!("Server is ready to accept connections on {}", addr);

    // Create channel shared among all clients that connect to the server loop.
    // The initial receiver is discarded; each client task calls tx.subscribe().
    let (tx, _) = broadcast::channel::<MsgType>(10);

    // Server loop.
    loop {
        // Accept incoming socket connections.
        let (tcp_stream, socket_addr) = tcp_listener.accept().await?;

        // Each client task gets its own clone of the sender.
        let tx = tx.clone();
        tokio::spawn(async move {
            let result = handle_client_task(tcp_stream, tx, socket_addr).await;
            match result {
                Ok(_) => {
                    log::info!("handle_client_task() terminated gracefully")
                }
                Err(error) => log::error!("handle_client_task() encountered error: {}", error),
            }
        });
    }
}
To run the server, you can run cargo run. There are no command line arguments to pass or parse.
tokio::spawn does not spawn a new thread, so what does it actually do? #
Since tokio::spawn sounds similar to thread::spawn, it might be easy to assume that tokio::spawn creates a new thread. This would go against the idea of even using Tokio (which is all about concurrency and non-blocking IO), since handling one connection per thread isn't scalable, which is what we did in this tutorial: Write a simple TCP chat server in Rust.
tokio::spawn does not create a thread; it creates a Tokio task, which is a co-operatively scheduled entity that Tokio knows how to schedule on the Tokio runtime (in turn, the Tokio runtime can have as many worker threads as you want - from 1 upwards).
By using tokio::spawn, you allow the Tokio runtime to switch to another task at points in the task where it has a .await, and only those points. Your alternative, if you don't want multiple tasks, is to use things like select!, join! and functions with select or join in their name to have concurrent I/O in a single task.
The point of spawning in Tokio is twofold:
- If your runtime has multiple threads, then two tasks can execute in parallel on different threads, reducing latency.
- It is almost always easier to understand a complex program in terms of different tasks doing their work, than in terms of a single large task doing lots of work concurrently (e.g. using select to wait for one of many options, or join to wait for all options to finish).
More information:
- You can get more info on this topic here.
- For an even deeper dive into how Tokio tasks themselves are implemented for intra-task concurrency, please take a look at this excellent article.
Handle client task function #
The handle_client_task function is where all the magic happens.
- It reads messages from its client (over TCP socket) and broadcasts them to all other connected clients.
- It processes the message from its client and echoes it back to its client (over TCP socket).
- It reads messages from other clients (over broadcast channel) and sends them to its client (over socket).
Here's the code for the handle_client_task() function:
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::TcpStream;
use tokio::sync::broadcast::Sender;
// friendly_random_id and ColorWheel are provided by the r3bl crates added in Cargo.toml.

async fn handle_client_task(
    mut tcp_stream: TcpStream,
    tx: Sender<MsgType>,
    socket_addr: SocketAddr,
) -> IOResult<()> {
    log::info!("Handle socket connection from client");

    let id = friendly_random_id::generate_friendly_random_id();
    let mut rx = tx.subscribe();

    // Set up buf reader and writer.
    let (reader, writer) = tcp_stream.split();
    let mut reader = BufReader::new(reader);
    let mut writer = BufWriter::new(writer);

    // Send welcome message to client w/ ids.
    // write_all() keeps writing until the whole buffer is accepted; write() may accept only part of it.
    let welcome_msg_for_client =
        ColorWheel::lolcat_into_string(&format!("addr: {}, id: {}\n", socket_addr, id));
    writer.write_all(welcome_msg_for_client.as_bytes()).await?;
    writer.flush().await?;

    let mut incoming = String::new();

    loop {
        let tx = tx.clone();
        tokio::select! {
            // Read from broadcast channel.
            result = rx.recv() => {
                read_from_broadcast_channel(result, socket_addr, &mut writer, &id).await?;
            }

            // Read from socket.
            network_read_result = reader.read_line(&mut incoming) => {
                let num_bytes_read: usize = network_read_result?;
                // EOF check.
                if num_bytes_read == 0 {
                    break;
                }
                handle_socket_read(num_bytes_read, &id, &incoming, &mut writer, tx, socket_addr).await?;
                incoming.clear();
            }
        }
    }

    Ok(())
}
Two concurrent tasks in the tokio::select! block #
- Read from broadcast channel. The function read_from_broadcast_channel() does this work.
- Read from socket. The function handle_socket_read() does this work.
Whichever of the two completes first, the tokio::select! block goes down that code path and drops the other one. The two are futures raced inside a single Tokio task (rx.recv() and reader.read_line()), not separately spawned tasks.
Handle read from broadcast channel function #
Here's the code for the read_from_broadcast_channel() function:
use tokio::net::tcp::WriteHalf;
use tokio::sync::broadcast::error::RecvError;

async fn read_from_broadcast_channel(
    result: Result<MsgType, RecvError>,
    socket_addr: SocketAddr,
    writer: &mut BufWriter<WriteHalf<'_>>,
    id: &str,
) -> IOResult<()> {
    match result {
        Ok(msg) => {
            log::info!("[{}]: channel: {:?}", id, msg);
            // Only forward messages that came from a different client.
            if msg.socket_addr != socket_addr {
                writer.write_all(msg.payload.as_bytes()).await?;
                writer.flush().await?;
            }
        }
        Err(error) => {
            // Log the receive error and keep the client connection alive.
            log::error!("{:?}", error);
        }
    }

    Ok(())
}
Handle socket read function #
Here's the code for the handle_socket_read() function:
async fn handle_socket_read(
    num_bytes_read: usize,
    id: &str,
    incoming: &str,
    writer: &mut BufWriter<WriteHalf<'_>>,
    tx: Sender<MsgType>,
    socket_addr: SocketAddr,
) -> IOResult<()> {
    log::info!(
        "[{}]: incoming: {}, size: {}",
        id,
        incoming.trim(),
        num_bytes_read
    );

    // Process incoming -> outgoing.
    let outgoing = process(incoming);

    // outgoing -> Writer.
    writer.write_all(outgoing.as_bytes()).await?;
    writer.flush().await?;

    // Broadcast incoming to the channel.
    // The result is ignored: send() only fails when there are no active receivers.
    let _ = tx.send(MsgType {
        socket_addr,
        payload: incoming.to_string(),
        from_id: id.to_string(),
    });

    log::info!(
        "[{}]: outgoing: {}, size: {}",
        id,
        outgoing.trim(),
        num_bytes_read
    );

    Ok(())
}

fn process(incoming: &str) -> String {
    // Remove new line from incoming.
    let incoming_trimmed = incoming.trim();
    // Colorize it.
    let outgoing = ColorWheel::lolcat_into_string(incoming_trimmed);
    // Add new line back to outgoing.
    format!("{}\n", outgoing)
}
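The trim, transform, re-append newline shape of process() can be tried without the r3bl crates. In this sketch the lolcat colorizer is swapped for a plain uppercase transform, and process_with() is a hypothetical generalization of process() that takes the transform as a parameter; neither is part of the tutorial's code.

```rust
/// Trims the line terminator, applies `transform`, then adds a single "\n" back
/// so line-based clients such as nc or telnet see a complete line.
fn process_with(incoming: &str, transform: impl Fn(&str) -> String) -> String {
    // trim() removes both "\n" and "\r\n" endings, plus surrounding whitespace.
    let trimmed = incoming.trim();
    format!("{}\n", transform(trimmed))
}

fn main() {
    // Uppercasing is a stand-in for the lolcat colorizer.
    let outgoing = process_with("hello\r\n", |text| text.to_uppercase());
    print!("{outgoing}");
}
```

Trimming before the transform keeps the line terminator out of whatever the transform does to the text, and re-adding it afterwards keeps the output line-delimited.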
Build with Naz video series on developerlife.com YouTube channel #
If you have comments and feedback on this content, or would like to request new content (articles & videos) on developerlife.com, please join our discord server.
You can watch a video series on building this crate with Naz on the developerlife.com YouTube channel.
- YT channel
- Playlists