Futures Concurrency I: Introduction
— 2019-09-02
- futures concurrency
- infallible: join
- infallible: select
- summary: infallible functions
- fallible: join
- fallible: select
- summary: fallible functions
- variadics
- methods
- async concurrency: javascript
- async concurrency: golang
- pattern: log all errors in try_select
- conclusion
In a previous post we've looked at Rust streams. In this post we're going to discuss another problem in the async space: futures concurrency combinators.
We'll cover the different forms of concurrency that can be expressed with Futures, in both fallible and infallible variants.
For the sake of this article I'm going to assume you're already familiar with Futures as a mechanism for scheduling async work. If you're not, I recommend starting with the Rust async book.
Futures Concurrency
In general when you're running Futures you can schedule them in one of 3 ways:
- Run a future, and wait for it to complete.
- Run multiple futures, and wait for all of them to complete.
- Run multiple futures, and wait for the first one to complete.
The first way of scheduling futures is "sequential", and is the default if we await multiple futures in a row. The other two scheduling approaches are "concurrent", and come with some nuances and variations. We also distinguish between "fallible" and "infallible" futures; we'll start with the infallible versions.
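As a quick illustration of the sequential default, here's a minimal sketch using async_std's future::ready: the second future isn't polled until the first one has completed.
use async_std::future;
// Sequential: `b` only starts being polled after `a` has completed.
let a = future::ready(1);
let b = future::ready(2);
let one = a.await;
let two = b.await;
assert_eq!((one, two), (1, 2));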
Infallible: Join
The first concurrent approach is "parallel". In this mode we have multiple futures, all of which we want to complete. We await them all together, and write the result of each future to a tuple. An example of this:
use async_std::future;
let a = future::ready(1);
let b = future::ready(2);
let c = future::join(a, b);
assert_eq!(c.await, (1, 2));
Where the function signature would be:
pub async fn join<L, R, T1, T2>(left: L, right: R) -> (T1, T2)
where L: Future<Output = T1>,
R: Future<Output = T2>;
If we want to join more than two futures we can nest calls to join, and later in this post we'll look at possible ways to make this an even better experience.
Infallible: Select
The other form of concurrency is having multiple futures, and only wanting one of them to resolve. This is less common than waiting for all futures to resolve, but still useful, for example when setting a time limit on a future.
There are many concurrency scenarios imaginable, but in general this means waiting for the first future to resolve, which is done through the select function. An example:
use async_std::future;
let a = future::pending();
let b = future::ready(1);
let c = future::select(a, b);
assert_eq!(c.await, 1);
Where the function signature would be:
pub async fn select<L, R, T>(left: L, right: R) -> T
where L: Future<Output = T>,
R: Future<Output = T>;
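As a sketch of the time-limit use case mentioned above, here's an example that races a slow computation against a fallback that becomes ready after a delay. It uses the select function proposed here together with async_std's task::sleep, so it's illustrative rather than something that compiles today:
use async_std::{future, task};
use std::time::Duration;
// Whichever future resolves first wins; here the fallback fires after 100ms.
let work = async {
    task::sleep(Duration::from_secs(10)).await; // pretend this is slow work
    "done"
};
let fallback = async {
    task::sleep(Duration::from_millis(100)).await;
    "timed out"
};
assert_eq!(future::select(work, fallback).await, "timed out");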
Summary: Infallible functions
For operating on infallible futures (i.e. futures that don't return Result), we've defined the following functions:
Name | Return signature | When does it return? |
---|---|---|
future::join | (T1, T2) | Wait for all to complete |
future::select | T | Return on first value |
Let's move onto fallible futures next!
Fallible: Join
When calling join on two futures that return Result, being handed back a (Result<T, E>, Result<T, E>) makes error handling rather awkward:
use async_std::future;
let a = future::ready(Ok(1));
let b = future::ready(Ok(2));
let c = future::join(a, b).await;
assert_eq!((c.0?, c.1?), (1, 2)); // this is not great
Instead it'd be much nicer if we were handed back a Result<(T1, T2), E>, so we could handle errors more or less like we'd expect to:
use async_std::future;
let a = future::ready(Ok(1));
let b = future::ready(Ok(2));
let c = future::try_join(a, b);
assert_eq!(c.await?, (1, 2)); // much better
This introduces a new function try_join with a signature of:
pub async fn try_join<L, R, T1, T2, E>(left: L, right: R) -> Result<(T1, T2), E>
where L: Future<Output = Result<T1, E>>,
R: Future<Output = Result<T2, E>>;
An implication of the API returning Result<(T1, T2), E> is that if either of the futures returns Err, we should drop the other future, because its result can no longer be returned. This is commonly referred to as early rejection.
If it's undesirable to reject early, using join instead of try_join allows all futures to complete before returning.
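To make the difference concrete, here's a sketch (again using the functions proposed in this post) where one future fails: try_join would hand back that error right away, while join lets us inspect both outcomes.
use async_std::future;
let a = future::ready(Err::<i32, &str>("oh no!"));
let b = future::ready(Ok::<i32, &str>(2));
// `join` never rejects early: both results come back, errors and all.
let (res_a, res_b) = future::join(a, b).await;
assert!(res_a.is_err());
assert_eq!(res_b, Ok(2));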
Fallible: Select
Like we said earlier, the purpose of select is to get back the first result from multiple futures. However, say we want to check both a local cache and a remote cache to see whether something is stored there: we don't necessarily care if one of them fails, as long as the other has the result we want. We cannot express this using the regular select function:
use async_std::future;
let a = future::ready(Err("oh no!")); // some error
let b = future::ready(Ok(1));
let c = future::select(a, b);
assert!(c.await.is_err()); // if `a` is an error, we cannot proceed to `b`
The semantics we want are: "give back the first response that succeeds, or if none of them succeed return the last error."
use async_std::future;
let a = future::ready(Err("oh no!")); // some error
let b = future::ready(Ok(1));
let c = future::try_select(a, b);
assert_eq!(c.await?, 1); // `a` failed, `b` succeeded -- so we get `b`
Where the function signature would be:
pub async fn try_select<L, R, T, E>(left: L, right: R) -> Result<T, E>
where L: Future<Output = Result<T, E>>,
R: Future<Output = Result<T, E>>;
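To round out the cache example from above, here's a self-contained sketch. The local_cache_get and remote_cache_get functions are made up for illustration; only the try_select call is part of the proposal.
use async_std::future;
// Hypothetical stand-ins for the example; not real APIs.
async fn local_cache_get(_key: &str) -> Result<u64, &'static str> {
    Err("not in local cache")
}
async fn remote_cache_get(_key: &str) -> Result<u64, &'static str> {
    Ok(42)
}
let a = local_cache_get("user:1");
let b = remote_cache_get("user:1");
// The first Ok wins; the local miss is discarded along the way.
assert_eq!(future::try_select(a, b).await?, 42);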
If it's undesirable to keep trying until all options have been exhausted, it's acceptable to use select for fallible futures too: we get back the first future's result regardless of whether it succeeded, and reject early if it failed.
Summary: Fallible functions
When talking about fallible futures, we have access to a total of 4 functions:
Name | Return signature | When does it return? |
---|---|---|
future::join | (Result<T, E>, Result<T, E>) | Wait for all to complete |
future::try_join | Result<(T1, T2), E> | Return early on first Err, otherwise wait for all to complete |
future::select | Result<T, E> | Return on first value |
future::try_select | Result<T, E> | Return on first Ok, reject on last Err |
In most cases the try_ functions will be the right choice for fallible futures, but on the off chance they're not, the regular variants allow bypassing those semantics to do more custom things. And even better: by mixing both kinds of functions, more complicated flows can be created.
Variadics
The functions we've talked about so far are quite nice, but it's not rare to want to join or select more than two futures. One way to do this is to apply the combinators multiple times:
use async_std::future;
let a = future::ready(1);
let b = future::ready(2);
let c = future::ready(3);
let j = future::join(a, b);
let k = future::join(j, c);
assert_eq!(k.await, ((1, 2), 3)); // this is probably not great to work with though
Instead it'd be much nicer if we could provide an n-length input, and get back an n-length tuple. The way to do this is through variadics: functions that take a varying number of arguments. The way we can write variadics in Rust today is by using macros:
use async_std::future;
let a = future::ready(1);
let b = future::ready(2);
let c = future::ready(3);
let j = future::join!(a, b, c);
assert_eq!(j.await, (1, 2, 3)); // much better
Being able to accept multiple arguments is useful for all the functions we've described so far. Additionally, all of them could, and probably should, be written as variadics rather than regular functions.
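For example, a variadic try_join (hypothetical, mirroring the join sketch above) could combine early rejection with an n-length tuple:
use async_std::future;
let a = future::ready(Ok::<i32, &str>(1));
let b = future::ready(Ok::<i32, &str>(2));
let c = future::ready(Ok::<i32, &str>(3));
// Hypothetical variadic form of try_join.
let j = future::try_join!(a, b, c);
assert_eq!(j.await?, (1, 2, 3));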
Methods
So far we've only talked about "free functions" (standalone functions), and not "methods" (functions that are part of a struct). In Rust, structs can't have associated macros, which means that variadics wouldn't be possible there, which in turn means we'd lose out on ergonomics.
But there's another reason: to our knowledge it's currently impossible to define the try_ variants directly on Future without introducing an intermediate TryFuture type.
In the future, specialization might make it possible to implement the try_ methods for futures with the right signature. But in the meantime, if we're thinking of the core concurrency mechanisms and how they may someday live in std, having them be variadic free functions seems like the better choice all around.
Async Concurrency: JavaScript
JavaScript is an asynchronous-first language that in recent years has chosen Promise + async/await as its core concurrency mechanism. There are two core differences between JavaScript Promises and Rust's Futures:
- When a Promise is created it is immediately scheduled on the microtask queue (executor), while in Rust a Future is only scheduled when .awaited.
- Every Promise in JavaScript is fallible (reject/resolve), while a Rust Future can be either fallible or infallible, depending on whether it returns a Result.
As of the latest TC39 meeting, JavaScript has the following 4 methods for concurrency on Promises:
name | signature | description | stage |
---|---|---|---|
Promise.allSettled | (iterable) | does not short-circuit | 4 (stable) |
Promise.all | (iterable) | short-circuits when an input value is rejected | 4 (stable) |
Promise.race | (iterable) | short-circuits when an input value is settled | 4 (stable) |
Promise.any | (iterable) | short-circuits when an input value is fulfilled | 2 (experimental) |
What we're proposing in this post is fairly similar, but unlike JavaScript we're able to discern between fallible and infallible Futures, which allows us to make use of that distinction in the design. In fact, for fallible Futures the two compare really well:
JavaScript | Rust | description |
---|---|---|
Promise.allSettled | future::join | does not short-circuit |
Promise.all | future::try_join | short-circuits when an input value is rejected |
Promise.race | future::select | short-circuits when an input value is settled |
Promise.any | future::try_select | short-circuits when an input value is fulfilled |
Async Concurrency: Golang
So I'm by no means a Golang expert, but I know we've been looking at it quite a bit when trying to figure out futures concurrency. In particular, Golang's select blocks have an equivalent in the existing futures::select! macro:
use futures::{future, select};
let mut a = future::ready(4);
let mut b = future::pending::<()>();
let res = select! {
    a_res = a => a_res + 1,
    _ = b => 0,
};
assert_eq!(res, 5);
It seems well understood that the macro in its current form wouldn't make it into the standard library, which would mean that this form of select would probably need to be elevated to the language level. That would require a fair amount of design work, and waiting until at least a new edition.
Side note: this is not to say that I'm proposing the addition of any API to stdlib quite yet. But I like to consider the possibility that at some point in the future we may want to. So in order to have a fruitful conversation about whether we should, I want to make sure we don't have to question whether we could.
While select blocks seem cool, I think it'd be preferable if we could solve the same problems without needing changes to the language. And I think that with the select / try_select functions we're proposing, we do!
The biggest difference with Go's select blocks is that all "branches" in our select functions resolve to the same type, while in at least the Rust version it seems like it's possible to return multiple return types through an Either type.
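As a sketch of how differing output types could still flow through the select function proposed here, both futures can map their results into a shared wrapper type first. The Either enum below is defined inline just for illustration (the futures crate ships a similar type):
use async_std::future;
// A minimal two-variant wrapper, defined here just for the sketch.
enum Either<A, B> {
    Left(A),
    Right(B),
}
let a = async { Either::<u32, &str>::Left(1) };
let b = async { Either::<u32, &str>::Right("hello") };
// Both futures now share the output type Either<u32, &str>.
match future::select(a, b).await {
    Either::Left(n) => println!("got a number: {}", n),
    Either::Right(s) => println!("got a string: {}", s),
}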
Pattern: log all errors in try_select
One question that comes up frequently is: "how do I log all errors inside a try_select call?" The way to do that is to apply map_err (or equivalent) to each future before passing it to try_select:
// `map_err` on a future is provided by the futures crate's TryFutureExt trait.
let a = a.map_err(|e| dbg!(e));
let b = b.map_err(|e| dbg!(e));
let c = future::try_select(a, b).await?;
If map_err is not available (as is the case with async_std), it's possible to achieve the same behavior by wrapping each future in a new async block:
let a = async { Ok(a.await.map_err(|e| dbg!(e))?) };
let b = async { Ok(b.await.map_err(|e| dbg!(e))?) };
let c = future::try_select(a, b).await?;
Conclusion
In this post we've covered the challenges of both fallible and infallible async concurrency in Rust, compared it to approaches from two different languages, and formulated a coherent model that covers all cases.
The next step for this proposal is to write experimental implementations for async-std to see how well it works in practice.
In the future we may also want to cover stream concurrency. In particular, ways we could improve upon Stream::for_each_concurrent / Stream::try_for_each_concurrent / stream::FuturesUnordered seem like an interesting topic. But that's not in scope for today.
Thanks all for reading, and have a good week!
Special thanks to Tirr-c and David Barsky for proof reading!