Designing an Async Runtime for WASI 0.2
— 2024-02-29

  1. the wasi 0.2 polling model
  2. designing the poller abstraction
  3. designing the reactor abstraction
    1. registering interest in a pollable
    2. blocking until events are ready
  4. designing the block_on abstraction
  5. using the async runtime
  6. on the absence of a local executor
  7. conclusion

In 2019 Stjepan Glavina and I developed the async-std runtime. That was an off-shoot from the runtime project, which itself was an attempt to make it easier to abstract over different async runtimes. One of the things I'm most proud of in the work I did on async-std is the core IO abstraction which Stjepan later factored out into the polling and async-io libraries as part of the smol project, where he took them beyond my initial prototype code into robust building blocks which can work on their own.

Anyway, that little stroll down memory lane serves a purpose: I just finished building Yet Another Async runtime. The goal isn't to tell people to actually go and use it; it's intended as a working, minimal, but also correct implementation of an async runtime for WASI 0.2. The purpose of this post is to detail how I built it, so you can build your own (if you want to). I'm one of the first to write this code, and maybe even the first to write a dedicated runtime, which means that if Smol, Monoio, Glommio, or Tokio want to add support for WASI 0.2 they're going to have to implement what I already implemented. So I figured I might save folks some trouble, and document the work I just finished doing.

The WASI 0.2 polling model

WASI 0.2 is readiness-based rather than completion-based. That means we wait for the host system to tell us we're ready to take an action (readiness-based), rather than wait for the host system to tell us that an action has successfully completed (completion-based). WASI 0.3 will likely switch to a completion-based system because Linux io_uring and Windows' ioringapi are completion-based and perform really well, but we don't have that yet.

The core polling system in WASI consists of two components: the Pollable type and the poll function. The Pollable type represents "interest" in an event. Say we had some kind of read call; there would be an associated Pollable for that call, which we could submit to the host to say: "please let me know when there's a reason for me to call read". This is the "readiness" part of the system - you submit your interest in an operation to the host system, and then the host system will periodically yield a list of which operations are good to be called. The way to schedule that interest is via the poll function.

A key difference between epoll and WASI 0.2's poll model is that in WASI we don't schedule interest in resources (e.g. file descriptors), but we schedule interest in concrete operations. Under epoll we'd tell the kernel something like: "hey, here's an fd representing some socket - I'm interested anytime I can read or write new data to it". In WASI we're more precise; we make a specific method call, which returns a type. That type will have some way to get the underlying data, but also a method subscribe which returns a Pollable. We're then supposed to wait for the poll call to tell us that the Pollable is ready - and then we can call the method to get the underlying data without it returning an error. That's probably a lot of words, so here's a basic example:

use wasi::http::outgoing_handler::{handle, OutgoingRequest};
use wasi::http::types::{Fields, Method, Scheme};
use wasi::io::poll;

fn main() {
    // Construct an HTTP request
    let fields = Fields::new();
    let req = OutgoingRequest::new(fields);
    req.set_method(&Method::Get).unwrap();
    req.set_scheme(Some(&Scheme::Https)).unwrap();
    req.set_path_with_query(Some("/")).unwrap();
    req.set_authority(Some("example.com")).unwrap();

    // Send the request and wait for it to complete
    let res = handle(req, None).unwrap();  // 1. We're ready to send the request over the network
    let pollable = res.subscribe();        // 2. We obtain the `Pollable` from the response future
    poll::poll(&[&pollable]);              // 3. Block until we're ready to look at the response

    // We're now ready to try and access the response headers. If
    // the request was unsuccessful we might still get an error here,
    // but we won't get an error that we tried to read data before the
    // operation was completed.
    let res = res.get().unwrap().unwrap().unwrap();
    for (name, _) in res.headers().entries() {
        println!("header: {name}");
    }
}

When run via cargo-component (with the -- -S http flag passed), this should print the following:

header: age
header: cache-control
header: content-type
header: date
header: etag
header: expires
header: last-modified
header: server
header: vary
header: x-cache
header: content-length

Designing the poller abstraction

In the HTTP request example we've shown how to make a single HTTP request and registered that one operation with the poll call. One of the main benefits of async computing is the ability to perform ad-hoc concurrency: we don't just want to be able to wait for the one operation to complete; we want to be able to wait on any number of operations to complete at the same time. This is why the poll function takes a list of Pollable types, and returns a list of which ones are ready to continue doing work.

For that we need to create some type which can hold the pollables for us, be somewhat efficient about it, and allow us to associate the list of "ready" events with some notion of identity. This is important because we're building a runtime for async Rust, which requires that we associate "readiness events" with "calls to specific wakers". We can begin by defining a struct containing a slab. This is an efficient key-value data structure which on insertion gives you a key to access the value. It is unordered and reuses allocations - which is great because we'll be continuously registering and deregistering interest in different Pollables.

#[derive(Debug)]
pub(crate) struct Poller {
    pub(crate) targets: Slab<Pollable>,
}
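
To make the slab's key reuse concrete, here's a toy version built on nothing but `std`. It's a sketch of the idea only: `ToySlab` is a name I made up for illustration, and the real `slab` crate is implemented differently and far more carefully. The part to notice is that removing an entry frees its key, and the next insert hands that key out again.

```rust
/// A toy slab: a vector of optional slots plus a list of free keys.
/// This is a std-only illustration, not the `slab` crate's implementation.
struct ToySlab<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> ToySlab<T> {
    fn new() -> Self {
        Self { slots: Vec::new(), free: Vec::new() }
    }

    /// Store `value` and return the key it can be looked up with.
    fn insert(&mut self, value: T) -> usize {
        match self.free.pop() {
            // Reuse a previously vacated slot if one exists.
            Some(key) => {
                self.slots[key] = Some(value);
                key
            }
            // Otherwise grow the backing vector.
            None => {
                self.slots.push(Some(value));
                self.slots.len() - 1
            }
        }
    }

    /// Look up the value stored under `key`, if any.
    fn get(&self, key: usize) -> Option<&T> {
        self.slots.get(key).and_then(|slot| slot.as_ref())
    }

    /// Remove the value at `key`, making the slot available for reuse.
    fn try_remove(&mut self, key: usize) -> Option<T> {
        let value = self.slots.get_mut(key)?.take()?;
        self.free.push(key);
        Some(value)
    }
}
```

Because keys are handed out again after removal, a key is only meaningful while its entry is still registered. That's the same contract our `Poller` relies on.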

Whenever we insert a type into Slab it returns a key of type usize. To make working with it easier, and make bridging into the WASI world easier, we're going to define our own EventKey type to wrap the keys for us, like so:

/// A key representing an entry into the poller
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(crate) struct EventKey(pub(crate) u32);

Now we're ready to implement the base CRUD methods on the Poller: new, insert, remove, and get. Aside from the translation to EventKey, there's nothing interesting going on here.

impl Poller {
    /// Create a new instance of `Poller`
    pub(crate) fn new() -> Self {
        Self {
            targets: Slab::new()
        }
    }

    /// Insert a new `Pollable` target into `Poller`
    pub(crate) fn insert(&mut self, target: Pollable) -> EventKey {
        let key = self.targets.insert(target);
        EventKey(key as u32)
    }

    /// Get a `Pollable` if it exists.
    pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> {
        self.targets.get(key.0 as usize)
    }

    /// Remove an instance of `Pollable` from `Poller`.
    ///
    /// Returns `None` if no entry was found for `key`.
    pub(crate) fn remove(&mut self, key: EventKey) -> Option<Pollable> {
        self.targets.try_remove(key.0 as usize)
    }
}

And finally we're ready to implement the most interesting part: the method which waits for events in the Poller and maps them to their respective EventKeys. When we submit a list of Pollables to the poll call, we get a list of indexes back which refer to the indexes of the pollables we submitted. These are not the same values as the EventKeys we have, so we have to construct a lookup table to map the indexes of the pollables back to the event keys.

impl Poller {
    /// Block the current thread until a new event has triggered.
    ///
    /// Returns the `EventKey`s of all pollables which are ready.
    pub(crate) fn block_until(&mut self) -> Vec<EventKey> {
        // We're about to wait for a number of pollables. When they wake we get
        // the *indexes* back for the pollables whose events were available - so
        // we need to be able to associate the index with the right waker.

        // We start by iterating over the pollables, and keeping note of which
        // pollable belongs to which waker index
        let mut indexes = Vec::with_capacity(self.targets.len());
        let mut targets = Vec::with_capacity(self.targets.len());
        for (index, target) in self.targets.iter() {
            indexes.push(index);
            targets.push(target);
        }

        // Now that we have that association, we're ready to poll our targets.
        // This will block until an event has completed.
        let ready_indexes = poll(&targets);

        // Once we have the indexes for which pollables are available, we need
        // to convert it back to the right keys for the wakers. Earlier we
        // established a positional index -> waker key relationship, so we can
        // go right ahead and perform a lookup there.
        ready_indexes
            .into_iter()
            .map(|index| EventKey(indexes[index as usize] as u32))
            .collect()
    }
}
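
To see the index-to-key lookup in isolation, here's a sketch that runs without a WASI host. `mock_poll` and `ready_keys` are hypothetical stand-ins I made up for this example: plain `bool`s play the role of `Pollable`s, and `mock_poll` reports the positions of the ones that are `true`, the same way the real `poll` reports positions in the list it was handed.

```rust
/// Hypothetical stand-in for `wasi::io::poll::poll`: given a list of
/// targets it returns the *positions* of the ones that are ready.
fn mock_poll(targets: &[&bool]) -> Vec<u32> {
    targets
        .iter()
        .enumerate()
        .filter(|(_, ready)| ***ready)
        .map(|(position, _)| position as u32)
        .collect()
}

/// Map the positions returned by the poll call back to the sparse keys
/// under which the targets were registered.
fn ready_keys(registered: &[(usize, bool)]) -> Vec<usize> {
    // Build two parallel lists: `keys[i]` is the key of `targets[i]`.
    let mut keys = Vec::with_capacity(registered.len());
    let mut targets = Vec::with_capacity(registered.len());
    for (key, target) in registered {
        keys.push(*key);
        targets.push(target);
    }

    // Positions come back from the poll call; keys come out of the lookup.
    mock_poll(&targets)
        .into_iter()
        .map(|position| keys[position as usize])
        .collect()
}
```

With targets registered under keys 3, 7, and 12, where only the last two are ready, the poll call reports positions 1 and 2, and the lookup turns those back into keys 7 and 12.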

And with that, our Poller abstraction is complete!

Designing the reactor abstraction

Now that we have a way to track lists of pollables and submit them - it's time to bring wakers into the mix and create a type which can be used by individual futures to register interest in events. The WASI async runtime is fundamentally single-threaded, so what we'll want to use here are just the Rc and RefCell types. We can now construct our data types; again starting with the core structures:

use super::polling::{EventKey, Poller};

use std::collections::HashMap;
use std::future;
use std::task::Poll;
use std::task::Waker;
use std::{cell::RefCell, rc::Rc};
use wasi::io::poll::Pollable;

/// Manage async system resources for WASI 0.2
#[derive(Debug, Clone)]
pub struct Reactor {
    inner: Rc<RefCell<InnerReactor>>,
}

/// The private, internal `Reactor` implementation - factored out so we can take
/// a lock of the whole.
#[derive(Debug)]
struct InnerReactor {
    poller: Poller,
    wakers: HashMap<EventKey, Waker>,
}

impl Reactor {
    /// Create a new instance of `Reactor`
    pub(crate) fn new() -> Self {
        Self {
            inner: Rc::new(RefCell::new(InnerReactor {
                poller: Poller::new(),
                wakers: HashMap::new(),
            })),
        }
    }
}

The reason we can get away with Rc<RefCell<InnerReactor>> is because we control all the accesses in the methods. We guarantee they are always short-lived and will never overlap. That in turn gives us a convenient way to call methods on &self rather than &mut self. Speaking of which, time to implement the first method - which is a way to register interest in a pollable.
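
To illustrate the short-lived borrow discipline, here's a small std-only sketch. `Shared` is a hypothetical handle shaped like our `Reactor`, except it wraps a plain `Vec<u32>` in place of the reactor state. Every method takes the borrow, does its work, and releases it before returning, which is why two clones of the handle can mutate the same state through `&self` without tripping `RefCell`'s runtime check.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical shared handle, shaped like `Reactor` but holding a list
/// of numbers instead of a poller and wakers.
#[derive(Clone)]
struct Shared {
    inner: Rc<RefCell<Vec<u32>>>,
}

impl Shared {
    fn new() -> Self {
        Self { inner: Rc::new(RefCell::new(Vec::new())) }
    }

    /// Takes `&self`, yet mutates: the borrow ends when the method returns.
    fn push(&self, value: u32) {
        self.inner.borrow_mut().push(value);
    }

    /// Number of values pushed through any clone of this handle.
    fn len(&self) -> usize {
        self.inner.borrow().len()
    }

    /// `true` if nobody currently holds a borrow on the inner state.
    fn is_unborrowed(&self) -> bool {
        self.inner.try_borrow_mut().is_ok()
    }
}
```

If a borrow were held across a call into another method, `borrow_mut` would panic at runtime. Keeping every borrow inside a single method body is what rules that out.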

Registering interest in a pollable

This provides an async method wait_for which allows our reactor to wait asynchronously until the call related to the pollable is ready to be performed.

impl Reactor {
    /// Wait for the pollable to resolve.
    pub async fn wait_for(&self, pollable: Pollable) {
        let mut pollable = Some(pollable);
        let mut key = None;

        // This closure is the core loop of our function; it will be called
        // multiple times as the future is resolving.
        future::poll_fn(|cx| {
            // Start by taking a lock on the reactor. This is single-threaded
            // and short-lived, so it will never be contended.
            let mut reactor = self.inner.borrow_mut();

            // Schedule interest in the `pollable` on the first iteration. On
            // every iteration, register the waker with the reactor.
            let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
            reactor.wakers.insert(*key, cx.waker().clone());

            // Check whether we're ready or need to keep waiting. If we're
            // ready, we clean up after ourselves.
            if reactor.poller.get(key).unwrap().ready() {
                reactor.poller.remove(*key);
                reactor.wakers.remove(key);
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }
}

What's nice about this is that unlike other runtimes, where interest is registered on the object, here we merely register interest on an operation. That means that "wait for this operation to be ready" is a matter of calling reactor.wait_for(pollable).await. That is significantly different from the juggling of syscalls required in other polling models; and it means we can encode all of the invariants necessary for the reactor directly as part of the reactor.

In recent conversations in the Rust WG Async we've been discussing whether it makes sense to have some kind of reactor hook as part of the Context in future - or whether perhaps other mechanisms would be better. From this example I believe we can conclude that while it might be possible to share a reactor through the future's context - at a high level that will always be mapped the same way to the underlying function calls. Which at least to me puts into question whether that is the most natural mapping.

Blocking until events are ready

With that out of the way, we're ready to implement the wrapper around our Poller::block_until call. As we implemented, it will wait for all registered pollables, and give us back a list of EventKeys for the ones which are ready to be woken. All we have to do is iterate over the list of keys, and call each related waker:

/// Block until new events are ready. Calls the respective wakers once done.
pub(crate) fn block_until(&self) {
    let mut reactor = self.inner.borrow_mut();
    for key in reactor.poller.block_until() {
        match reactor.wakers.get(&key) {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("tried to wake the waker for non-existent `{key:?}`"),
        }
    }
}

At first glance it might seem silly that this goes through the motions of calling the wakers. WASI 0.2 is currently single-threaded, so by default it will always just wake the one waker. However, it is common and encouraged to use wakers to distinguish between events. Concurrency primitives may construct their own wakers to keep track of identity and wake more precisely. We do not control the wakers constructed by libraries, and it is for this reason that we have to call all the wakers - even if by default they will do nothing.
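
As an illustration of what such a library-constructed waker might look like, here's a sketch using the standard library's `Wake` trait. `FlagWaker`, `child_wakers`, and `woken_children` are names invented for this example. Real concurrency primitives are more involved, and I'm only assuming they follow roughly this shape: one waker per child future, so the parent can tell which child an event was for.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that records whether one specific child was woken.
struct FlagWaker {
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// Build one waker per child future, returning each flag next to its waker.
fn child_wakers(count: usize) -> Vec<(Arc<FlagWaker>, Waker)> {
    (0..count)
        .map(|_| {
            let flag = Arc::new(FlagWaker { woken: AtomicBool::new(false) });
            let waker = Waker::from(flag.clone());
            (flag, waker)
        })
        .collect()
}

/// Indexes of the children whose wakers have been called.
fn woken_children(wakers: &[(Arc<FlagWaker>, Waker)]) -> Vec<usize> {
    wakers
        .iter()
        .enumerate()
        .filter(|(_, (flag, _))| flag.woken.load(Ordering::SeqCst))
        .map(|(index, _)| index)
        .collect()
}
```

If the reactor skipped the `wake_by_ref` call, a parent built like this would never learn which of its children to poll again.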

Designing the block_on abstraction

Now that we have our reactor done, we need a way to drive it. For that we're going to define a function block_on which takes a closure returning a future and gives it access to the Reactor. As long as the future returned by the closure is live, we'll keep making progress on it - waiting on the reactor each time the future returns Pending.

We'll be driving futures by hand here, so the first step is to define our own instance of Waker. We're not supporting any form of spawn function, and instead rely on users to leverage libraries for structured concurrency, so out of the box we can just make it a noop.

/// Construct a new no-op waker
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
fn noop_waker() -> Waker {
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        // Cloning just returns a new no-op raw waker
        |_| RAW,
        // `wake` does nothing
        |_| {},
        // `wake_by_ref` does nothing
        |_| {},
        // Dropping does nothing as we don't allocate anything
        |_| {},
    );
    const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE);

    // SAFETY: all fields are no-ops, so this is safe
    unsafe { Waker::from_raw(RAW) }
}

Next up is the actual block_on implementation. The core logic is basically a loop { match {} } statement which just calls reactor.block_until on each iteration until the future completes.


/// Start the event loop
pub fn block_on<F, Fut>(f: F) -> Fut::Output
where
    F: FnOnce(Reactor) -> Fut,
    Fut: Future,
{
    // Construct the reactor
    let reactor = Reactor::new();

    // Create the future and pin it so it can be polled
    let fut = (f)(reactor.clone());
    let mut fut = pin!(fut);

    // Create a new context to be passed to the future.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // Either the future completes and we return, or some IO is happening
    // and we wait.
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => reactor.block_until(),
        }
    }
}
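
If you want to poke at the loop outside of WASI, here's a host-independent sketch. It assumes a few things which are not part of the runtime above: `NoopWaker` uses the safe `std::task::Wake` trait (which allocates an `Arc`) in place of the raw vtable, `Countdown` is a made-up future that returns `Pending` a fixed number of times, and `block_on_counting` simply polls again at the point where the real runtime would call `reactor.block_until()`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// No-op waker built with the safe `Wake` trait instead of a raw vtable.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future that reports `Pending` a fixed number of times.
struct Countdown(u32);

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

/// Drive `fut` to completion, returning its output and how many times it
/// was polled.
fn block_on_counting<Fut: Future>(fut: Fut) -> (Fut::Output, u32) {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return (output, polls),
            // This is where the real runtime calls `reactor.block_until()`.
            Poll::Pending => {}
        }
    }
}
```

Without a reactor to block on, this version spins. The whole point of `reactor.block_until()` is to park until the host reports that something is ready, so we don't have to.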

And with that, our async runtime is complete!

Using the async runtime

This section was added post-publication on 2024-03-07

Now that we have our runtime, we can use it to wrap the calls from the wasi library and make them async. For example, say we wanted to have a non-blocking sleep function which waits for some Duration. We could write that by using the subscribe_duration function and connecting it to the reactor. All we need to do is wrap it, call it, and then wait on the pollable. That should be simple enough to do with what we've written:

use std::time::Duration;
use wasi::clocks::monotonic_clock::subscribe_duration;

pub async fn sleep(duration: Duration, reactor: &Reactor) {
    let duration = duration.as_nanos() as u64;     // 1. Convert the duration to nanosecond resolution
    let pollable = subscribe_duration(duration);   // 2. Obtain the pollable
    reactor.wait_for(pollable).await;              // 3. Wait for the pollable to be ready
}
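
One small caveat with step 1: `as_nanos` returns a `u128`, and casting with `as u64` silently truncates values that don't fit. That only matters for durations over roughly 584 years, so it's arguably fine here, but if you'd rather clamp, a helper along these lines would do it. `saturating_nanos` is a name I made up for this sketch.

```rust
use std::time::Duration;

/// Convert a `Duration` to whole nanoseconds, clamping to `u64::MAX`
/// instead of silently truncating like a bare `as u64` cast would.
fn saturating_nanos(duration: Duration) -> u64 {
    // Clamp in `u128` space first, so the final cast can never truncate.
    duration.as_nanos().min(u64::MAX as u128) as u64
}
```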

It should be simple enough to take APIs from the wasi crate, and pair them with a call to reactor.wait_for(..).await. That's the base pattern we'd use to asyncify basically the entire wasi API surface.

On the absence of a local executor

WASI 0.2 is single-threaded, and fundamentally doesn't need access to a task.spawn abstraction. In sync Rust we use threads to combine parallelism + concurrency; but in async Rust we can separate the two. The futures-concurrency library provides access to any mode of concurrency you might want; meaning that in the absence of parallelism there is no reason for an executor to exist.

Instead APIs such as Vec::join and StreamGroup even provide access to the unbounded concurrency primitives people typically use "local executors" for - but without any of the scoped lifetime issues plaguing those abstractions. Here is an example of how to concurrently make two separate HTTP requests without having to rely on "local tasks":

fn main() {
    block_on(|reactor| async {
        let client = Client::new(reactor); // <- using an unpublished wrapper around `wasi::http`

        let a = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;

            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        let b = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;

            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        (a, b).join().await; // concurrently await both `a` and `b`.
    })
}
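
To give a feel for why no executor is needed, here's a stripped-down sketch of joining two futures inside a single task, using only `std`. Everything in it is an assumption made for illustration: `join2` is a simplified stand-in for the tuple `join` from futures-concurrency (it just polls both children on every wake-up), `Step` is a made-up future that logs each poll, and `poll_order` busy-polls in place of our `block_on`.

```rust
use std::cell::RefCell;
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Poll two futures concurrently from within a single task.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut out_a, mut out_b) = (None, None);
    future::poll_fn(|cx| {
        // Poll each child that hasn't finished yet.
        if out_a.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                out_a = Some(value);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                out_b = Some(value);
            }
        }
        // Only complete once both outputs are available.
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Hypothetical child future: logs its name on each poll and finishes
/// after yielding `Pending` once.
struct Step {
    name: &'static str,
    yielded: bool,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Future for Step {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.log.borrow_mut().push(self.name);
        if self.yielded {
            Poll::Ready(self.name)
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-poll driver standing in for `block_on`; returns the poll order.
fn poll_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let step = |name| Step { name, yielded: false, log: log.clone() };
    let mut fut = pin!(join2(step("a"), step("b")));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while fut.as_mut().poll(&mut cx).is_pending() {}
    let order = log.borrow().clone();
    order
}
```

The log comes out interleaved as a, b, a, b: both children make progress within the same task, and nothing was ever spawned.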

Conclusion

In this post I've explained WASI's polling model and shown step-by-step how to use it to build your own async runtime. I hope this will be useful for maintainers of async Rust runtimes, as well as hobbyists wanting to go through the motions of writing their own. If you prefer to just use the code I've shared here today, you can do so by installing the wasi-async-runtime crate.