Futures Concurrency III: select!
— 2022-02-09

  1. process concurrently, yield sequentially
  2. the futures::select! macro
  3. issues with futures::select!
    1. modifying "hello world" can lead to data loss
    2. halt points are hidden in a macro
    3. fuse requirements
    4. trait signatures
    5. debugging errors is hard
    6. select! and the stdlib
  4. concurrent stream processing with stream::merge
  5. concurrently processing a mix of futures and streams
  6. concurrently processing futures in a loop
  7. fairness
  8. shiny future
  9. addendum: ergonomics explorations for operating exclusively on futures
  10. the async rust concurrency tables: completed
  11. conclusion
  12. appendix: async rust concurrency operations
    1. 1. concurrency adapters by fallibility mode and output handling
    2. 2. concurrency adapters by response start and output handling

In our last post on futures concurrency we talked about the various ways futures in Rust can be awaited concurrently. But we didn't cover one mode: awaiting multiple futures concurrently and resolving them as soon as they're ready, without ever discarding any data. This mode of concurrency is what is conventionally provided by the futures::select!{} macro.

In this post we're going to take a look at how this mode of concurrency works, take a closer look at the issues select! {} has, discuss Stream::merge as an alternative, and finally we'll look at what the ergonomics might look like in the future. This post is part of the "Futures Concurrency" series. You can find all library code mentioned in this post as part of the futures-concurrency crate.

process concurrently, yield sequentially

In our last post we wrote an example where we raced multiple futures, and got the output from the first future which resolves:

let a = future::ready(1).delay(Duration::from_millis(100));
let b = future::ready(2).delay(Duration::from_millis(200));
let c = future::ready(3).delay(Duration::from_millis(300));
assert_eq!([a, b, c].race().await, 1);

Using race gets the future a as soon as it resolves, but discards futures b and c, making their content unavailable. If we don't want to discard any futures we can use join, but this requires waiting until all futures have been resolved before we can operate on their output. What if we wanted something that sits between the two: How can we get the result of all futures, but process them one-by-one as soon as they've resolved?

If we pull up the table we introduced last post, we can now add a third column. Instead of waiting for all outputs or just the first output, we're now handling all outputs one-by-one, as soon as they're ready:

Wait for all outputsWait for first outputHandle output one-by-one
Continue on errorFuture::joinFuture::try_race???
Return early on errorFuture::try_joinFuture::race???

This is the table we've been working with so far, and it makes sense to continue filling it out. But maybe if we disregard "fallibility" for a moment, there's another table we can create? Like race we start handling items as soon as the first item is resolved, but instead of discarding the remaining items, like join we are interested in all items:

Handle all itemsDiscard some items
Response starts on first item???Future::race
Response starts on last itemFuture::join???

For the majority of this post we'll be focusing on the top-left of this new table: "response starts on first item" and "handle all items". We'll cover the bottom right mode of concurrency towards the end of this post.

Let's start by taking a look at how the futures crate exposes this mode of concurrency first.

the futures::select! macro

The futures crate has a solution for this mode of concurrency through the select!{} macro. The tokio runtime also has a variant of select! {}, but it behaves slightly differently. Both the futures and tokio variants of select behave largely the same, so the broader points we're making apply to both. However where relevant, we will look closer at how the two differ.

The select! {} macro introduces a custom DSL which resembles match blocks, but with a few twists. Unlike a match block which takes a single input, select! {} takes multiple futures, creates named bindings to them when they resolve, and executes a block when it executes. It also has a complete case (else in the tokio variant) which runs when no more data is available from any of the input futures.

Here's a basic example of how select!{} is used, adapted from the async book. This takes multiple asynchronous streams of numbers and sums them (playground):

use futures::stream::{self, StreamExt};
use futures::select;

// Create two streams of numbers. Both streams require being `fuse`d.
let mut a = stream::iter(vec![1, 2, 3]).fuse();
let mut b = stream::iter(vec![4, 5, 6]).fuse();

// Initialize the output counter.
let mut total = 0;

// Process each item in the stream;
// break once there are no more items left to sum.
loop {
    let item = select! {
        item = a.next() => item,  // 1. Get the next future in the stream,
        item = b.next() => item,  //    assign its `Output` to `item` when it resolves.
        complete => break,        // 2. The `complete` case runs when both streams are exhausted. 
    };
    if let Some(num) = item {
        // Increment the counter with the value from the stream.
        total += num;             
    }
}

// Validate the result.
assert_eq!(total, 21);

This example is simplified. It can roughly be broken into three steps:

  1. Create the streams and fuse them. fuse makes it so once a Stream returns None, it guarantees to keep returning None instead of potentially panicking 1.
  2. Create the main processing loop using select!, summing the numbers together.
  3. Validate the result is correct.
1

Unfused streams are not guaranteed to panic if they are accessed again after they've been exhausted. But they definitely can panic, and it's not unlikely that when we have generators they will, just like Future, panic if they're called again after they've been exhausted.

Issues with futures::select!

Now that we've talked about the select! {} macro and the mode of concurrency it enables, it's time we start looking at the issues with it. Because oh boy are there many. My personal opinion has always been that select! {} seems hard to use, and feels off. But in writing this post I've had to substantiate this feeling with actual evidence, and I don't like what I've learned. Not. At. All.

In this section we'll cover different aspects of how select! {} falls short. But if you want my brief, summarized opinion: I view select! {} as easy to misuse, hard to debug, difficult to learn, when used correctly makes for incredibly hard-to-read code, and seems unlikely to being included in the stdlib or language. I think this qualifies as a strong statement. Now let me back it up by sharing my reasoning for each part.

modifying "hello world" can lead to data loss

select! {} is not limited to implementing a single form of concurrency: it is flexible enough that it can be used to implement different concurrency primitives. This makes it hard for both the compiler and humans to validate whether it's used correctly. Which is not great when performing concurrent operations in a potentially multi-threaded system. This can be exceptionally hard to debug.

Let's take our earlier example, and instead of taking numbers from a stream and incrementing a counter, let's do something we might encounter in the real-world. For example, in one select arm we may want to process items from a stream, and in the other we may want to read data from some handle and send it into a channel:

// before
loop {
    let item = select! {
        item = a.next() => item,
        item = b.next() => item,
        complete => break,
    };
    ...
}
// after
loop {
    futures::select! {
        _ => read_send(&mut handle, &mut channel).fuse() => {} // read data from a handle and send it to a channel
        item => stream.next() => { ... }                       // process data from a stream
        complete => break,                                     // all work completed, stop the loop
    }
}

If you squint you can see that the shape of both loops remained roughly the same. The most significant change is that instead of calling stream.next() in both arms, we have switched out one arm to use read_send instead. We can no longer rely on the futures being fused, so we need to manually do that on the future now instead. But what you probably didn't expect is that this code is fundamentally broken, and your program will lose data.

The second example is the exact pattern Tomaka covers in their 2021 post on async Rust. They based this post on their experience writing production async Rust code for several years, and described this issue as follows (emphasis is Tomaka's):

It is always possible to solve this problem in some way, but what I would like to highlight is an even bigger problem: these kind of cancellation issues are hard to spot and debug. In the problematic example, all you will observe is that in some rare occasions some parts of the file seem to be skipped. There will not be any panic or any clear indication of where the problem could come from. This is the worst kind of bugs you can encounter.

What's happening here is that we intended to express: "process concurrently, yield sequentially" semantics. But what we ended up writing was "race" semantics. The reason for this is that read_send and stream.next() are racing, and if stream.next() completes first, the read_send future is discarded. This can lead to data from handle having being read, but not yet sent into the channel. Which means the next time we read data from handle, the cursor will have advanced, and data is now lost.

You may be wondering why stream.next() does work, but read_send does not. The short explanation is that the state of the stream.next future is entirely contained within stream. Which means that if we re-create the future on each iteration, we can continue right where we left off. read_send on the contrary has state local to the future. Which means when we recreate the future, we lose that state, regardless of how far we got.

In their post Tomaka appears to primarily attribute the issue they're experiencing to all futures being cancellable. And that makes sense: if future can't be cancelled, it can't lead to data loss. And if select! only ever operates un cancellable futures, then we've successfully guarded against this issue! But in my opinion the issue is less with all futures being cancellable (I think this is good), but instead that depending on how you configure select! you may unintentionally be presented with "race" semantics!

In general I'd recommend folks don't attempt to use select! for this mode of concurrency and instead spawn (local) tasks instead. This is largely how over the years I've avoided to ever have to write select! statements using futures. But because we're talking about the issues with select! we should at least take a look at what a correct solution using select! might look like:

// Create the future we're polling once, outside the loop so it
// doesn't drop between iterations.
let mut read_send_fut = read_send(&mut handle, &mut channel);
loop {
    futures::select! {
        _ => read_send_fut => {
            // Whenever the last future resolves, we need to re-create it for the next loop.
            read_send_fut = read_send(&mut handle, &mut channel);
        }
        item => stream.next() => { ... }
        complete => break,
    }
}

Let me be honest: I don't feel good about this solution. We're instantiating the exact same future in two separate places. And because this is an example, we're lucky in this case that we only do this for the one future. In real-world examples we might want to repeat this more often. We could probably abstract the future instantiation using a function. But I'm hesitant to introduce further abstractions.

To me this code feels brittle, riddled with assumptions which are not directly obvious from the code, and it's becoming hard to see what it is that we're actually trying to do here. And that's an issue. select! is so flexible, it stops being able to accurately capture intent. This makes it impossible for the compiler to validate we're using it right. And hard for people reading the code to understand what we're trying to do. Because it's possible to mis-use select! it means that any diligent code-reviewer will manually need to validate select! blocks are right, every time they're modified. Because the consequences of failing to do so can have very real consequences.

An alternative to select! {} should only expose a single concurrency mode from its API, making it impossible to accidentally opt-in to a different mode. One of Rust's core strengths is the ability for the compiler to validate our usage of APIs is correct. And for something as fundamental as a concurrency mode we should absolutely be making use of this.

halt points are hidden in a macro

In my post on async cancellation, I showed the following example displaying every point where code can be halted. You can think of "halting" as "this is where the function will potentially stop executing, so we need to think about how to clean up local state when it does":

// Regardless of where in the function we stop execution, destructors will be
// run and resources will be cleaned up.
async fn do_something(path: PathBuf) -> io::Result<Output> {
                                        // 1. the future is not guaranteed to progress after instantiation
    let file = fs::open(&path).await?;  // 2. `.await` and 3. `?` can cause the function to halt
    let res = parse(file).await;        // 4. `.await` can cause the function to halt
    res                                 // 5. execution has finished, return a value
}

Something which makes select! {} difficult to reason about with regards to halting (and cancellation) is that it obfuscates potential halt points by not requiring that they are marked with .await:

loop {
    let item = select! {
        item = a.next() => item, // 1. .await is hidden here, which can cause a halt
        item = b.next() => item, // 2. .await is hidden here, which can cause a halt
        complete => break,
    };
    ...
}

Generally the rule is that halt points contained in function bodies are annotated using ? or .await. But select! hides .await statements internally, meaning they can no longer be seen in in the function body. Someone unfamiliar with select!'s semantics may not expect it to cause the function to halt, which can lead to bugs. An alternative to select! {} should use .await to annotate it can halt.

fuse requirements

When a Future is created through using the async keyword, if it's polled after it's been completed, it will panic. Similarly, once we have generators in the language, creating a Stream through an async gen fn we would logically expect that to panic too if we call it after it's been completed 2.

2

The compiler guards against "poll-after-completion" errors in most cases because .await takes ownership of futures. This means if you call .await twice on the same future, it'll fail to compile with a "use of moved value" error. The "panic if this future is polled after it's completed" error is kind of like the runtime version of the move semantics guard, for when a future is manually polled to completion. While not required by the Future trait, I think it's good practice for most futures to implement similar "poll-after-completion" behavior as futures created through the async keyword.

"fuse semantics" mean slightly different things for futures, iterators, and streams:

Futures passed to select! must implement fuse semantics so that we can track the completion status of each individual future. If we poll a future and it This cannot be tracked inside the select! {} macro because select! {} does not take ownership of individual futures. Once a select! {} block has yielded, the futures within it can be accessed again, which is often used to pass them back into another select! {} macro on a subsequent loop.

"Does my future implement fuse semantics" is checked during compilation in the futures::select! macro, but left as an exercise to the user when using tokio::select!. This makes the futures variant more resilient than the tokio variant, but in both cases it's annoying that we have to care about this at all because Future and async/.await were not designed to default to implement fuse semantics.

This mismatch causes select! {} requires users to need to educate themselves on what fusing futures is, how to do it, and how not to be caught by runtime errors. select! {} as an abstraction does not fit well with the rest of async Rust, and as a result users end up needing to learn how to work around this mismatch. Which does not make for a great experience, and would be great if we could evade front-loading when teaching folks about async Rust.

trait signatures

select! {} can not only operate on futures, it can operate on asynchronous streams too. There is no real counterpart to Future in the stdlib (the syn version of "asynchronous value" is just "a value"), but the counterpart to AsyncIterator (née: Stream) is Iterator. An unfortunate consequence of select! {} is that it has lead to a mismatch between the signatures of futures::FusedStream and FusedIterator. FusedStream has a mandatory method: is_terminated which needs to be implemented. FusedIterator does not have this method.

The is_terminated method on Future allows a third state to be represented in the Future poll model: "this future has completed". For Stream we can represent this state by yielding Poll::Ready(None). But in Future we only have the choice between Poll::Ready(T) and Poll::Pending, neither of which means "I'm done". As remarked in the original issue discussing FusedFuture, the need for having the is_terminated method would go away if select! {} would have an internal loop statement. If the loop lives internally, state tracking can be moved interally as well, and we no longer have the same issues. But unfortunately select_loop! was deemed as being too inflexible, and instead the choice was made to go with FusedFuture / FusedStream instead 3.

3

select_loop! is so close to what we're proposing as the solution in this post. I only found out about it towards the end of editing this post, but it feels like this was just a few iterations away from what we're proposing we do here instead.

The reason why discussing trait mismatches is important is because a guiding principle for of the async foundations WG is parity between sync and async Rust:

Async Rust should be a small delta atop Sync Rust. People who are familiar with sync Rust should be able to leverage what they know to make adopting Async Rust straightforward. Porting a sync code base to async should be relatively smooth: just add async/await, adopt the async variants of the various libraries, and you're done.

Introducing FusedFuture, FusedStream and select! {} into the stdlib would mean we'd need to take one of three decisions:

  1. We accept that FusedStream and FusedIterator have a different signature, and all the consequences that carries regarding parity with std rust.
  2. We re-work select! {} to not rely on FusedFuture::is_terminated. I believe this has been tried though, and it didn't work out.
  3. We re-work select! {} to not require fuse semantics. This would require select! {} take ownership of futures, which would lead to data loss.

select! {}'s relationship to the Fused traits is not good, and all options how to resolve it carry distinct downsides. The best solution would be to look for alternatives to select! {} which implement the same concurrency mode, but do not rely on fuse semantics on the types they operate on.

debugging errors is hard

If you misuse select! {}, how do you find out? Implementing race semantics where you didn't mean will lead to data occasionally being discarded. Depending on the data you're reading this can lead to hard-to-detect bugs. There's no guarantees errors will be thrown, it's not obvious from the code that we're racing, and we have few (if any) tools or lints to detect races like these. The best chance at catching issues with this before they occur is to have a thorough test suite which can pinpoint this error before it's shipped. But in my experience this is not a given.

Debugging work is often 90% understanding the bug, and 10% fixing the bug. And in order to catch bugs with select! {} you need to understand how the macro works, how async cancellation works in Rust, find out exactly where it's used, and finally: employ a fix. This is the type of work that even experts in async Rust might struggle with (hi).

It doesn't mean that we can't do anything about this though. We could apply runtime probing to find out where futures are being cancelled in a loop. Or static lints which catch potential mis-configurations of select! {}. But that's speculation, and will never be as good a fix as having APIs where bad states are unrepresentable.

select! and the stdlib

Objectively select! {} has many other issues, the first being its relation to the stdlib. As we're looking to add more async functionality to the stdlib, foundational components for async concurrency will be part of this. So it's worth asking how we could add select! {}'s functionality to the stdlib.

Introducing select! {} as a macro would be the first option. But if we introduced it today, it would immediately become one of the most complicated macros in the stdlib. select! {} really is shaped like a control-flow primitive, and Rust tends to prefer keywords over macros for control flow 4. Rust cares deeply about diagnostics, and for something as fundamental as control flow we would want to provide comprehensive errors, explainers, and suggestions.

4

The try! macro became ?. The await! macro became .await. Control flow constructs in Rust tend to be introduced as keywords instead, but the bar for those is incredibly high.

Adding select as a keyword to Rust would possibly be harder; the barrier for keywords in Rust is incredibly high. In order for a new keyword to be introduced, an irrefutable case must be presented. For .await we proved that it enabled things that could otherwise only be done using unsafe. const in generic position were added because it enabled computation within the type system. select in comparison merely adds a fifth mode of concurrency to async Rust 5. This is hardly comparable in impact. Especially if we can prove that we can provide the same concurrency primitives entirely using library functions.

5

crossbeam_channel::select exists for non-async Rust and is similar in shape. But it operates over channels only, and cannot yet be mixed with async Rust. Any proposal for a select keyword would need to carefully consider how select would interact with both async and non-async Rust.

concurrent stream processing with Stream::merge

So far we've taken a look at the mode of concurrency we're trying to implement, and the issues the select! {} macro has. This is the part where we move past that and show a surprisingly simple alternative solution to this type of concurrency: Stream::merge. We'll be stepping away from futures for a second to talk about streams, but we'll get back how this relates to futures as well in a bit.

Fundamentally what we want to be doing is awaiting multiple streams concurrently; processing items from either stream as soon as they're ready. In async-std we introduced Stream::merge about two years ago (!) for this exact purpose. It now also exists in futures-concurrency, the companion library to this series. But for this example we'll stick to async-std.

The way Stream::merge works is that it takes two streams, and merges them as if they were a single stream. Data comes out as soon as it's ready, and can be handled just like any other stream:

use async_std::prelude::*;
use async_std::stream;

// Declare our streams.
let a = stream::iter(vec![1, 2, 3]);
let b = stream::iter(vec![4, 5, 6]);

// Initialize the output counter.
let mut total = 0;

// Combine both streams, and add each value to the total.
a.merge(b).for_each(|num| total += num).await;

// Validate the result.
assert_eq!(total, 21);

This code is functionally equivalent to the select! {} loop we introduced at the start of this post. Except it's significantly less code, does not require manually fuseing streams (Merge tracks the state), and importantly: does not introduce any new syntax 6.

This is not even the shortest version this can be, using the stream::Sum trait we can shorten this specific example to:

use async_std::prelude::*;
use async_std::stream;

let a = stream::from_iter(vec![1, 2, 3]);
let b = stream::from_iter(vec![4, 5, 6]);
assert_eq!(a.merge(b).sum().await, 21);

Conceptually this seems so simple, it almost feels too good to be true. But it isn't: there are no hidden Unpin bounds, no secret FusedStream requirements, or unexpected gotchas. We got to this solution by closely analyzing the mode of concurrency select! {} exposes, and finding prior art for it in rxjs.

Concurrently processing a mix of futures and streams

Perhaps you're wondering whether Stream::merge can only replace select! {} loops in simple cases. What about differently-typed streams with different signatures and individual futures mixed in. That's exactly what select! {} was designed for. For the sake of brevity I'll skip the select! {} variant and only show how well Stream::merge is capable of handling this7:

7

This reminds me we made mistakes in async-std. stream::once really should be able to take a Future rather than a regular value. This example pretends we didn't make that mistake.

use async_std::prelude::*;
use async_std::stream;

// Create a shared output enum, used by all stream.
enum Message {
    Num(u8),
    Text(&'static str),
}

// Create a variety of streams from futures and iterators, and map them to the
// same output type
let a = stream::from_iter([1, 2, 3]).map(Message::Num);
let b = stream::from_iter(["chashu", "nori"]).map(Message::Text);
let c = stream::once(async { "hello" }).map(Message::Text);

// Merge all streams and handle each output one-by-one.
let mut s = a.merge(b).merge(c);
while let Some(msg) = s.next().await {
    match msg {
        Num(n) => println!("received a number: {}", n),
        Text(s) => println!("received a string: {}", s),
    }
}

We can even add short-circuiting by using a generalized abort mechanism like stop-token, or adding another enum variant that maps to break whenever called.

This shows how Stream::merge implements the "process items as soon as they're available" mode of concurrency, the same as select! {}. And importantly, it does so using regular Rust types and techniques.

The least comfortable part of what we've written is type unification using enum Message {} and map(Message::Type). But making sure types align is a common Rust issue, and using an enum for that is the common solution. The semantics here are a lot less special than those introduced by select! {}, which should make this significantly easier to get the hang of. Especially if we document it right.

If we're thinking about a future where the stdlib provides a complete solution for futures concurrency out of the box, then the options for select! {} look rather unappealing. We could choose to ship the macro as-is, and forever deal with manual fuse calls, diverging traits [1, 2], and daunting errors.

Or we could promote select into a keyword. This could do away with the requirement to manually invoke fuse for us. But would come with the downside that we now have a syntax which is unlike anything else. The arms are different from those found in match, and it would likely be uniquely scoped to async contexts. Introducing new keywords is an incredibly high barrier to clear, and have my doubts whether select could make it.

Concurrently processing futures in a loop

Earlier in this post we looked at Tomaka's post which featured data-loss issues with select! {}. Let's bring up the example verbatim from the post:

// open our IO types
let mut file = ...;
let mut channel = ...;

// This loop re-creates the `read_send` future on each iteration. That means if
// `socket.read_packet` completes, `read_send` may have read data from the file,
// but not finished writing it to a channel, which can lead to data loss
loop {
    futures::select! {
        _ => read_send(&mut file, &mut channel) => {}, // uh oh, data loss can happen here
        some_data => socket.read_packet() => {
            // ...
        }
    }
}

The issue was that we accidentally opted into "race" semantics when we didn't mean to, which could lead to data loss. In their post, Tomaka mentions there are different ways of solving this, including my personal choice: spawn each individual operation on a separate task and await them But for the sake of example, let's take a look at how we can implement the logic above using Stream::merge:

// open our IO types
let mut file = ...;
let mut channel = ...;

// Create our shared message type
enum Message {
    None,
    Data<Vec<u8>>,
}

// Create two streams by creating and awaiting futures in a loop. We unify the output
// of both futures through the `Message` enum.
let a = stream::repeat_with(|| async { read_send(&mut file, &mut channel).await }).map(Message::None);
let b = stream::repeat_with(|| async { socket.read_packet.await }).map(Message::Data);

// Merge both the streams and wait for their output one-by-one.
let mut s = a.merge(b);
while let Some(msg) = s.next().await {
    match msg {
        Message::None => continue,
        Message::Data(s) =>  ...,
    }
}

This creates an endlessly looping stream for sending data from the file into the channel, and another endlessly looping stream to read data from the socket. It's definitely more verbose than the original select! {} example, but we also have 100% less data loss. Instead of dropping and re-creating read_send each time read_packet makes progress, both just keep moving forward the way you would expect them to.

Would I write this for a project I'm working on? Probably not; I'd likely use async-task-group instead. But I think this is helpful to show a practical example of a well-publicized issue with select! {} and how Stream::merge would help guard against that too.

Fairness

There is one issue with merge we haven't covered yet: fairness. Let's look at the example which merges three streams into one:

let a = stream::repeat(1).take(3);
let b = stream::repeat(2).take(3);
let c = stream::repeat(3).take(3);
let d = stream::repeat(4).take(3);

let s = a.merge(b).merge(c).merge(d);

Streams a, b, and c will yield a number exactly three times. Because they always have an item available futures will always immediately yield the data that they have, instead of breifly waiting. Which means that if we implement our merge function without accounting for this, we would end up with:

assert_eq!(s.collect(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]);

This is not what we want: this has exactly the same semantics of Stream::chain, and will exhaust streams in-order. Ideally we'd like a random mix of 1s, 2s, 3s, and 4s. In order to do that we need to randomly pick between the items available from merge. Unfortunately because we're merging streams by chaining calls to merge we can only pick between two streams at a time 8. This causes the chance we'll pick an item from a stream to be skewed. For the first 3 items in the stream, what we want is for each number to have exactly a 25% chance to be picked. But instead the chance we'll get a number is skewed as follows:

The way to fix this is by making merge aware of all streams we're merging, so we can pick fairly between them. We could do this either by creating a merge! macro, or by doing what we chose to do in futures-concurrency: implement a trait for tuples 9. Here's what using that looks like today:

8

The reason for this is that when we call a.merge(b) the merge function only knows about a and b. If b holds the stream produced by c.merge(d), then the probabilities divided between a, b, c are now skewed. This is really annoying, and the only way we can resolve this is if we know how many items we need to choose from ahead of time.

let a = stream::repeat(1).take(3);
let b = stream::repeat(2).take(3);
let c = stream::repeat(3).take(3);
let d = stream::repeat(4).take(3);

let s = (a, b, c, d).merge();

This will gives us exactly the probability we're looking for, ensuring one stream will not consistently be exhausted before another:

In past post's we've discussed the arity of the join APIs and its API consequences at length. But we haven't done so considering fairness specifically. For the join family of APIs, it's generally an all-or-nothing approach, so fairness is as not much of an issue 10. But for race, which only returns a single item, it might be. And so the same arguments we're making here in favor of a trait-based approach for merge apply to race as well.

10

Future::try_join can return a single item when handling an error, so fairness could be an issue. So it's not an exact science. But I'm not sure how important that is though. Maybe we just should account for it?

Shiny Future

Thinking about a future where we introduce Stream::merge (or more likely: AsyncIterator::merge) into the stdlib paints a wholly different picture. It already works well as-is. But as we add more features to the language that we want anyway, using Stream::merge would in turn become nicer to use as well. So let's have fun shall we. What if Rust had all the language features, what could our code look like then?:

use std::async_iter;

// The shared output type
enum Message {
    Num(u8),
    Text(&'static str),
}

// Create our streams and map them to the shared output type.
let a = [1, 2, 3].into_async_iter().map(Message::Num);
let b = ["chashu", "nori", "lily"].into_async_iter().map(Message::Text);
let c = async_iter::once(async { "hello" }).map(Message::Text);

// Merge the streams, iterate over them, and handle the output sequentially.
for await msg in (a, b, c).merge() match {
    Num(n) => println!("received a number: {n}"),
    Text(s) => println!("received a string: {s}"),
}

So far these features seem somewhat plausible: these are all features that are fairly straight-forward syntactic sugar for existing patterns. And importantly: none of these features are specific to the operations we're doing here. Which means that even if we're looking at possible new language features here, none of these are specific to async concurrency. But what if we really took it further. Let's dream big for a moment here. What if Rust also had:

// Create our streams and map them to the shared output type.
// `into_iter` here assumes "async overloading".
let a = [1, 2, 3];
let b = ["chashu", "nori", "lily"];
let c = async { "hello" };

// Merge the streams, iterate over them, and handle the output sequentially.
// `merge` expects `T: async IntoIter`, which all types implement, so it just works.
// The type of `msg` here is `u8 | &str`. This is inferred from the input types
// and represented in the match statement.
for await msg in (a, b, c).merge() match {
    n: u8 => println!("received a number: {n}"),
    s: &str => println!("received a string: {s}"),
}

I can't say for sure whether this is a good idea or not. We're actively investigating if async overloading can work. And I have no clue if an async Iterator for T where T: Future blanket impl is a good idea. But the point of this example is to have fun with it. To go big on the "what if". Maybe none of these features will ever land. But it's still a useful exercise to understand to which degree ergonomics are inherent of the API, and to which degree they're tied to other factors.

The future of Rust is what we make of it, so who knows if any of this will ever happen. But even if it doesn't: we've already shown that achieving this is possible using today's Rust. What we're showing here is purely just to make it so we need to write less code to achieve the same effect.

Addendum: ergonomics explorations for operating exclusively on futures

Hi, editor Yosh here. I'm adding this section close to the time I'd hit publish on the post. My co-worker nrc helped review this post, and they asked me whether I could show an example using futures exclusively.

We could directly implement merge on Future, creating a Stream, but that wouldn't lend itself well to chaining multiple futures: a.merge(b).merge(c) first creates a stream from a.merge(b), then tries to merge future c to it, which won't work.

Working around this problem requires implementing the Merge trait for Future containers as well, which would not work on stable Rust today. We'd want to use negative trait bounds to express the right bounds:

And even with these bounds, merging Stream and Future types would still mean we'd require casting all futures to streams first. We could overcome this by having a third trait, IntoMergable or something, and then making our generic container impls rely on that:

But that doesn't seem great. In general I think it's worth assessing the following considering ergonomics in this order:

  1. How often does merge-ing futures come up in practice?
  2. Is that often enough that it would warrant changes to the language?
  3. Would impl IntoStream for T: IntoFuture be a feasible?

As I mentioned in earlier examples, my intuition is that whenever we want to merge futures, we likely could solve the problem more elegantly using task::spawn{,_local}. Meaning if we design APIs such as TaskGroup well, we could carry Stream::merge-like semantics through this. In fact, that is exactly what Swift's TaskGroup type does. It can asynchronously iterate out all results from the group, allowing the caller to handle them one-by-one.

So to summarize this side-adventure: if we're looking to improve the semantics of using Stream::merge using futures only, there are a fair number of questions and avenues we should explore before we draw any definitive conclusions.

The async Rust concurrency tables: completed

We can now begin filling in the original async Rust concurrency table using Stream::merge. Because Stream::merge enables on items to be operated one at the time, it can both "continue on error" and "return early on error":

let s = a.merge(b);

// continue on error
while let Some(item) in s {
    // do something with `item`.
}

// return early on error
while let Some(item) in s {
    let item = item?;
    // do something with `item`.
}

This means Stream::merge allows you to choose from either short-circuiting behavior:

Wait for all outputsWait for first outputHandle output one-by-one
Continue on errorFuture::joinFuture::try_raceStream::merge
Return early on errorFuture::try_joinFuture::raceStream::merge

Regarding our second table, Stream::merge enables the concurrency mode where we "handle all items" and "start responding on the first item". This is the top-left of the table. But what about the case where we're only interested in the last item, and want to discard the rest? It turns out that that's a subset of the functionality exposed by Stream::merge, which we can get by using Stream::last:

Handle all itemsDiscard some items
Response starts on first itemStream::mergeFuture::race
Response starts on last itemFuture::joinStream::merge + Stream::last

Seeing this table as it is, you might wonder what the difference is between e.g. Future::join and Stream::collect. Or converting futures into streams and then breaking on the first item. You'd be right: in theory we could model everything using streams and call it a day. But in practice it wouldn't feel good.

The reason why Stream::merge is the right answer for combining N futures is because it allows us to express the time aspect of the operation. We don't want all or nothing. We want items, in any order, as soon as they're made available to us. Streams are "data over time". And even if the data is originally held in futures, the time aspect can only be expressed using streams.

In contrast, the Future::join and Future::race family of operations do not rely on the same time aspect. join yields all items. race just yields the one. So converting futures via streams back to singular items is an indirect mapping which ends up feeling clunky to author. And worse: it creates opportunities to introduce bugs. Future::join and Future::race may be subsets of Stream::merge, but they are unambiguous in what they express which makes them a useful abstraction.

Overall this We might still want to change the names of the concurrency combinators later on. But the core functionality, shape, and usage all feels stable and consistent.

Conclusion

In this post we've looked at Rust's 5th mode of concurrency: "Await multiple futures concurrently and operate on them as soon as they're ready." And in passing also covered the 6th mode of concurrency: "await multiple futures concurrently and discard all values but the last". We've shown how this can be handled using the futures::select! macro. How this can better be handled using Stream::merge. And talked about the various considerations and future directions we have for this design.

We've also done a real-world case analysis of an accidental mis-use of the select! macro, and shown how Stream::merge would've prevented the same issue from happening by making the failure case unrepresentable. Overall Stream::merge presents itself as a viable alternative to select! for the "handle one future at a time without ever discarding any" model of concurrency.

Overall I'm incredibly pleased with the progress we've made with this series. After nearly three years of work we finally have documented all modes of concurrency for async Rust, and their corresponding interfaces. We can now start experimenting with naming, polish, documentation, and then possibly (finally!) start bringing some of these things into mainline Rust via RFCs. Things will likely change between now and then, but it's good to finally stop needing to think about semantics, and being able to focus more on ergonomics. It's been a slow process, but I'm incredibly happy with the outcome. And thank you for following along over all of these years!

If you liked this post and would like to see my cats, you can follow me on Twitter.

Thanks to: Ryan Levick, Nick Cameron and Irina Shestak for reviewing drafts of this post.

Appendix: Async Rust Concurrency Operations

These are all the concurrency operations we want to be able to express in async Rust.

1. Concurrency adapters by fallibility mode and output handling

Wait for all outputsWait for first outputHandle output one-by-one
Continue on errorFuture::joinFuture::try_raceStream::merge
Return early on errorFuture::try_joinFuture::raceStream::merge

2. Concurrency adapters by response start and output handling

Handle all itemsDiscard some items
Response starts on first itemStream::mergeFuture::race
Response starts on last itemFuture::joinStream::merge + Stream::last