Async Cancellation II: Time and Signals
— 2022-06-10

  1. timer basics
  2. cancelling execution at a distance
  3. delaying execution
  4. throttling execution
  5. conclusion

For the past few years I've been working on the async-std library, which provides an async implementation of the APIs exposed by std. However, we also added several new APIs related to things unique to async Rust: concurrency, control over execution, and the interaction between the two.

These APIs were initially introduced in async-std as "unstable", and have been the main focus of my work to design since. On this blog there are numerous posts related to for example: concurrency, cancellation, and parallelism. Today I want to share a new experiment I've been working for time-based operations in async Rust. I've designed it as a stand-alone crate for now, but I intend to PR its addition to async-std in the near future.

Timer basics

Relevant APIs here are:

In order to do anything with time you need to have timers. There are two basic types of timers: a timer which fires once at a specific time, and a timer which fires repeatedly after each interval 1. In futures-time you can create both these timers by concstructing these futures directly. But more interestingly: we allow people to construct these futures by convert an existing time::Duration or time::Instant directly into a timer through the IntoFuture and IntoStream traits.

1

People would be right to point out that Interval can be implemented in terms of Sleep. But even though that's the case, APIs will have to choose between "do I fire once" or "do I fire repeatedly", and for that reason it's worth distinguishing between the two. Besides: Interval is so common that we don't want folks to have to implement it over and over, so we just provide it as one of the two basic timer types.

Because both these traits are also implemented for all T: Future and T: Stream respectively, time-based APIs can take timer: impl IntoFuture or instant: impl IntoStream and pass either Duration/Instant directly, or another future.

use futures_timer::prelude::*;
use futures_timer::{task, time::Duration};

async fn wait_until(timer: impl IntoFuture) {
    let timer = timer.into_future();
    timer.await;
}

#[async_std::main]
async fn main() {
    // Wait for a `Duration`.
    wait_until(Duration::from_secs(1));

    // Wait for a concrete future.
    let deadline = task::sleep(Duration::from_secs(1));
    wait_until(deadline);
}

To simplify working with the orphan rules, we ended up defining our own variants of IntoFuture, IntoStream, Duration, and Instant. When we move this to async-std we can probably simplify some of it - but we'll likely need to keep our own definitions of Duration and Instant just for this to keep working.

Cancelling execution at a distance

Relevant APIs here are:

As I showed in the last section, thanks to the implementation of the Into* traits functions can be agnostic over whether they take a Duration/Instant instance, or whether they take a Future. For the last three years we've been trying to figure out how we could integrate the stop-token crate into async-std.

For a deep-dive into how cancellation works in async Rust, you can read the last post in this series. But the short version is: futures can be cancelled between .await points just by dropping the future. A timeout is nothing more than a "race" between a future F and a timer future. If F completes first, you get an Ok(value). And if the timer completes first, you get an Err(TimeoutError). That's concepts composing nicely with each other!

We tried various APIs for cancellation, but none really felt good. So I figured I just needed to give it some time, and started looking at async temporal operations. When I saw RxJS's timeout and timeoutWith operators I realized we might actually be able to combine both functionality through a single API. And instead of using a dedicated StopSource / StopToken pair, we can instead reuse the async-channel crate to create a sender + receiver. This is what it looks like when used together:

use futures_lite::prelude::*;
use futures_time::prelude::*;
use futures_time::channel;
use futures_time::time::Duration;

fn main() {
    async_io::block_on(async {
        let (send, mut recv) = channel::bounded::<()>(1); // create a new send/receive pair
        let value = async { "meow" }
            .delay(Duration::from_millis(100))
            .timeout(recv.next()) // time-out when the sender emits a message
            .await;

        assert_eq!(value.unwrap(), "meow");
    })
}

This example will always return meow because send is never dropped. But in a concurrent example, a timeout could be triggered for all instances of recv (which implements Clone) by dropping send. This is pretty nice because it combines several individually useful components to create "cancel at a distance semantics" which are usually reserved for dedicated APIs.

But there are still several rough edges in this API, which I think we may be able to address:

  1. There are multiple imports from different crates required to get this to work. Though this will likely be smoothed out somewhat by exposing it from async-std.
  2. When constructing a channel you have to declare a mut recv instead of recv. This is because recv.next() expects &mut self. Implementing IntoFuture for async_channel::Receiver would remove that requirement, streamlining things somewhat.
  3. channel::bounded needs to know what type it is, so it takes a ::<()> turbo-fish. Creating zero-sized channels for messaging is useful on its own, so once we're able to we may want the constructor to default to a zero-sized type.

If we migrate to async-std and have all these changes applied, usage might end up looking somewhat like this:

use async_std::prelude::*;
use async_std::channel;
use async_std::time::Duration;

#[async_std::main]
async fn main() {
    let (send, recv) = channel::bounded(1);
    let value = async { "meow" }
        .delay(Duration::from_millis(100))
        .timeout(recv)
        .await;

    assert_eq!(value.unwrap(), "meow");
}

Delaying execution

Relevant APIs here are:

Arbitrary cancellation of execution might be one of the most important features of async Rust, but it's not the only one. By gaining control over execution we also gain the ability to delay, pause, and resume execution as well. Here's an example of how to delay an execution by a set time limit, or until a signal is sent:

use futures_time::prelude::*;
use futures_time::time::{Instant, Duration};
use std::future;

fn main() {
    async_io::block_on(async {
        let value = future::ready("meow")
            .delay(Duration::from_millis(100)) // delay resolving the future by 100ms
            .await;
        assert_eq!(value, Some("meow"));
    });
}

I just authored the park methods for this post, but didn't get around to writing examples for them yet. park works similar to std::thread::park, but instead of only working on threads it works with any future or stream. You call it by passing it a channel which can park or unpark the future, allowing for convenient pausing and resuming of execution.

In a previous post I mentioned that I suspect the right analogy for "task" is "parallelizable future" rather than "async thread", and showing that we can implement park for all futures rather than only on Task is another indication that this new model may in fact be the right one.

Throttling execution

Relevant APIs here are:

Generally data loss is considered a bad thing. But sometimes… data loss can be a good thing? Take for example mouse presses: when something is clicked once, you want to handle that. But if something is pressed 15 times in .3 seconds, perhaps not so much. This is an example of "external input", and since we cannot apply backpressure to external conditions, the only method we have to reduce strain on our system is to actively drop load.

In the crate we provide some versatile, basic methods to shed load based on different circumstances. The debounce and sample methods get the last item seen during a particular period. The throttle method will let through the first items received during a particular period. And finally buffer groups data from a particular time range into a vector, allowing operators to operate on chunks of data rather than needing to decide on individual items.

use futures_lite::prelude::*;
use futures_time::prelude::*;
use futures_time::time::{Instant, Duration};
use futures_time::stream;

fn main() {
    async_io::block_on(async {
        let mut counter = 0;
        stream::interval(Duration::from_millis(5)) // yield every 5ms
            .take(10)                              // stop after 10 iterations
            .buffer(Duration::from_millis(20))     // create 20ms windows
            .for_each(|buf| counter += buf.len())  // sum all items
            .await;

        assert_eq!(counter, 10);
    })
}

Conclusion

And that concludes the whirlwind-tour of the futures-time crate. I'm pretty happy with the resulting APIs. For the longest time we were struggling to create APIs which could operate on {Duration, Instant, impl Future}, and this usually resulted in needing to declare three different APIs for each piece of functionality. The insight that we could implement IntoFuture for Instant and Duration closed that gap:

async fn run(mut recv: Receiver<()>) {
                                                                // Notice how we're using the same API:
    let db = start_db().timeout(Duration::from_secs(10)).await; // Timeout based on a duration
    let server = start_server().timeout(recv.next()).await;     // Timeout based on a signal
}

One of the things I'm least sure about though is what it feels like to convert a Duration to a Future outside a trait bound. Admittedly this feels somewhat awkward to me:

use futures_time::time::Duration;
use futures_time::prelude::*;

async fn main() {
    Duration::from_secs(2).await; // await a duration of 2 secs?
}

I'm not sure. It might be the verb-noun thing, but it feels a bit weird. But remembering an API Sean McArthur proposed years ago, we might be able to streamline this a bit using conversion traits2:

2

Note that IntoDuration does not yet exist, but here's a playground link to what that might look like.

use futures_time::time::Duration;
use futures_time::prelude::*;
use into_duration::IntoDuration;

async fn main() {
    2.secs().await; // wait for 2 secs.
}

This reads a bit more fluent than what we might be used from most Rust APIs, but I don't think that's a bad thing. I think it just takes some getting used to. And it actually works quite nicely in other places too:

use async_std::prelude::*;
use async_std::channel;
use async_std::time::Duration;

#[async_std::main]
async fn main() {
    let value = async { "meow" }
        .delay(100.millis())     // <- using this API
        .await;

    assert_eq!(value.unwrap(), "meow");
}

And thinking further ahead: if we get async IntoIterator + async iteration syntax, having it implemented for Duration could allow us to write this:

let start = Instant::now();
for await now in 1.secs() {          // loop every 1 second
    dbg!(now.duration_since(start)); // print elapsed time
}

Either way, there are some fun options available here. Implementing IntoFuture / async IntoIterator for Duration is one of the things I'm still least sure about - but I wonder how much of it is just a matter of getting used to something novel that's now suddenly possible. I really don't know.

That said though; I'm really happy with the way the library has turned out. And I hope with some feedback and iteration it'll be good enough to include into async-std, and eventually also help us inform whether and how we want to expose this functionality from the stdlib.

There are a bunch of time-based APIs we haven't covered. But we certainly have covered the fundamentals. And in particular I'm happy that we've finally found a promising API for remote cancellation / pausing / resumption of arbitrary futures. This is one of the key features async Rust provides, and actually providing APIs for them seems important.

Either way, that's what I have for now. If you want to check out the repo, you can find it here: yoshuawuyts/futures-time. If you'd like to keep up with my thoughts in near-real-time you can follow me on Twitter. And if you think the work I'm doing is useful and you'd like to send me a tip, you can sponsor me here. Thanks!