Async Cancellation II: Time and Signals
— 2022-06-10
For the past few years I've been working on the async-std library, which provides an async implementation of the APIs exposed by std. However, we also added several new APIs related to things unique to async Rust: concurrency, control over execution, and the interaction between the two. These APIs were initially introduced in async-std as "unstable", and designing them has been the main focus of my work since. On this blog there are numerous posts related to, for example, concurrency, cancellation, and parallelism. Today I want to share a new experiment I've been working on for time-based operations in async Rust. I've designed it as a stand-alone crate for now, but I intend to PR its addition to async-std in the near future.
Timer basics
Relevant APIs here are:
In order to do anything with time you need to have timers. There are two basic types of timers: a timer which fires once at a specific time, and a timer which fires repeatedly after each interval[1]. In futures-time you can create both of these timers by constructing the futures directly. But more interestingly, we allow people to construct these futures by converting an existing time::Duration or time::Instant directly into a timer through the IntoFuture and IntoStream traits.
People would be right to point out that Interval can be implemented in terms of Sleep. But even though that's the case, APIs will have to choose between "do I fire once" or "do I fire repeatedly", and for that reason it's worth distinguishing between the two. Besides, Interval is so common that we don't want folks to have to implement it over and over, so we just provide it as one of the two basic timer types.
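The claim that Interval can be built from Sleep can be made concrete with a minimal std-only sketch. It is not meant to reflect how futures-time implements its timers. Sleep below is a hypothetical stand-in that uses one helper thread per timer. The poll_next method mirrors the shape of Stream::poll_next without depending on the futures crate. The interesting bit is that each tick schedules the next deadline from the previous deadline rather than from "now", which keeps the interval from drifting.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Stand-in one-shot timer: resolves at `deadline`. A helper thread waits and
/// then wakes the task; a real runtime would use its own timer facility.
struct Sleep {
    deadline: Instant,
    armed: bool,
}

impl Future for Sleep {
    type Output = Instant;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Instant> {
        if Instant::now() >= self.deadline {
            return Poll::Ready(self.deadline);
        }
        if !self.armed {
            self.armed = true;
            let (deadline, waker) = (self.deadline, cx.waker().clone());
            thread::spawn(move || {
                while Instant::now() < deadline {
                    thread::sleep(deadline.saturating_duration_since(Instant::now()));
                }
                waker.wake();
            });
        }
        Poll::Pending
    }
}

/// A repeating timer built from `Sleep`: each tick re-arms a fresh `Sleep`
/// for `previous deadline + period`, so scheduling delays don't accumulate.
struct Interval {
    period: Duration,
    sleep: Sleep,
}

impl Interval {
    fn new(period: Duration) -> Self {
        let sleep = Sleep { deadline: Instant::now() + period, armed: false };
        Interval { period, sleep }
    }

    /// Same shape as `Stream::poll_next`; this interval never ends.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
        let polled = Pin::new(&mut self.sleep).poll(cx);
        match polled {
            Poll::Ready(fired) => {
                self.sleep = Sleep { deadline: fired + self.period, armed: false };
                Poll::Ready(Some(fired))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Collects the first `n` ticks of an interval with the given period.
fn first_ticks(period: Duration, n: usize) -> Vec<Instant> {
    let mut interval = Interval::new(period);
    (0..n)
        .map(|_| block_on(poll_fn(|cx| interval.poll_next(cx))).unwrap())
        .collect()
}

fn main() {
    let ticks = first_ticks(Duration::from_millis(10), 3);
    for pair in ticks.windows(2) {
        println!("{:?}", pair[1] - pair[0]); // one period between ticks
    }
}
```

Because each tick reports its scheduled deadline, consecutive ticks are exactly one period apart, even if the task was polled a little late.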
Because both these traits are also implemented for all T: Future and T: Stream respectively, time-based APIs can take timer: impl IntoFuture or interval: impl IntoStream. Callers can then pass either a Duration/Instant directly, or another future or stream.
use futures_time::prelude::*;
use futures_time::{task, time::Duration};
async fn wait_until(timer: impl IntoFuture) {
    let timer = timer.into_future();
    timer.await;
}
#[async_std::main]
async fn main() {
    // Wait for a `Duration`.
    wait_until(Duration::from_secs(1)).await;
    // Wait for a concrete future.
    let deadline = task::sleep(Duration::from_secs(1));
    wait_until(deadline).await;
}
To work around the orphan rules, we ended up defining our own variants of IntoFuture, IntoStream, Duration, and Instant. When we move this to async-std we can probably simplify some of it, but we'll likely need to keep our own definitions of Duration and Instant just for this to keep working.
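A minimal sketch shows why a local Duration type helps. It assumes Rust 1.64 or later, where std::future::IntoFuture is stable; it was not yet stable when this post was written. Implementing std's IntoFuture directly for std::time::Duration would put a foreign trait on a foreign type, and the orphan rules reject that. Wrapping the duration in a local newtype makes the impl legal, and .await then works on the duration itself. The thread-backed Sleep and the tiny block_on executor are illustrative stand-ins, not futures-time APIs.

```rust
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Instant;

/// Local newtype. `impl IntoFuture for std::time::Duration` would be rejected
/// by the orphan rules (foreign trait on a foreign type); a local type is fine.
#[derive(Clone, Copy, Debug)]
struct Duration(std::time::Duration);

impl Duration {
    fn from_millis(ms: u64) -> Self {
        Duration(std::time::Duration::from_millis(ms))
    }
}

/// Thread-backed timer future returned by `Duration::into_future`.
struct Sleep {
    deadline: Instant,
    armed: bool,
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            return Poll::Ready(());
        }
        if !self.armed {
            self.armed = true;
            let (deadline, waker) = (self.deadline, cx.waker().clone());
            thread::spawn(move || {
                while Instant::now() < deadline {
                    thread::sleep(deadline.saturating_duration_since(Instant::now()));
                }
                waker.wake();
            });
        }
        Poll::Pending
    }
}

impl IntoFuture for Duration {
    type Output = ();
    type IntoFuture = Sleep;

    fn into_future(self) -> Sleep {
        Sleep { deadline: Instant::now() + self.0, armed: false }
    }
}

/// Accepts a `Duration` or any other future whose output is `()`.
async fn wait_until(timer: impl IntoFuture<Output = ()>) {
    timer.await;
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

fn main() {
    let start = Instant::now();
    block_on(async {
        wait_until(Duration::from_millis(20)).await; // a duration...
        wait_until(async {}).await; // ...or any other future
        Duration::from_millis(10).await; // awaiting a duration directly
    });
    println!("waited {:?}", start.elapsed());
}
```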
Cancelling execution at a distance
Relevant APIs here are:
As I showed in the last section, thanks to the implementation of the Into* traits, functions can be agnostic over whether they take a Duration/Instant instance or a Future. For the last three years we've been trying to figure out how we could integrate the stop-token crate into async-std.
For a deep-dive into how cancellation works in async Rust, you can read the
last post in this series.
But the short version is: futures can be cancelled at .await points just by dropping the future. A timeout is nothing more than a "race" between a future F and a timer future. If F completes first, you get an Ok(value). And if the timer completes first, you get an Err(TimeoutError). That's two concepts composing nicely with each other!
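The "race" fits in a few dozen lines of std-only Rust. This sketch illustrates the idea under our own names: timeout, Timeout, and TimeoutError are local definitions, not the futures-time types. The value future is polled first and the timer second, and whichever is ready first decides between Ok and Err. Because the timer parameter accepts anything implementing IntoFuture, the same function works whether the deadline is a timer or some other signal.

```rust
use std::future::{self, Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

#[derive(Debug, PartialEq, Eq)]
struct TimeoutError;

/// Races `fut` against `timer`; whichever finishes first decides the result.
/// When `Timeout` is dropped, the losing future is dropped with it, which is
/// exactly how it gets cancelled.
struct Timeout<F, T> {
    fut: Pin<Box<F>>,
    timer: Pin<Box<T>>,
}

fn timeout<F: Future, T: IntoFuture>(fut: F, timer: T) -> Timeout<F, T::IntoFuture> {
    Timeout { fut: Box::pin(fut), timer: Box::pin(timer.into_future()) }
}

impl<F: Future, T: Future> Future for Timeout<F, T> {
    type Output = Result<F::Output, TimeoutError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll the value first, so a tie resolves in its favour.
        if let Poll::Ready(value) = self.fut.as_mut().poll(cx) {
            return Poll::Ready(Ok(value));
        }
        if self.timer.as_mut().poll(cx).is_ready() {
            return Poll::Ready(Err(TimeoutError));
        }
        Poll::Pending
    }
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

fn main() {
    // The value is ready immediately; the timer never fires.
    println!("{:?}", block_on(timeout(future::ready("meow"), future::pending::<()>())));
    // The timer fires first, so the still-pending value is cancelled (dropped).
    println!("{:?}", block_on(timeout(future::pending::<&str>(), future::ready(()))));
}
```

Cancellation needs no extra machinery here. Once Timeout resolves and is dropped, the losing future goes with it.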
We tried various APIs for cancellation, but none really felt good. So I figured I just needed to give it some time, and started looking at async temporal operations. When I saw RxJS's timeout and timeoutWith operators, I realized we might be able to combine both pieces of functionality through a single API. And instead of using a dedicated StopSource/StopToken pair, we can reuse the async-channel crate to create a sender + receiver.
This is what it looks like when used together:
use futures_lite::prelude::*;
use futures_time::prelude::*;
use futures_time::channel;
use futures_time::time::Duration;
fn main() {
    async_io::block_on(async {
        let (send, mut recv) = channel::bounded::<()>(1); // create a new send/receive pair
        let value = async { "meow" }
            .delay(Duration::from_millis(100))
            .timeout(recv.next()) // time out when the sender emits a message or is dropped
            .await;
        assert_eq!(value.unwrap(), "meow");
    })
}
This example will always resolve to "meow" because send is never dropped. But in a concurrent example, a timeout could be triggered for all instances of recv (which implements Clone) by dropping send. This is pretty nice because it combines several individually useful components to create "cancel at a distance" semantics, which are usually reserved for dedicated APIs.
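The "cancel at a distance" part doesn't depend on async-channel specifically. The sketch below uses a hypothetical, message-less channel built only on std. Every Receiver clone is a future that resolves once the Sender has been dropped, so dropping one value releases every waiting clone at once. In the example from the post, such a receiver would be handed to timeout as the deadline. Here each clone is simply awaited on its own thread.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Shared state; `closed` flips to `true` when the sender is dropped.
#[derive(Default)]
struct Shared {
    closed: bool,
    wakers: Vec<Waker>,
}

/// Hypothetical message-less channel: dropping the `Sender` is the signal.
struct Sender(Arc<Mutex<Shared>>);

#[derive(Clone)]
struct Receiver(Arc<Mutex<Shared>>);

fn channel() -> (Sender, Receiver) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

impl Drop for Sender {
    fn drop(&mut self) {
        let wakers = {
            let mut shared = self.0.lock().unwrap();
            shared.closed = true;
            std::mem::take(&mut shared.wakers)
        };
        // Wake every receiver clone that is currently waiting.
        for waker in wakers {
            waker.wake();
        }
    }
}

/// Each receiver clone resolves once the sender is gone.
impl Future for Receiver {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.closed {
            return Poll::Ready(());
        }
        // Sketch only: a re-polled receiver may register its waker twice.
        shared.wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Blocks `n` threads on clones of one receiver, then cancels them all at once.
fn cancel_all(n: usize) -> usize {
    let (send, recv) = channel();
    let waiters: Vec<_> = (0..n)
        .map(|_| {
            let recv = recv.clone();
            thread::spawn(move || block_on(recv))
        })
        .collect();
    thread::sleep(Duration::from_millis(10));
    drop(send); // every waiting clone resolves
    waiters.into_iter().map(|w| w.join().unwrap()).count()
}

fn main() {
    println!("{} receivers released", cancel_all(3));
}
```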
But there are still several rough edges in this API, which I think we may be able to address:
- There are multiple imports from different crates required to get this to work. This will likely be smoothed out somewhat by exposing it from async-std.
- When constructing a channel you have to declare mut recv instead of recv. This is because recv.next() takes &mut self. Implementing IntoFuture for async_channel::Receiver would remove that requirement, streamlining things somewhat.
- channel::bounded needs to know what type of message it carries, so it takes a ::<()> turbo-fish. Channels that carry zero-sized messages are useful on their own, so once we're able to, we may want the constructor to default to a zero-sized type.
If we migrate to async-std
and have all these changes applied, usage might end
up looking somewhat like this:
use async_std::prelude::*;
use async_std::channel;
use async_std::time::Duration;
#[async_std::main]
async fn main() {
    let (send, recv) = channel::bounded(1);
    let value = async { "meow" }
        .delay(Duration::from_millis(100))
        .timeout(recv)
        .await;
    assert_eq!(value.unwrap(), "meow");
}
Delaying execution
Relevant APIs here are:
futures_time::future::FutureExt::delay
futures_time::stream::StreamExt::delay
futures_time::future::FutureExt::park
futures_time::stream::StreamExt::park
Arbitrary cancellation of execution might be one of the most important features of async Rust, but it's not the only one. By gaining control over execution we also gain the ability to delay, pause, and resume it. Here's an example of how to delay execution by a set duration, or until a signal is sent:
use futures_time::prelude::*;
use futures_time::time::Duration;
use std::future;
fn main() {
    async_io::block_on(async {
        let value = future::ready("meow")
            .delay(Duration::from_millis(100)) // delay resolving the future by 100ms
            .await;
        assert_eq!(value, "meow"); // `delay` yields the inner future's output unchanged
    });
}
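One plausible way to express delay's semantics is as a plain async function: wait for the deadline, then run the future. This is a sketch, not the futures-time implementation, and delay here is a local function rather than an extension method. It shows two things. The inner future isn't started until the deadline has resolved, and the result is the inner future's own output, not an Option.

```rust
use std::cell::RefCell;
use std::future::{Future, IntoFuture};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Sketch of `delay`: wait for `deadline` (any future, e.g. a timer or a
/// signal), then run `fut`. The output is `fut`'s own output.
async fn delay<F: Future, D: IntoFuture>(fut: F, deadline: D) -> F::Output {
    deadline.await;
    fut.await
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

fn main() {
    let log = RefCell::new(Vec::new());
    let value = block_on(delay(
        async {
            log.borrow_mut().push("fut");
            "meow"
        },
        async { log.borrow_mut().push("deadline") },
    ));
    // Async blocks are lazy, so `fut` only starts after the deadline resolved.
    println!("{value} {:?}", log.borrow());
}
```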
I just authored the park methods for this post, but didn't get around to writing examples for them yet. park works similarly to std::thread::park, but instead of only working on threads it works with any future or stream. You call it by passing it a channel which can park or unpark the future, allowing for convenient pausing and resuming of execution.
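Since there are no park examples yet, here is a hypothetical sketch of the idea using only std. Parker and Park are made-up names for this illustration, and their API is an assumption rather than the futures-time API. A shared flag decides whether the wrapped future gets polled at all, and unpark wakes the task so it is polled again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Pause state shared between the controlling handle and the wrapped future.
#[derive(Default)]
struct State {
    parked: bool,
    waker: Option<Waker>,
}

/// Hypothetical handle (not the futures-time type) used to pause and resume.
#[derive(Clone, Default)]
struct Parker(Arc<Mutex<State>>);

impl Parker {
    fn park(&self) {
        self.0.lock().unwrap().parked = true;
    }

    fn unpark(&self) {
        let waker = {
            let mut state = self.0.lock().unwrap();
            state.parked = false;
            state.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake(); // ask the executor to poll the paused future again
        }
    }
}

/// Wraps any future; while parked, the inner future simply isn't polled.
struct Park<F> {
    inner: Pin<Box<F>>,
    parker: Parker,
}

impl<F: Future> Future for Park<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        {
            let mut state = self.parker.0.lock().unwrap();
            if state.parked {
                state.waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
        }
        self.inner.as_mut().poll(cx)
    }
}

/// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Starts a future parked, then unparks it from another thread after `pause`.
fn run_paused(pause: Duration) -> (&'static str, Duration) {
    let parker = Parker::default();
    parker.park(); // start out paused
    let remote = parker.clone();
    let start = Instant::now();
    let unparker = thread::spawn(move || {
        thread::sleep(pause);
        remote.unpark(); // resume from a different thread
    });
    let value = block_on(Park { inner: Box::pin(async { "meow" }), parker });
    unparker.join().unwrap();
    (value, start.elapsed())
}

fn main() {
    let (value, elapsed) = run_paused(Duration::from_millis(20));
    println!("{value} after {elapsed:?}");
}
```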
In a previous post I mentioned that I suspect the right analogy for "task" is "parallelizable future" rather than "async thread". Showing that we can implement park for all futures, rather than only for Task, is another indication that this new model may in fact be the right one.
Throttling execution
Relevant APIs here are:
futures_time::stream::StreamExt::buffer
futures_time::stream::StreamExt::debounce
futures_time::stream::StreamExt::sample
futures_time::stream::StreamExt::throttle
Generally data loss is considered a bad thing. But sometimes… data loss can be a good thing? Take for example mouse clicks: when something is clicked once, you want to handle that. But if something is clicked 15 times in 0.3 seconds, perhaps not so much. This is an example of "external input", and since we cannot apply backpressure to external conditions, the only method we have to reduce strain on our system is to actively shed load.
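Two of the load-shedding strategies from the API list above, throttle and debounce, can be sketched as plain functions. They run over a recorded trace of (timestamp_ms, value) events rather than acting as live stream adapters. The exact timing rules are assumptions modelled on the usual Rx-style definitions, not taken from futures-time. Throttle keeps the first item and ignores anything for a window after it. Debounce waits for a quiet gap and keeps the last item of each burst.

```rust
// Events are `(timestamp_ms, value)` pairs, sorted by time. These functions
// replay a recorded trace; a real stream operator would use live timers.

/// Throttle: let an item through, then drop everything arriving within
/// `window` ms of the last item that was let through.
fn throttle<T: Copy>(events: &[(u64, T)], window: u64) -> Vec<(u64, T)> {
    let mut out: Vec<(u64, T)> = Vec::new();
    for &(t, v) in events {
        let quiet = matches!(out.last(), Some(&(last, _)) if t < last + window);
        if !quiet {
            out.push((t, v));
        }
    }
    out
}

/// Debounce: an item is emitted only once `window` ms pass without a newer
/// item, so each burst collapses to its last item.
fn debounce<T: Copy>(events: &[(u64, T)], window: u64) -> Vec<(u64, T)> {
    let mut out = Vec::new();
    for (i, &(t, v)) in events.iter().enumerate() {
        let superseded = matches!(events.get(i + 1), Some(&(next, _)) if next < t + window);
        if !superseded {
            out.push((t, v));
        }
    }
    out
}

fn main() {
    // 15 clicks, one every 20ms: 0.28s of frantic clicking.
    let clicks: Vec<(u64, u32)> = (0..15).map(|i| (i * 20, i as u32)).collect();
    println!("throttle: {:?}", throttle(&clicks, 300)); // keeps the first click
    println!("debounce: {:?}", debounce(&clicks, 100)); // keeps the last click
}
```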
In the crate we provide some versatile, basic methods to shed load based on different circumstances. The debounce and sample methods get the last item seen during a particular period. The throttle method lets through the first item received during a particular period. And finally, buffer groups data from a particular time range into a vector, allowing operators to operate on chunks of data rather than needing to decide on individual items.
use futures_lite::prelude::*;
use futures_time::prelude::*;
use futures_time::time::Duration;
use futures_time::stream;
fn main() {
    async_io::block_on(async {
        let mut counter = 0;
        stream::interval(Duration::from_millis(5)) // yield every 5ms
            .take(10) // stop after 10 iterations
            .buffer(Duration::from_millis(20)) // create 20ms windows
            .for_each(|buf| counter += buf.len()) // count the items in every window
            .await;
        assert_eq!(counter, 10);
    })
}
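Why does the counter end up at exactly 10? Our reading of the example is that buffer cuts time into fixed 20ms windows and flushes the final partial window when the stream ends. Under that assumption every item lands in exactly one window, so the window lengths always add back up to the number of items. The sketch below models this offline as a plain function over timestamps rather than with real timers. It also assumes the ten ticks arrive at 5, 10, ..., 50ms.

```rust
/// Groups `(timestamp_ms, value)` events into consecutive `window`-ms windows
/// starting at `start`. Empty windows are skipped, and a trailing partial
/// window is flushed when the input ends.
fn buffer<T: Clone>(events: &[(u64, T)], start: u64, window: u64) -> Vec<Vec<T>> {
    assert!(window > 0, "a zero-length window would never advance");
    let mut out = Vec::new();
    let mut current = Vec::new();
    let mut window_end = start + window;
    for (t, v) in events {
        // Close every window that ended before this event arrived.
        while *t >= window_end {
            if !current.is_empty() {
                out.push(std::mem::take(&mut current));
            }
            window_end += window;
        }
        current.push(v.clone());
    }
    if !current.is_empty() {
        out.push(current); // flush the partial window when the input ends
    }
    out
}

fn main() {
    // Ten ticks every 5ms (at 5, 10, ..., 50ms), bucketed into 20ms windows.
    let ticks: Vec<(u64, u64)> = (1..=10).map(|i| (i * 5, i)).collect();
    let windows = buffer(&ticks, 0, 20);
    let total: usize = windows.iter().map(Vec::len).sum();
    println!("{windows:?} -> {total} items");
}
```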
Conclusion
And that concludes the whirlwind tour of the futures-time crate. I'm pretty happy with the resulting APIs. For the longest time we were struggling to create APIs which could operate on {Duration, Instant, impl Future}, and this usually resulted in needing to declare three different APIs for each piece of functionality. The insight that we could implement IntoFuture for Instant and Duration closed that gap:
async fn run(mut recv: Receiver<()>) {
    // Notice how we're using the same API:
    let db = start_db().timeout(Duration::from_secs(10)).await; // Timeout based on a duration
    let server = start_server().timeout(recv.next()).await; // Timeout based on a signal
}
One of the things I'm least sure about, though, is what it feels like to convert a Duration to a Future outside a trait bound. Admittedly this feels somewhat awkward to me:
use futures_time::time::Duration;
use futures_time::prelude::*;
#[async_std::main]
async fn main() {
    Duration::from_secs(2).await; // await a duration of 2 secs?
}
I'm not sure. It might be the verb-noun thing, but it feels a bit weird. But remembering an API Sean McArthur proposed years ago, we might be able to streamline this a bit using conversion traits[2]:
Note that IntoDuration does not yet exist, but here's a playground link to what that might look like.
use futures_time::time::Duration;
use futures_time::prelude::*;
use into_duration::IntoDuration; // hypothetical crate, see the note above
#[async_std::main]
async fn main() {
    2.secs().await; // wait for 2 secs.
}
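As noted above, IntoDuration is hypothetical. A minimal sketch of such an extension trait needs nothing beyond std::time::Duration. The method names secs and millis match the examples in this post, and implementing the trait for u64 is our assumption. Making the resulting Duration awaitable is a separate step: it needs a Duration type that implements IntoFuture, as discussed earlier.

```rust
use std::time::Duration;

/// Hypothetical extension trait (not a published crate API): integers
/// convert into `Duration`s, so `2.secs()` reads like English.
trait IntoDuration {
    fn secs(self) -> Duration;
    fn millis(self) -> Duration;
}

impl IntoDuration for u64 {
    fn secs(self) -> Duration {
        Duration::from_secs(self)
    }

    fn millis(self) -> Duration {
        Duration::from_millis(self)
    }
}

fn main() {
    let timeout = 2.secs() + 100.millis();
    println!("{:?}", timeout);
}
```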
This reads a bit more fluently than what we might be used to from most Rust APIs, but I don't think that's a bad thing. I think it just takes some getting used to. And it actually works quite nicely in other places too:
use async_std::prelude::*;
use into_duration::IntoDuration; // hypothetical crate, as above
#[async_std::main]
async fn main() {
    let value = async { "meow" }
        .delay(100.millis()) // <- using this API
        .await;
    assert_eq!(value, "meow"); // `delay` returns the inner value directly
}
And thinking further ahead: if we get async IntoIterator + async iteration syntax, having it implemented for Duration could allow us to write this:
let start = Instant::now();
for await now in 1.secs() { // loop every 1 second
    dbg!(now.duration_since(start)); // print elapsed time
}
Either way, there are some fun options available here. Implementing IntoFuture / async IntoIterator for Duration is one of the things I'm still least sure about, but I wonder how much of it is just a matter of getting used to something novel that's now suddenly possible. I really don't know.
That said, I'm really happy with the way the library has turned out. And I hope that with some feedback and iteration it'll be good enough to include in async-std, and eventually also help us decide whether and how we want to expose this functionality from the stdlib.
There are a bunch of time-based APIs we haven't covered. But we certainly have covered the fundamentals. And in particular I'm happy that we've finally found a promising API for remote cancellation / pausing / resumption of arbitrary futures. This is one of the key features async Rust provides, and actually providing APIs for them seems important.
Either way, that's what I have for now. If you want to check out the repo, you can find it here: yoshuawuyts/futures-time. If you'd like to keep up with my thoughts in near-real-time you can follow me on Twitter. And if you think the work I'm doing is useful and you'd like to send me a tip, you can sponsor me here. Thanks!