Designing an Async Runtime for WASI 0.2
— 2024-02-29
- the wasi 0.2 polling model
- designing the poller abstraction
- designing the reactor abstraction
- designing the block_on abstraction
- using the async runtime
- on the absence of a local executor
- conclusion
In 2019 Stjepan Glavina and I developed the async-std runtime. That was an off-shoot from the runtime project, which itself was an attempt to make it easier to abstract over different async runtimes. One of the things I'm most proud of in the work I did on async-std is the core IO abstraction, which Stjepan later factored out into the polling and async-io libraries as part of the smol project. There he took them beyond my initial prototype code into robust building blocks which can work on their own.
Anyway, that little stroll down memory lane serves a purpose: I just finished building Yet Another Async Runtime. The point isn't to get people to actually go and use it; it's intended as a working, minimal, but also correct implementation of an async runtime for WASI 0.2. The purpose of this post is to detail how I built it, so you can build your own (if you want to). I'm one of the first people to write this code, and maybe even the first to write a dedicated runtime, which means that if smol, Monoio, Glommio, or Tokio want to add support for WASI 0.2, they're going to have to implement what I already implemented. So I figured I might save folks some trouble and document the work I just finished doing.
The WASI 0.2 polling model
WASI 0.2 is readiness-based rather than completion-based. That means we wait for the host system to tell us we're ready to take an action, rather than waiting for the host system to tell us that an action has successfully completed (completion-based). WASI 0.3 will likely switch to a completion-based system, because Linux's io_uring and Windows' ioringapi are completion-based and perform really well, but we don't have that yet.
The core polling system in WASI consists of two components: the Pollable type and the poll function. The Pollable type represents "interest" in an event. Say we had some kind of read call; there would be an associated Pollable for that call, which we could submit to the host to say: "please let me know when there's a reason for me to call read". This is the "readiness" part of the system: you submit your interest in an operation to the host system, and the host system then yields a list of the operations which are ready to be called. The way to schedule that interest is via the poll function.
A key difference between the epoll model and WASI 0.2's poll model is that in WASI we don't schedule interest in resources (e.g. file descriptors); we schedule interest in concrete operations. Under epoll we'd tell the kernel something like: "hey, here's an fd representing some socket; I'm interested any time I can read or write new data to it". In WASI we're more precise: we make a specific method call, which returns a type. That type will have some way to get the underlying data, but also a subscribe method which returns a Pollable. We're then supposed to wait for the poll call to tell us that the Pollable is ready, after which we can call the method to get the underlying data without it returning an error. That's probably a lot of words, so here's a basic example:
use wasi::http::outgoing_handler::{handle, OutgoingRequest};
use wasi::http::types::{Fields, Method, Scheme};
use wasi::io::poll;

fn main() {
    // Construct an HTTP request
    let fields = Fields::new();
    let req = OutgoingRequest::new(fields);
    req.set_method(&Method::Get).unwrap();
    req.set_scheme(Some(&Scheme::Https)).unwrap();
    req.set_path_with_query(Some("/")).unwrap();
    req.set_authority(Some("example.com")).unwrap();

    // Send the request and wait for it to complete
    let res = handle(req, None).unwrap(); // 1. Hand the request to the host, getting back a future response
    let pollable = res.subscribe(); // 2. We obtain the `Pollable` from the response future
    poll::poll(&[&pollable]); // 3. Block until we're ready to look at the response

    // We're now ready to try and access the response headers. If
    // the request was unsuccessful we might still get an error here,
    // but we won't get an error that we tried to read data before the
    // operation was completed.
    let res = res.get().unwrap().unwrap().unwrap();
    for (name, _) in res.headers().entries() {
        println!("header: {name}");
    }
}
When run via cargo-component (with the -- -S http flag passed), this should print the following:
header: age
header: cache-control
header: content-type
header: date
header: etag
header: expires
header: last-modified
header: server
header: vary
header: x-cache
header: content-length
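The example above hands a single Pollable to poll, but poll accepts a whole list and answers with the positions of the entries that are ready. To make that contract concrete without a WASI host, here is a sketch that simulates it. MockPollable and mock_poll are hypothetical names for illustration, not the wasi crate's API, and unlike the real host call the mock cannot block, so it only reports what is ready right now.

```rust
/// Hypothetical stand-in for a WASI Pollable; readiness is just a flag here.
pub struct MockPollable {
    pub ready: bool,
}

/// Mirrors the shape of WASI's poll: a list of pollables goes in, and the
/// positional indexes (as u32) of the ready ones come out. The real host call
/// blocks until at least one entry is ready; this mock can't wait, so it only
/// reports what is ready at the moment it is called.
pub fn mock_poll(targets: &[&MockPollable]) -> Vec<u32> {
    targets
        .iter()
        .enumerate()
        .filter(|(_, pollable)| pollable.ready)
        .map(|(index, _)| index as u32)
        .collect()
}
```

Note that the result says nothing about *which* operation each position belongs to; the caller has to keep track of that itself.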
Designing the poller abstraction
In the HTTP request example we've shown how to make a single HTTP request and register that one operation with the poll call. One of the main benefits of async computing is the ability to perform ad-hoc concurrency: we don't just want to be able to wait for one operation to complete; we want to be able to wait on any number of operations at the same time. This is why the poll function takes a list of Pollable types, and returns a list of indexes of the ones which are ready to continue doing work.
For that we need to create some type which can hold the pollables for us, be somewhat efficient about it, and allow us to associate the list of "ready" events with some notion of identity. This is important because we're building a runtime for async Rust, which requires that we associate "readiness events" with "calls to specific wakers". We can begin by defining a struct containing a slab. This is an efficient key-value data structure which gives you a key to access a value when you insert it. It is unordered and reuses allocations, which is great because we'll be continuously registering and deregistering interest in different Pollables.
use slab::Slab;
use wasi::io::poll::Pollable;

#[derive(Debug)]
pub(crate) struct Poller {
    pub(crate) targets: Slab<Pollable>,
}
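The slab crate does the real work here. As a rough picture of the two properties the Poller relies on (inserting hands back a usize key, and vacated slots get reused by later inserts), here is a toy, hypothetical ToySlab built from a Vec and a free list. It is a simplified illustration, not the slab crate's implementation.

```rust
/// A toy slab: a Vec of optional slots plus a free list of vacant indexes.
pub struct ToySlab<T> {
    entries: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> ToySlab<T> {
    pub fn new() -> Self {
        Self { entries: Vec::new(), free: Vec::new() }
    }

    /// Store `value` and return the key that can be used to access it.
    pub fn insert(&mut self, value: T) -> usize {
        // Reuse a vacated slot if there is one, otherwise grow the vector.
        if let Some(key) = self.free.pop() {
            self.entries[key] = Some(value);
            key
        } else {
            self.entries.push(Some(value));
            self.entries.len() - 1
        }
    }

    pub fn get(&self, key: usize) -> Option<&T> {
        self.entries.get(key).and_then(|slot| slot.as_ref())
    }

    /// Remove the value at `key` and mark its slot as free for reuse.
    pub fn try_remove(&mut self, key: usize) -> Option<T> {
        let value = self.entries.get_mut(key)?.take()?;
        self.free.push(key);
        Some(value)
    }

    /// Number of occupied slots.
    pub fn len(&self) -> usize {
        self.entries.len() - self.free.len()
    }
}
```

Because keys are recycled, a key is only meaningful while its entry is alive, which is why the Poller removes entries as soon as the corresponding operation is done.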
Whenever we insert a value into the Slab it returns a key of type usize. To make working with it easier, and to make bridging into the WASI world easier, we're going to define our own EventKey type to wrap the keys for us, like so:
/// A key representing an entry into the poller
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(crate) struct EventKey(pub(crate) u32);
Now we're ready to implement the base CRUD methods on the Poller: new, insert, remove, and get. Aside from the translation to EventKey, there's nothing interesting going on here.
impl Poller {
    /// Create a new instance of `Poller`
    pub(crate) fn new() -> Self {
        Self {
            targets: Slab::new(),
        }
    }

    /// Insert a new `Pollable` target into `Poller`
    pub(crate) fn insert(&mut self, target: Pollable) -> EventKey {
        let key = self.targets.insert(target);
        EventKey(key as u32)
    }

    /// Get a `Pollable` if it exists.
    pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> {
        self.targets.get(key.0 as usize)
    }

    /// Remove an instance of `Pollable` from `Poller`.
    ///
    /// Returns `None` if no entry was found for `key`.
    pub(crate) fn remove(&mut self, key: EventKey) -> Option<Pollable> {
        self.targets.try_remove(key.0 as usize)
    }
}
And finally we're ready to implement the most interesting part: the method which waits for events in the Poller and maps them to their respective EventKeys. When we submit a list of Pollables to the poll call, we get a list of indexes back which refer to the positions of the pollables we submitted. These are not the same values as the EventKeys we have, so we have to construct a lookup table to map the indexes of the pollables back to the event keys.
use wasi::io::poll::poll;

impl Poller {
    /// Block the current thread until a new event has triggered.
    ///
    /// Returns the keys of all pollables which are ready.
    pub(crate) fn block_until(&mut self) -> Vec<EventKey> {
        // We're about to wait for a number of pollables. When they wake we get
        // the *indexes* back for the pollables whose events were available - so
        // we need to be able to associate the index with the right waker.

        // We start by iterating over the pollables, and keeping note of which
        // pollable belongs to which waker index
        let mut indexes = Vec::with_capacity(self.targets.len());
        let mut targets = Vec::with_capacity(self.targets.len());
        for (index, target) in self.targets.iter() {
            indexes.push(index);
            targets.push(target);
        }

        // Now that we have that association, we're ready to poll our targets.
        // This will block until at least one of the pollables is ready.
        let ready_indexes = poll(&targets);

        // Once we have the indexes for which pollables are available, we need
        // to convert it back to the right keys for the wakers. Earlier we
        // established a positional index -> waker key relationship, so we can
        // go right ahead and perform a lookup there.
        ready_indexes
            .into_iter()
            .map(|index| EventKey(indexes[index as usize] as u32))
            .collect()
    }
}
And with that, our Poller abstraction is complete!
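The only subtle step in block_until is translating positions back into keys. Isolated from WASI, and assuming a slab whose live entries sit at keys 2, 5 and 9 (earlier entries having been removed), the lookup works like this. map_ready_to_keys is a hypothetical helper that mirrors the closing map in block_until, not part of the runtime.

```rust
/// Same shape as the post's EventKey: a thin wrapper around a slab key.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct EventKey(pub u32);

/// Translate the positional indexes returned by poll back into slab keys.
/// `slab_keys[i]` is the slab key of the i-th pollable that was submitted,
/// recorded in the same order the list passed to poll was built.
pub fn map_ready_to_keys(slab_keys: &[usize], ready_indexes: Vec<u32>) -> Vec<EventKey> {
    ready_indexes
        .into_iter()
        .map(|index| EventKey(slab_keys[index as usize] as u32))
        .collect()
}
```

Using the positions directly would have woken keys 0 and 2 instead of 2 and 9, i.e. the wrong wakers, or wakers that don't exist at all.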
Designing the reactor abstraction
Now that we can track lists of pollables and submit them, it's time to bring wakers into the mix and create a type which individual futures can use to register interest in events. The WASI async runtime is fundamentally single-threaded, so all we need here are the Rc and RefCell types. We can now construct our data types, again starting with the core structures:
use super::polling::{EventKey, Poller};
use std::collections::HashMap;
use std::task::Poll;
use std::task::Waker;
use std::{cell::RefCell, rc::Rc};
use wasi::io::poll::Pollable;

/// Manage async system resources for WASI 0.2
#[derive(Debug, Clone)]
pub struct Reactor {
    inner: Rc<RefCell<InnerReactor>>,
}

/// The private, internal `Reactor` implementation - factored out so we can take
/// a lock of the whole.
#[derive(Debug)]
struct InnerReactor {
    poller: Poller,
    wakers: HashMap<EventKey, Waker>,
}

impl Reactor {
    /// Create a new instance of `Reactor`
    pub(crate) fn new() -> Self {
        Self {
            inner: Rc::new(RefCell::new(InnerReactor {
                poller: Poller::new(),
                wakers: HashMap::new(),
            })),
        }
    }
}
The reason we can get away with Rc<RefCell<Inner>> is that we control all the accesses in the methods. We guarantee they are always short-lived and never overlap, which in turn gives us a convenient way to call methods on &self rather than &mut self. Speaking of which, it's time to implement the first method: a way to register interest in a Pollable.
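As a small, host-free illustration of why Rc<RefCell<..>> lets the Reactor expose &self methods, here is a hypothetical Registry type using the same pattern: clones share one piece of state, and each method holds its borrow for a single expression only.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A hypothetical miniature of the Reactor pattern: a cheaply clonable handle
/// around shared, single-threaded, mutable state.
#[derive(Clone, Default)]
pub struct Registry {
    inner: Rc<RefCell<Vec<String>>>,
}

impl Registry {
    /// Takes `&self` yet mutates: the `borrow_mut` guard is a temporary that
    /// is dropped at the end of this statement, so borrows never overlap.
    pub fn register(&self, name: &str) {
        self.inner.borrow_mut().push(name.to_string());
    }

    /// Number of registered names (again a borrow lasting one expression).
    pub fn len(&self) -> usize {
        self.inner.borrow().len()
    }
}
```

Overlapping borrows are exactly what the Reactor's methods avoid: calling borrow_mut while another borrow is still alive panics at runtime, which is why every access is kept this short.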
Registering interest in a pollable
This provides an async method wait_for which allows our reactor to wait asynchronously until the call related to the Pollable is ready to be performed.
impl Reactor {
    /// Wait for the pollable to resolve.
    pub async fn wait_for(&self, pollable: Pollable) {
        let mut pollable = Some(pollable);
        let mut key = None;

        // This closure is the core of the future; it is called every time the
        // future is polled, until it returns `Poll::Ready`.
        std::future::poll_fn(|cx| {
            // Start by taking a lock on the reactor. This is single-threaded
            // and short-lived, so it will never be contended.
            let mut reactor = self.inner.borrow_mut();

            // Schedule interest in the `pollable` on the first iteration. On
            // every iteration, register the waker with the reactor.
            let key = *key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
            reactor.wakers.insert(key, cx.waker().clone());

            // Check whether we're ready or need to keep waiting. If we're
            // ready, we clean up after ourselves.
            if reactor.poller.get(&key).unwrap().ready() {
                reactor.poller.remove(key);
                reactor.wakers.remove(&key);
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }
}
What's nice about this is that unlike other runtimes, where interest is registered on the object, here we merely register interest in an operation. That means that "wait for this operation to be ready" is a matter of calling reactor.wait_for(pollable).await. That is significantly different from the juggling of syscalls required in other polling models, and it means we can encode all of the invariants necessary for the reactor directly as part of the reactor.
In recent conversations in the Rust Async WG we've been discussing whether, in the future, it makes sense to have some kind of reactor hook as part of Context, or whether other mechanisms would perhaps be better. From this example I believe we can conclude that while it might be possible to share a reactor through the future's Context, at a high level it will always map onto the underlying function calls in the same way. Which, at least to me, puts into question whether that is the most natural mapping.
Blocking until events are ready
With that out of the way, we're ready to implement the wrapper around our Poller::block_until call. As implemented, it will wait on all registered pollables and give us back a list of EventKeys for the ones which are ready to be woken. All we have to do is iterate over the list of keys and call each related waker:
impl Reactor {
    /// Block until new events are ready. Calls the respective wakers once done.
    pub(crate) fn block_until(&self) {
        let mut reactor = self.inner.borrow_mut();
        for key in reactor.poller.block_until() {
            match reactor.wakers.get(&key) {
                Some(waker) => waker.wake_by_ref(),
                None => panic!("tried to wake the waker for non-existent `{key:?}`"),
            }
        }
    }
}
At first glance it might seem silly that this goes through the motions of calling the wakers. WASI 0.2 is currently single-threaded, so by default there will only ever be the one waker to wake. However, it is common and encouraged to use wakers to distinguish between events: concurrency primitives may construct their own wakers to keep track of identity and wake more precisely. We do not control the wakers constructed by libraries, and it is for this reason that we have to call all the wakers, even if by default they do nothing.
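To make the point about wakers concrete: a concurrency primitive might hand each child future its own waker so it can tell which child received an event. The sketch below is a hypothetical ChildWaker built on the standard Wake trait, which records the woken child in a shared bitmask. It uses Arc and an atomic only because Waker requires Send + Sync, not because anything here is multi-threaded.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical per-child waker: records which child future received an event
/// by setting that child's bit in a shared mask.
pub struct ChildWaker {
    child: usize,
    woken: Arc<AtomicUsize>,
}

impl Wake for ChildWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.woken.fetch_or(1 << self.child, Ordering::SeqCst);
    }
}

/// Build the Waker for child number `child`, reporting into `woken`.
pub fn child_waker(child: usize, woken: &Arc<AtomicUsize>) -> Waker {
    Waker::from(Arc::new(ChildWaker { child, woken: Arc::clone(woken) }))
}
```

If the reactor skipped calling wakers, the mask would stay empty and a primitive built this way would never learn which child to poll next.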
Designing the block_on abstraction
Now that we have our reactor done, we need a way to drive it. For that we're going to define a function block_on which takes a closure returning a future and gives it access to the Reactor. As long as the future returned by the closure is live, we'll keep making progress on it, waiting on the reactor each time the future returns Pending.
We'll be driving futures by hand here, so the first step is to define our own instance of Waker. We're not supporting any form of spawn function, and instead rely on users to leverage libraries for structured concurrency, so out of the box we can just make it a no-op.
use std::ptr;
use std::task::{RawWaker, RawWakerVTable, Waker};

/// Construct a new no-op waker
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
fn noop_waker() -> Waker {
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        // Cloning just returns a new no-op raw waker
        |_| RAW,
        // `wake` does nothing
        |_| {},
        // `wake_by_ref` does nothing
        |_| {},
        // Dropping does nothing as we don't allocate anything
        |_| {},
    );
    const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE);

    // SAFETY: all vtable functions are no-ops and never touch the (null) data pointer
    unsafe { Waker::from_raw(RAW) }
}
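If one extra allocation is acceptable, the standard library's Wake trait offers a way to build the same no-op waker without unsafe code. This is an alternative sketch, not what the runtime above uses; NoopWake, safe_noop_waker and poll_once are names made up for this example.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker whose wake-up does nothing.
struct NoopWake;

impl Wake for NoopWake {
    // `wake` is the only required method; the default `wake_by_ref`
    // clones the Arc and calls `wake`, so it is a no-op as well.
    fn wake(self: Arc<Self>) {}
}

/// Build a no-op Waker without `unsafe`, at the cost of one allocation.
fn safe_noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

/// Poll a future exactly once with the no-op waker (handy in tests).
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = safe_noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}
```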
Next up is the actual block_on implementation. The core logic is basically a loop { match {} } statement which calls reactor.block_until on each iteration until the future completes.
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll};

/// Start the event loop
pub fn block_on<F, Fut>(f: F) -> Fut::Output
where
    F: FnOnce(Reactor) -> Fut,
    Fut: Future,
{
    // Construct the reactor
    let reactor = Reactor::new();

    // Create the future and pin it so it can be polled
    let fut = (f)(reactor.clone());
    let mut fut = pin!(fut);

    // Create a new context to be passed to the future.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // Either the future completes and we return, or some IO is happening
    // and we wait.
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => reactor.block_until(),
        }
    }
}
And with that, our async runtime is complete!
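To see all the pieces working together without a WASI host, here is a condensed, self-contained sketch of the same design: a reactor with wait_for and block_until, plus the block_on loop. The assumptions are loud ones: MockPollable is a hypothetical timer-like stand-in for Pollable that becomes ready after a number of ticks, mock_poll stands in for the host's poll (advancing time by one tick instead of blocking), and a HashMap plus a counter replaces the slab. One deliberate deviation: block_until here releases the RefCell borrow before calling the wakers, so a waker that calls back into the reactor can't trigger a borrow panic.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::future::{self, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a WASI Pollable: ready after `ticks` calls to `mock_poll`.
pub struct MockPollable { ticks: Cell<u32> }

impl MockPollable {
    pub fn after(ticks: u32) -> Self { Self { ticks: Cell::new(ticks) } }
    fn ready(&self) -> bool { self.ticks.get() == 0 }
}

/// Mock host poll: advance every pollable by one tick, then return the
/// positional indexes of the ready ones (the real call blocks instead).
fn mock_poll(targets: &[&MockPollable]) -> Vec<u32> {
    for t in targets {
        t.ticks.set(t.ticks.get().saturating_sub(1));
    }
    (0..targets.len()).filter(|&i| targets[i].ready()).map(|i| i as u32).collect()
}

#[derive(Clone, Default)]
pub struct Reactor { inner: Rc<RefCell<Inner>> }

#[derive(Default)]
struct Inner {
    next_key: u32, // a counter standing in for the slab's key allocation
    pollables: HashMap<u32, MockPollable>,
    wakers: HashMap<u32, Waker>,
}

impl Reactor {
    /// Same shape as the post's wait_for, over the mock pollable.
    pub async fn wait_for(&self, pollable: MockPollable) {
        let mut pollable = Some(pollable);
        let mut key: Option<u32> = None;
        future::poll_fn(|cx| {
            let mut r = self.inner.borrow_mut();
            if key.is_none() {
                // First poll: register the pollable under a fresh key.
                let k = r.next_key;
                r.next_key += 1;
                r.pollables.insert(k, pollable.take().unwrap());
                key = Some(k);
            }
            let k = key.unwrap();
            r.wakers.insert(k, cx.waker().clone());
            if r.pollables[&k].ready() {
                r.pollables.remove(&k);
                r.wakers.remove(&k);
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }

    /// Ask the mock host what's ready, then wake the matching wakers.
    fn block_until(&self) {
        let r = self.inner.borrow();
        let keys: Vec<u32> = r.pollables.keys().copied().collect();
        let targets: Vec<&MockPollable> = keys.iter().map(|k| &r.pollables[k]).collect();
        let ready = mock_poll(&targets);
        let wakers: Vec<Waker> = ready.iter().map(|&i| r.wakers[&keys[i as usize]].clone()).collect();
        drop(targets);
        drop(r); // release the borrow before calling into any waker
        for waker in wakers {
            waker.wake();
        }
    }
}

struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// The same loop as the post's block_on, driving the mock reactor.
pub fn block_on<F, Fut>(f: F) -> Fut::Output
where
    F: FnOnce(Reactor) -> Fut,
    Fut: Future,
{
    let reactor = Reactor::default();
    let mut fut = pin!(f(reactor.clone()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => reactor.block_until(),
        }
    }
}
```

Usage mirrors the real runtime: the closure receives the Reactor, and each await on wait_for suspends the future until the mock host reports the pollable as ready.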
Using the async runtime
This section was added post-publication on 2024-03-07
Now that we have our runtime, we can use it to wrap the calls from the wasi library and make them async. For example, say we wanted a non-blocking sleep function which waits for some Duration. We could write that by using the subscribe_duration function and connecting it to the reactor. All we need to do is wrap it, call it, and then wait on the pollable. That should be simple enough to do with what we've written:
use std::time::Duration;
use wasi::clocks::monotonic_clock::subscribe_duration;

pub async fn sleep(duration: Duration, reactor: &Reactor) {
    let duration = duration.as_nanos() as u64; // 1. Convert the duration to nanosecond resolution
    let pollable = subscribe_duration(duration); // 2. Obtain the pollable
    reactor.wait_for(pollable).await; // 3. Wait for the pollable to be ready
}
It should be simple enough to take APIs from the wasi crate and pair them with a call to reactor.wait_for(..).await. That's the base pattern we'd use to asyncify basically the entire wasi API surface.
On the absence of a local executor
WASI 0.2 is single-threaded, and fundamentally doesn't need access to a task.spawn abstraction. In sync Rust we use threads to combine parallelism and concurrency, but in async Rust we can separate the two. The futures-concurrency library provides access to any mode of concurrency you might want, meaning that in the absence of parallelism there is no reason for an executor to exist.
Instead, APIs such as Vec::join and StreamGroup even provide access to the unbounded concurrency primitives people typically use "local executors" for, but without any of the scoped lifetime issues plaguing those abstractions. Here is an example of how to concurrently make two separate HTTP requests without having to rely on "local tasks":
use futures_concurrency::prelude::*;

fn main() {
    // `async move`: the future must own `reactor`, since the closure's
    // parameter goes out of scope as soon as the closure returns.
    block_on(|reactor| async move {
        let client = Client::new(reactor); // <- using an unpublished wrapper around `wasi::http`

        let a = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;
            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        let b = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;
            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        (a, b).join().await; // concurrently await both `a` and `b`.
    })
}
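The Client in that example is unpublished, so here is a self-contained sketch of the underlying idea: two futures making progress concurrently inside a single task, with no spawn and no executor. join2 is a simplified, hypothetical stand-in for what (a, b).join() from futures-concurrency provides, not its actual implementation; yield_now and run are small helpers made up for this demo, and run busy-polls because nothing here performs real IO.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending exactly once (after asking to be polled again), then Ready.
async fn yield_now() {
    let mut yielded = false;
    poll_fn(|cx| {
        if yielded {
            return Poll::Ready(());
        }
        yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await
}

/// Poll both futures in the same task until both have finished.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut out_a: Option<A::Output> = None;
    let mut out_b: Option<B::Output> = None;
    poll_fn(|cx| {
        // Only poll the sides that haven't completed yet.
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                out_b = Some(v);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling driver for the demo (there is no reactor and no real IO).
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

When each future logs a step before yielding, the log interleaves (a0, bb0, a1, bb1): every time one side returns Pending, the join moves on to the other, all within one task.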
Conclusion
In this post I've explained WASI's polling model and shown step-by-step how to use it to build your own async runtime. I hope this will be useful for maintainers of async Rust runtimes, as well as hobbyists wanting to go through the motions of writing their own. If you prefer to just use the code I've shared here today, you can do so by installing the wasi-async-runtime crate.