Arc Troopers: Adventures in Async Rust
The trouble with arguing about software design is that there’s precious little room between “too short for a useful example” and “too long; didn’t read”. Good examples are concise, but many important problems are too complex to reveal themselves in small snippets.
Let’s talk about something complex—Async Rust. We’ll need a concurrent Rust program, so let’s invent a dumb archive format that compresses a directory, then use its implementation as a little demo.
We’ll start by making a simple CLI with clap:
#[derive(Parser)]
enum Command {
    Write(WriteCommand),
    Read(ReadCommand),
}
#[derive(Parser)]
struct WriteCommand {
    from: Utf8PathBuf,
    #[arg(short, long)]
    output: Utf8PathBuf,
}
#[derive(Parser)]
struct ReadCommand {
    from: Utf8PathBuf,
    // Extract the archive in this directory
    #[arg(short = 'C')]
    to: Option<Utf8PathBuf>,
}
fn main() -> Result<()> {
    match Command::parse() {
        Command::Write(w) => write(w),
        Command::Read(r) => read(r),
    }
}
Now to implement those. A filesystem is just a tree, right?
#[derive(Debug, Serialize, Deserialize)]
enum Node<T> {
    Directory(BTreeMap<Utf8PathBuf, Node<T>>),
    File(T),
}
We want a generic way to walk trees so we don’t have to copy-paste the same code to print one, read one, etc.:
// Given a root node `n` with path `p`,
// call `f(path, value)` for each file in the tree
fn walk_node<T, F>(p: &Utf8Path, n: &Node<T>, f: &mut F) -> Result<()>
where
    F: FnMut(&Utf8Path, &T) -> Result<()>,
{
    match n {
        Node::Directory(d) => {
            for (child_name, child) in d {
                // Append the child's name to the directory path.
                let mut new_p = p.to_owned();
                new_p.push(child_name);
                walk_node(&new_p, child, f)?;
            }
            Ok(())
        }
        Node::File(v) => f(p, v),
    }
}
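To make the calling convention concrete, here's a minimal, std-only sketch of the same walker applied to a hand-built tree. It's not the program's real code: it swaps camino's Utf8Path for `std::path::Path` and anyhow's `Result` for a plain `Result<T, String>`, and the `collect_paths` helper is hypothetical.

```rust
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

// Same shape as the article's Node, minus the serde derives.
enum Node<T> {
    Directory(BTreeMap<PathBuf, Node<T>>),
    File(T),
}

// Stand-in for anyhow::Result (an assumption, to stay std-only).
type Result<T> = std::result::Result<T, String>;

fn walk_node<T, F>(p: &Path, n: &Node<T>, f: &mut F) -> Result<()>
where
    F: FnMut(&Path, &T) -> Result<()>,
{
    match n {
        Node::Directory(d) => {
            for (child_name, child) in d {
                let mut new_p = p.to_owned();
                new_p.push(child_name);
                walk_node(&new_p, child, f)?;
            }
            Ok(())
        }
        Node::File(v) => f(p, v),
    }
}

// Hypothetical helper: build a tiny tree and collect "path (len)" lines.
fn collect_paths() -> Vec<String> {
    let mut benches = BTreeMap::new();
    benches.insert(PathBuf::from("lib.rs"), Node::File(139u64));
    let mut root = BTreeMap::new();
    root.insert(PathBuf::from("Cargo.toml"), Node::File(5611u64));
    root.insert(PathBuf::from("benches"), Node::Directory(benches));
    let tree = Node::Directory(root);

    let mut lines = vec![];
    walk_node(Path::new("std"), &tree, &mut |p, len| {
        lines.push(format!("{} ({})", p.display(), len));
        Ok(())
    })
    .expect("the callback never fails");
    lines
}

fn main() {
    for line in collect_paths() {
        println!("{line}");
    }
}
```

Because the children live in a BTreeMap, the callback sees files in sorted order, which is the property the archive format leans on later.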
And we’ll want a generic way to build these trees. Directory entries aren’t guaranteed to be in any particular order, so we’ll sort them to recurse in a predictable way.
// Walk path `p`, producing a node from each file with `visit(path, metadata)`
fn walk_path<T, V>(p: &Utf8Path, visit: &mut V) -> Result<(Utf8PathBuf, Node<T>)>
where
    V: FnMut(&Utf8Path, &Metadata) -> Result<T>,
{
    let name: Utf8PathBuf = p.file_name().expect("no filename").to_owned().into();
    let m = p.metadata()?;
    if m.is_file() {
        Ok((name, Node::File(visit(p, &m)?)))
    } else if m.is_dir() {
        let mut children = BTreeMap::new();
        let d = p.read_dir_utf8()?;
        let mut sorted_kids = vec![];
        for e in d {
            sorted_kids.push(e?.path().to_owned());
        }
        sorted_kids.sort();
        for e in &sorted_kids {
            let (kid_name, kid) = walk_path(e, visit)?;
            children.insert(kid_name, kid);
        }
        Ok((name, Node::Directory(children)))
    } else {
        bail!("{p} is special; not a file or directory");
    }
}
Now for some concurrency. One thread will walk the given path and read files, another thread will compress them with Ol’ Reliable, and a third will print out our progress. When we’re done, we’ll print the tree, serialize it into a nice binary format like CBOR, and tack that onto the end. We’ll finish off our archive with the length of that tree, so it’s trivial for readers to seek to it. None of this EOCD bullshit.
#[derive(Default)]
struct WriteStats {
    files_read: AtomicU64,
    bytes_uncompressed: AtomicU64,
    bytes_compressed: AtomicU64,
}
fn write(cmd: WriteCommand) -> Result<()> {
    let (tx, rx) = sync_channel(0);
    let stats = WriteStats::default();
    let done = AtomicBool::new(false);
    thread::scope(|s| -> Result<()> {
        // Walk the given path and send files' bytes to the writer.
        let walker_thread = s.spawn(|| -> Result<(Utf8PathBuf, Node<u64>)> {
            let tx = tx; // Move just tx (but not stats, done)
            let mut send_and_collect_lengths =
                |p: &Utf8Path, _m: &std::fs::Metadata| -> Result<u64> {
                    let contents = fs::read(p)?;
                    let len = contents.len() as u64;
                    tx.send(contents)?;
                    stats.files_read.fetch_add(1, Ordering::Relaxed);
                    stats.bytes_uncompressed.fetch_add(len, Ordering::Relaxed);
                    Ok(len)
                };
            walk_path(&cmd.from, &mut send_and_collect_lengths)
        });
        // Receive bytes from the walker, and compress them into our file.
        let writer_thread = s.spawn(|| -> Result<zstd::Encoder<File>> {
            let rx = rx; // Move just rx (but not stats, done)
            let mut zw = zstd::Encoder::new(File::create(&cmd.output)?, 0)?;
            while let Ok(buf) = rx.recv() {
                zw.write_all(&buf)?;
                stats
                    .bytes_compressed
                    .store(zw.get_ref().stream_position()?, Ordering::Relaxed);
            }
            Ok(zw)
        });
        // Print a silly little progress message.
        let progress_thread = s.spawn(|| {
            loop {
                let nf = stats.files_read.load(Ordering::Relaxed);
                let ub = stats.bytes_uncompressed.load(Ordering::Relaxed);
                let cb = stats.bytes_compressed.load(Ordering::Relaxed);
                // Pretty-print bytes to appropriate units (KB, MB, GB).
                let ub = Byte::from_u64(ub)
                    .get_appropriate_unit(UnitType::Decimal);
                let cb = Byte::from_u64(cb)
                    .get_appropriate_unit(UnitType::Decimal);
                // ANSI nonsense
                print!("\x1B[1K\r{nf} files, {ub:#.0} -> {cb:#.0}");
                std::io::stdout().lock().flush().expect("stdout flush failed");
                if done.load(Ordering::Acquire) {
                    break;
                } else {
                    // Uninterruptible sleep()s are a capital offense.
                    thread::park_timeout(
                        std::time::Duration::from_millis(1000 / 20));
                }
            }
        });
        // Now that we've spawned the worker threads,
        // wait for them to finish...
        let tree = walker_thread.join().expect("tree walk panicked")?;
        let zw = writer_thread.join().expect("archive write panicked")?;
        // ...then wake the progress thread and tell it to quit.
        done.store(true, Ordering::Release);
        progress_thread.thread().unpark();
        progress_thread.join().expect("progress print panicked");
        // Print the completed tree below the last progress message.
        println!();
        walk_node(&tree.0, &tree.1, &mut |p, l| {
            println!("{p} ({l})");
            Ok(())
        })?;
        let mut serialized_tree = vec![];
        ciborium::into_writer(&tree, &mut serialized_tree)?;
        let mut fh = zw.finish()?;
        fh.write_all(&serialized_tree)?;
        fh.write_all(&(serialized_tree.len() as u64).to_le_bytes())?;
        Ok(())
    })
}
To read the archive, we just do the opposite. Since we put the files’ lengths in the tree, and they were stored in a predictable order, we know how much of the compressed stream to read for each one. (We’ll skip a progress thread for brevity.)
fn read(cmd: ReadCommand) -> Result<()> {
    let (tx, rx) = sync_channel(0);
    thread::scope(|s| -> Result<()> {
        let reader = s.spawn(move || -> Result<()> {
            let mut fh = BufReader::new(File::open(cmd.from)?);
            // Get the size of the serialized tree so we know where to read it.
            fh.seek(std::io::SeekFrom::End(-8))?;
            let mut tree_length_bytes = vec![];
            fh.read_to_end(&mut tree_length_bytes)?;
            let tree_length =
                u64::from_le_bytes(tree_length_bytes.try_into().unwrap());
            // Go there and get our tree.
            fh.seek(std::io::SeekFrom::End(-(tree_length as i64 + 8)))?;
            let (root, tree): (Utf8PathBuf, Node<u64>) =
                ciborium::from_reader(&mut fh)?;
            // Start shoveling bytes and paths to the other thread,
            // printing as we go:
            fh.rewind()?;
            let mut zr = zstd::Decoder::new(fh)?;
            walk_node(&root, &tree, &mut |p, l| {
                // Prepend the -C directory if given
                let to = match &cmd.to {
                    Some(prefix) => {
                        let mut total = prefix.to_owned();
                        total.push(p);
                        total
                    }
                    None => p.to_owned(),
                };
                println!("{to} ({l})");
                let mut buf = vec![0; *l as usize];
                zr.read_exact(&mut buf)?;
                tx.send((to, buf))?;
                Ok(())
            })
        });
        let dumper = s.spawn(move || -> Result<()> {
            while let Ok((p, buf)) = rx.recv() {
                if let Some(dir) = p.parent() {
                    fs::create_dir_all(dir)?;
                }
                fs::write(p, &buf)?;
            }
            Ok(())
        });
        dumper.join().expect("file write panicked")?;
        reader.join().expect("archive extract panicked")?;
        Ok(())
    })
}
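The "length at the very end" trick is easy to lose among the threads, so here's a stripped-down sketch of just that part, run against an in-memory `Cursor`. Raw bytes stand in for the zstd stream and the CBOR tree, and the `write_with_footer`, `read_index`, and `roundtrip` helpers are hypothetical names, not part of the program above.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom, Write};

// Write the payload, then the index, then the index length as 8 LE bytes.
fn write_with_footer<W: Write>(mut out: W, payload: &[u8], index: &[u8]) -> std::io::Result<()> {
    out.write_all(payload)?;
    out.write_all(index)?;
    out.write_all(&(index.len() as u64).to_le_bytes())
}

// Read the last 8 bytes to learn the index length, then seek back to the index.
fn read_index<R: Read + Seek>(mut input: R) -> std::io::Result<Vec<u8>> {
    input.seek(SeekFrom::End(-8))?;
    let mut len_bytes = [0u8; 8];
    input.read_exact(&mut len_bytes)?;
    let index_len = u64::from_le_bytes(len_bytes);

    input.seek(SeekFrom::End(-(index_len as i64 + 8)))?;
    let mut index = vec![0; index_len as usize];
    input.read_exact(&mut index)?;
    Ok(index)
}

// Hypothetical round trip through an in-memory "archive".
fn roundtrip() -> Vec<u8> {
    let mut archive = Cursor::new(Vec::new());
    write_with_footer(&mut archive, b"compressed bytes", b"tree").unwrap();
    read_index(&mut archive).unwrap()
}

fn main() {
    let index = roundtrip();
    println!("{}", String::from_utf8_lossy(&index));
}
```

Reading the footer into a fixed `[u8; 8]` with `read_exact` is one way to sidestep the `try_into().unwrap()` in the full version; either works when the file is well formed.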
Let’s try it!
$ cargo run -- write -o out.test ~/src/rust/library/std
648 files, 6 MB -> 1 MB
std/Cargo.toml (5611 bytes)
std/benches/hash/map.rs (1639 bytes)
std/benches/hash/mod.rs (22 bytes)
std/benches/hash/set_ops.rs (1025 bytes)
std/benches/lib.rs (139 bytes)
std/benches/path.rs (3112 bytes)
std/benches/time.rs (1484 bytes)
...
$ cargo run -- read out.test -C /tmp
/tmp/std/Cargo.toml (5611)
/tmp/std/benches/hash/map.rs (1639)
/tmp/std/benches/hash/mod.rs (22)
/tmp/std/benches/hash/set_ops.rs (1025)
/tmp/std/benches/lib.rs (139)
/tmp/std/benches/path.rs (3112)
/tmp/std/benches/time.rs (1484)
And look, Ma, no differences!
$ diff -r -u ~/src/rust/library/std /tmp/std
There are a dozen details we’d have to fix up to turn this into useful software. But it’s an awesome little Rust showcase:
- A rich ecosystem of crates like anyhow, camino, clap, and serde makes quick work of common tasks.
- Great tooling (Cargo) makes it trivial to fetch and build these dependencies.
- thread::scope() makes it simple for threads to safely reference local state because we know when those threads will exit.
- Channels make communication and synchronization simple.
There’s a lot to love here—we get code that’s as efficient as anything we’d write in C or C++, but more expressive and a hell of a lot harder to screw up. But what if we wanted more? Powers only… async can give us?
Universal Cancellation
Once we have multiple tasks running concurrently, how do we tell them to stop? Here we really only care if something fails: if we have trouble walking the file system, there’s no point in writing the rest of the archive. But general-purpose cancellation has plenty of graceful use cases, too! What if you’re building a language server, and want to opportunistically parse code in the background… until the user changes it?[1] Or what if you want to try multiple solutions to a problem in parallel, and cancel the slower one when the faster finishes?
This is infamously hard to do with OS threads. POSIX has pthread_cancel(), but look it up and you’ll mostly find warnings that it’s a terrible idea. Conventional wisdom says to thread some sort of “cancellation token” through your work, check it periodically, then exit “gracefully” whenever it’s set. In the bad old days this might have just been an atomic int; now C++ provides a std::stop_token:
void threadWorker(std::stop_token st)
{
    while (!st.stop_requested()) {
        doThreadedWork();
    }
}
But this prompts more questions than it answers. If doThreadedWork() takes a while, should we pass the token in and distribute it to the callees? Should we make otherwise-infallible code bubble up some E_CANCELLED error when we notice the token is set? What if someone makes a blocking call? And even if we can answer all of these questions for our own code, what happens if we call a library that doesn’t play by the same rules?
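For comparison, the token pattern looks about the same in Rust. This is a minimal sketch with a bare `AtomicBool` as the token; the `worker` and `run_and_cancel` functions are hypothetical, and the sleep is a stand-in for real work that never looks at the token.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::Duration;

// Hypothetical worker: do small units of work until the token is set.
fn worker(cancel: &AtomicBool, units_done: &AtomicU64) {
    while !cancel.load(Ordering::Acquire) {
        // Stand-in for doThreadedWork(). Nothing in here checks the token,
        // so cancellation is only noticed between units of work.
        thread::sleep(Duration::from_millis(1));
        units_done.fetch_add(1, Ordering::Relaxed);
    }
}

// Start the worker, wait for some progress, then ask it to stop.
fn run_and_cancel() -> u64 {
    let cancel = AtomicBool::new(false);
    let units_done = AtomicU64::new(0);
    thread::scope(|s| {
        s.spawn(|| worker(&cancel, &units_done));
        while units_done.load(Ordering::Relaxed) == 0 {
            thread::yield_now();
        }
        cancel.store(true, Ordering::Release);
        // The scope joins the worker before returning.
    });
    units_done.load(Ordering::Relaxed)
}

fn main() {
    println!("worker finished {} units before stopping", run_and_cancel());
}
```

The worker only stops as promptly as its slowest unit of work, which is exactly the "what if someone makes a blocking call?" problem.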
One option for the narrow “I want to die as soon as there’s an error” case is to just call exit() and let the OS do the cleanup, but this also has its limits. Memory will be freed, files and sockets will be closed, but (at least in Unix) spawned processes will keep running. If you want them dead, or have any other cleanup that absolutely must happen, you’ll find yourself maintaining global state about what needs doing on the way out.
Futures in Rust provide an attractive alternative—while not all of them actually cancel their work, many do. And in either case, dropping them provides a universal way to return control to the await…er instead of blocking.
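Here's a small std-only sketch of what "dropping is cancellation" means mechanically. It polls a hand-written future once with a do-nothing waker, then drops it and checks that its `Drop` cleanup ran. The `NeverReady` future, `NoopWaker`, and `poll_then_drop` are all hypothetical; a real runtime does the polling for you.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future that never completes and records when it is dropped.
struct NeverReady {
    cleaned_up: Arc<AtomicBool>,
}

impl Future for NeverReady {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

impl Drop for NeverReady {
    fn drop(&mut self) {
        self.cleaned_up.store(true, Ordering::Release);
    }
}

// Returns (was pending after one poll, cleanup ran after drop).
fn poll_then_drop() -> (bool, bool) {
    let cleaned_up = Arc::new(AtomicBool::new(false));
    let mut fut = Box::pin(NeverReady { cleaned_up: cleaned_up.clone() });

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let was_pending = fut.as_mut().poll(&mut cx).is_pending();

    // "Cancel" it: nobody will poll it again, and Drop runs right here.
    drop(fut);
    (was_pending, cleaned_up.load(Ordering::Acquire))
}

fn main() {
    println!("{:?}", poll_then_drop());
}
```

Whether dropping also stops work happening elsewhere (a spawned task, a kernel operation) depends on the future, which is the "not all of them actually cancel their work" caveat.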
Who to await?
A smaller but related problem is collecting asynchronous results in whatever order they arrive. Who should we join first?
thread::scope(|s| -> Result<()> {
    let thing_one = s.spawn(...);
    let thing_two = s.spawn(...);
    let result_one = thing_one.join().expect("thing 1 panicked")?;
    let result_two = thing_two.join().expect("thing 2 panicked")?;
    // Do things with the results.
    Ok(())
})
The choice matters. If thing_two fails, what keeps thing_one from happily chugging along?[2] Even if both succeed, what if result_two arrives much faster, and we could do useful work with it while we wait for result_one?
We could solve this with a channel:
thread::scope(|s| -> Result<()> {
    let (tx, rx) = sync_channel(0);
    let tx2 = tx.clone();
    let thing_one = s.spawn(move || {
        // ...
        tx.send(result_one).unwrap();
    });
    let thing_two = s.spawn(move || {
        // ...
        tx2.send(result_two).unwrap();
    });
    // Receive the results in whatever order they arrive
    while let Ok(res) = rx.recv() {
        // do something with res
    }
    Ok(())
})
But it’s clunky, doesn’t catch panics, and we’d find ourselves reinventing this wheel often.
Meanwhile in Async Rust, it’s common to wait on several futures at once with select!(), or to group them into “join sets” that yield their results (success or failure) in the order they arrive.
.into::<Async>()
I think these are the most compelling arguments for Async Rust over the “normal” flavor we saw above. Yes, the performance is nice too (userspace context-switching with non-blocking IO[3] is fast), but having a universal concurrency and cancellation primitive seems like a huge win in our modern, multicore world. In Rust that’s Future, driven by some async runtime. Let’s use Tokio since it’s the most popular.
Before we start using async IO and other fancy stuff, we’ll change our OS threads to Tokio tasks. Our writer becomes:
async fn write(cmd: WriteCommand) -> Result<()> {
    let (tx, rx) = sync_channel(0);
    let stats = Arc::new(WriteStats::default());
    let done = Arc::new(tokio::sync::Notify::new());
    let kstats = stats.clone();
    let walker: tokio::task::JoinHandle<Result<_>> = tokio::spawn(async move {
        let tx = tx; // Move just tx (but not stats, done) in
        let mut send_and_collect_lengths =
            |p: &Utf8Path, _m: &std::fs::Metadata| -> Result<u64> {
                ...
                kstats.files_read.fetch_add(...)
                ...
                Ok(len)
            };
        ...
    });
    let wstats = stats.clone();
    let writer: tokio::task::JoinHandle<Result<_>> = tokio::spawn(async move {
        let rx = rx; // Move just rx (but not stats, done) in
        ...
        while let Ok(buf) = rx.recv() {
            ...
            wstats.bytes_compressed.store(...);
        }
        ...
    });
    let done2 = done.clone();
    let progress = tokio::spawn(async move {
        let mut leaving = false;
        loop {
            ...(printing)...
            if leaving {
                break;
            }
            tokio::select!(
                _ = done2.notified() => {
                    leaving = true; // Once more to print the final counts
                },
                _ = tokio::time::sleep(Duration::from_millis(1000 / 20)) => {},
            )
        }
    });
    let tree = walker.await.expect("tree walk panicked")?;
    let mut fh = writer.await.expect("archive write panicked")?;
    done.notify_one();
    progress.await.expect("progress print panicked");
    ...
}
The reader function changes in similar ways, and those ways make me deeply unhappy.
thread::scope() let us use local variables (stats and done) in multiple threads, safely, without any extra ceremony.[4] But a tokio::task::scope() is impossible, since async functions package their state into the returned future, and we have no way to constrain how long the caller holds onto that future before running (or dropping) it. In other words, anything our futures capture must have a 'static lifetime.[5]
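You can feel the same squeeze without any async at all, because plain `thread::spawn` carries the same `'static` bound that `tokio::spawn` does. A minimal std-only sketch, with a hypothetical counter standing in for WriteStats:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Scoped threads may borrow a local, because the scope joins them
// before it returns.
fn count_with_scope() -> u64 {
    let files_read = AtomicU64::new(0);
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| files_read.fetch_add(1, Ordering::Relaxed));
        }
    });
    files_read.load(Ordering::Relaxed)
}

// thread::spawn demands 'static captures, so the counter moves behind
// an Arc and every thread gets its own clone of the pointer.
fn count_with_arc() -> u64 {
    let files_read = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let files_read = files_read.clone();
            thread::spawn(move || files_read.fetch_add(1, Ordering::Relaxed))
        })
        .collect();
    for h in handles {
        h.join().expect("worker panicked");
    }
    files_read.load(Ordering::Relaxed)
}

fn main() {
    println!("{} {}", count_with_scope(), count_with_arc());
}
```

Both versions count to four. The difference is that the second one changed the data model (heap allocation, reference counts, clones) only to satisfy the lifetime bound.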
Lots of ink has been spilled about the pains of making Rust futures Send so they can be bounced between threads in a runtime’s pool, so much so that Tokio added a spawn_local, and popular Tokio alternatives like Smol default to single-threaded, blocking execution. But Send isn’t nearly as painful as 'static. One of Rust’s selling points is that the compiler helps you reason about how long objects live, and now the answer for anything that touches async tasks is, “I dunno, forever?”
Oh, it gets worse.
Async callback hell
Remember our nice little functions for walking and building a tree?
fn walk_node<T, F>(p: &Utf8Path, n: &Node<T>, f: &mut F) -> Result<()>
where
    F: FnMut(&Utf8Path, &T) -> Result<()>,
{
    ...
}
fn walk_path<T, V>(p: &Utf8Path, visit: &mut V) -> Result<(Utf8PathBuf, Node<T>)>
where
    V: FnMut(&Utf8Path, &Metadata) -> Result<T>,
{
    ...
}
We should make those async, so that we don’t block our Tokio thread pool with nasty synchronous IO, right? Welcome to a world of hurt.
Pause here if you want to work through the sea of compiler errors yourself.
But if you want answers:
- Since an async function returns a future, and futures in Rust are compiler-generated state machines, recursive async functions become recursively-defined types. That’s no good, so we have to introduce some indirection. Some pinned indirection with Box::pin(walk_node(...)).
- Because everything that ends up in said state machine must be 'static, passing Utf8Path or other reference types as arguments will sometimes produce lifetime errors. Understanding when gets tricky in the face of recursion.
- Our callbacks change from “a mutable closure that returns some result”, i.e., FnMut() -> Result<T>, to “a mutable closure that returns some future that yields some result”. As of Rust 1.85, we can write that as AsyncFnMut() -> Result<()> instead of using a crate like async_fn_traits. But diagnostics are iffy: pass async |...| instead of async move |...| and in Rust 1.89 I get:
      error: implementation of `Send` is not general enough
  not on the callback, but on the enclosing async block.[6]
- Capturing state gets fun too. Remember how we decompress the archive by walking our tree of file lengths?
      let mut zr = zstd::Decoder::new(fh)?;
      walk_node(&root, &tree, &mut |p, l| {
          ...
          let mut buf = vec![0; *l as usize];
          zr.read_exact(&mut buf)?;
          ...
      })
  Just like the arguments, captured state must become 'static + Send, so it gets an Arc. But if we want mutability, and a decompressor definitely has some mutable state, we then need some thread-safe way to get that.
Welcome to Arc<Mutex<T>>, may I take your order?
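The first bullet is easier to see in miniature. Below is a std-only sketch of a recursive async walk where each recursive call goes through `Box::pin`, driven by a toy `block_on`. The simplified `Node`, `total_len`, `NoopWaker`, and `block_on` are all hypothetical stand-ins; a real program would hand the future to Tokio instead.

```rust
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified, non-generic version of the article's tree.
enum Node {
    Directory(BTreeMap<String, Node>),
    File(u64),
}

// Each recursive call returns a boxed, pinned future, so the
// compiler-generated state machine has a finite size.
fn total_len<'a>(n: &'a Node) -> Pin<Box<dyn Future<Output = u64> + 'a>> {
    Box::pin(async move {
        match n {
            Node::File(len) => *len,
            Node::Directory(children) => {
                let mut sum = 0;
                for child in children.values() {
                    sum += total_len(child).await;
                }
                sum
            }
        }
    })
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: busy-polls, which is fine here only because
// nothing in this example ever returns Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Build a two-file tree and sum its lengths.
fn demo_total() -> u64 {
    let mut benches = BTreeMap::new();
    benches.insert("lib.rs".to_string(), Node::File(139));
    let mut root = BTreeMap::new();
    root.insert("Cargo.toml".to_string(), Node::File(5611));
    root.insert("benches".to_string(), Node::Directory(benches));
    let tree = Node::Directory(root);
    block_on(total_len(&tree))
}

fn main() {
    println!("{}", demo_total());
}
```

Note that this version gets away with borrowing the tree only because nothing is spawned. Hand the same future to a spawn function with a 'static bound and the lifetime in the return type becomes the next compiler error.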
Ecosystem splits
I don’t want to talk about the Tokio vs. Not Tokio debate—we have bigger problems. Because Async is a different color than Not Async, we need different-colored interfaces for each. We need red sleep and blue sleep. We need red read and blue read.
When we mix the colors, results vary wildly. I can call std::thread::sleep() anywhere, but calling it from an async function blocks a thread in my runtime’s pool, which is almost certainly a bug. Should I use a synchronous mutex in async code? Sometimes. Usually? Unless I want it to guard IO, which, by the way, the compiler totally won’t stop you from doing. This fills me with sadness because “the compiler not letting me do obvious bugs” is one of my favorite parts of Rust.
Then there are cases where they don’t compose at all. I can’t take a std::io::Read and turn it into a tokio::io::AsyncRead. That’s a real bummer when we’re talking about the fundamental streaming IO primitive of the language.[7] Our nice Zstd library is out the window because it built abstractions around std::io::BufRead and std::io::Write, so I have to find some other crate that wraps the Tokio traits instead. Repeat this tragedy for every useful library in the ecosystem, multiplied by the number of runtimes they want to support, unless you want to rewrite everything to just pass buffers. But getting away from that was the whole point of having fundamental streaming IO primitives, wasn’t it?
I swear I’m not making this shit up.
Maybe someone out there doesn’t have these problems, but I haven’t met those lucky souls. Every substantial piece of Async Rust software I’ve seen gets crammed with arcs and mutexes that wouldn’t be there in an equivalent “synchronous” Rust program. And these aren’t just more letters to type: they’re fundamental changes to your code’s data model, with very real performance hits on modern hardware.
I’ve often joked that instead of picking up Dijkstra’s cute acronym we should have called the basic synchronization object “the bottleneck”. Bottlenecks are useful at times, sometimes indispensable – but they’re never GOOD. At best they’re a necessary evil. Anything. ANYTHING that encourages anyone to overuse them, to hold them too long, is bad.
None of these problems are especially deep or nuanced—criticism I got last time I complained about them—they’re worse. They’re pervasive, and they have nothing to do with the real problems we’re trying to solve with software. Look at all these vapid little puzzles! This isn’t knowledge I want to have to know.
So, what the hell are you suggesting?
Use vanilla Rust where you can! It’s a fast, expressive language with a fantastic ecosystem that fits nicely into the systems software niche. Embedded, too—when you aren’t allocating and you control the entire machine, this pain falls away. Frameworks like Embassy, which use futures for cooperative multitasking on bare metal, are cool as hell!
But Tokio and friends are a bad time. If you want a massively-concurrent runtime on a general-purpose OS, you want one where all the functions are the same color. And you want one where objects live as long as their tasks need them to. You want a garbage collector. And you can get one much faster, and much easier to use, than Arc.
1. Alex Kladov has a fantastic article on this (and a clever alternative!) at https://matklad.github.io/2023/05/06/zig-language-server-and-cancellation.html
2. In our example program, threads are connected by a channel, and if one end hangs up, the other finds out. Score another point for Team Hoare and CSP! But you can imagine many scenarios where data doesn’t flow cleanly through the program as an acyclic graph, so channels can’t always save us.
3. See: epoll(), io_uring, etc.
4. To be fair, using bindings like let tx = tx to move some-but-not-all variables into a closure is also clunky. But it’s not especially surprising once you understand how move semantics and closures mix in Rust. And unlike introducing Arc, it has little-to-no impact on the performance or behavior of your program.
5. Crates like async-scoped and tokio-scoped exist and seem to enjoy some popularity. But look at how they work and you’ll find some gotcha: either there’s danger if their futures are dropped, or they sidestep the issue by blocking a thread. (Weren’t we trying to avoid that?)
6. This could be a compiler bug or something else that improves in the near future; it’s not fundamental to Async Rust’s design. But damn if it wasn’t making me tear my hair out.
7. Apparently Mr. Zig has been thinking a lot about this too. What if IO is like a burrito?