Streams Concurrency
— 2019-12-21

  1. primer: data modeling
  2. one-to-one
  3. many-to-one
  4. one-to-many
  5. collecting streams
  6. creating streams from collections
  7. cancelling streams
  8. parallel streams
  9. concurrency in futures-rs
  10. looking ahead: language support
  11. conclusion

In our last post about Async Rust we looked at Futures concurrency, and before that we looked at Rust streams. In this post we bring the two together, and will take a closer look at concurrency with Rust streams.

Primer: Data Modeling

Much of async programming is about modeling relationships in code. So before we dig into streams concurrency, it's worth talking about the relationships we're trying to model first.

In the wild you may see relationships expressed as "M:1" or "MPSC", but all they are is descriptors of relationships. I've written about these relationships before if you're interested in learning more. But for the sake of this post, here's a brief overview:

RelationshipKindGraph KindChannel KindExample
one-to-one1:1ListSPSCIterating over stream output in a loop.
one-to-many1:NTreeSPMCWriting stream output to multiple destination channels.
many-to-oneN:1TreeMPSCCombining multiple streams into a single stream.
many-to-manyM:NGraphMPMCAn event bus where each node can both emit and listen.

In theory any of the relationships can be expressed in terms of any other relationship. But in practice using constructs that were actually designed for the relationship is much nicer.

one-to-one

a sequence of nodes

The most basic way of processing a stream is in a sequence. For each element produced in the stream we perform an operation. We guaranteed a new operation is run only after the previous operation has ended.

In today's Rust this is usually written using a while let Some / await loop or for_each stream adapter:

let a = stream::repeat(1u8);
while let Some(num) = a.next().await {
    println!("{:?}", num);
}

let b = stream::repeat(1u8)
    .for_each(|num| println!("{:?}", num))
    .await;

for_each and while let Some loops are mostly comparable. The only difference is that the for_each cannot use break or continue statements the way while let Some loops can.

Serial parsing of streams is most comparable to synchronous loops. But instead of blocking between iterations of the loop, it waits for the task without blocking any threads through an executor. Just like serial processing of iterators is a common operation, so is serial processing of streams.

many-to-one

many nodes convering to a single node

Sometimes you want to combine multiple streams into a single stream. For example there might be a stream of events coming from a network connection, and a series of events coming from another part of the system through a local channel. You may want to combine these into a single stream of events. In async-std we expose 3 ways of combining streams:

Both chain and zip have counterparts in std::Iterator. However merge is a novel API we've introduced as part of async_std. It allows awaiting two streams in parallel, as if they were a single stream. To see why let's take a look at how we might solve this without Stream::merge:

// Define shared inner loop logic.
fn print(num: u8) {
    println!("num: {}", num);
}

// Exhaust the first stream in one thread.
let a = task::spawn(async {
    let nums = stream::repeat(1u8).take(100);
    while let Some(num) = nums.next().await {
        print(num);
    }
});

// Exhaust the second stream in another thread.
let b = task::spawn(async {
    let nums = stream::repeat(2u8).take(100);
    while let Some(num) = nums.next().await {
        print(num);
    }
});

// Wait until both streams have been exhausted
a.join(b).await;

However with Stream::merge we can greatly reduce this logic by creating a single stream that yields items from both streams as soon as they become available, and spawning fewer tasks in the process:

let a = stream::repeat(1u8).take(100);
let b = stream::repeat(2u8).take(100);
let nums = a.merge(b);

while let Some(num) = nums.next().await {
    println!("num: {}", num);
}

one-to-many

a single node being converted into multiple nodes

Going from a single stream to multiple streams is a bit trickier than what we've seen before. But luckily Rust makes this easy through the channel abstraction. And as part of async-std we've built a channel impl that's both easy and performant.

Channels always come in pairs. When you create a new channel, you get back a tuple of (Sender, Receiver). Whenever a value is written from a Sender, it's received by a Receiver. Both structs implement Clone, Send, and Sync so they can freely be shared around between as many threads and tasks as you want. Receiver also implements Stream which makes it easy to read values with.

In the following example we split a stream of numbers into two streams: one of even numbers, and one of uneven numbers. We pass both Senders on one side, which means that from the other side we're now free to read values from either stream.

use async_std::stream;
use async_std::sync;

let (even_reader, even_writer) = sync::channel(1);
let (uneven_reader, uneven_writer) = sync::channel(1);

let num_stream = stream::repeat(10u8).take(20);
while let Some(num) = num_stream.next().await {
    match num % 2 {
        0 => even_writer.send(num).await;
        _ => uneven_writer.send(num).await;
    }
}

// We can now asynchronously read from `even_reader`
// and `uneven_reader` in separate tasks.

Channels in async-std are incredibly versatile. They're very similar to event emitters in other languages, and can often be used in similar scenarios. The only difference is that channels in async-std currently don't support sending values from a single reader to all receivers; but that's something we're considering adding. This would allow our streams to model many-to-many relationships as well.

Collecting Streams

many nodes convering to a single node

Often times when a computation is over you'd like to store the results somewhere. An example is printing to stdout, but also collecting all output in a vector. In std this is done through Iterator::collect. Similarly with streams this can be done through Stream::collect.

The trick to making collect work is the FromStream trait. This converts a stream into a type asynchronously. In async-std we've implemented this for a lot of std's types. When combined with some of the other stream types this allows for creating really nice pipeline patterns.

// Create a stream of tuples, and collect into a hashmap.
let a = stream::once(1u8);
let b = stream::once(0u8);

let s = a.zip(b);

let map: HashMap<u8, u8> = s.collect().await;
assert_eq!(map.get(&1), Some(&0u8));

There are even more interesting patterns possible; for example if you have a stream of Result<T, E> you could collect into a Result<Vec<T>, E>. This allows short-circuiting the stream as soon as an error occurs.

use async_std::prelude::*;
use async_std::stream;

let v = stream::from_iter(vec![1, 2]);
let res: Result<Vec<u32>, &'static str> = v.map(|x: u32|
    x.checked_add(1).ok_or("Overflow!")
).collect().await;
assert_eq!(res, Ok(vec![2, 3]));

Creating streams from collections

The IntoStream trait implements how to convert any type into a stream. It's the asynchronous counterpart to IntoIterator:

// sync
vec![1, 2, 3].iter().for_each(|n| dbg!(n));

// async, proposed
vec![1, 2, 3].stream().for_each(|n| dbg!(n)).await;

Unfortunately due to the orphan rules we're currently not able to implement IntoStream for any of the std::collection types. This would need to happen as part of futures-core because that is where the Stream trait is defined.

We'd also love to propose adding FromStream to futures-core, but unfortunately it's hard to implement without language support for async fn in traits. In async-std we require a Pin<Box<T>> as the return type, which is less efficient than what we'd want it to be. And for collect to work, we really need both traits.

Cancelling streams

Often it's desirable to stop a stream remotely. If we want to shut down gracefully often the first step is to stop processing new data. And that's something that needs to be initialized remotely.

For that we have the stop-token crate. An experimental stream built on top of channels that provides remote cancellation of streams:

use stop_token::StopToken;

async fn do_work(work: impl Stream<Item = Event>, stop_token: StopToken) {
    // The `work` stream will end early: as soon as `stop_token` is cancelled.
    let mut work = stop_token.stop_stream(work);
    while let Some(event) = work.next().await {
        process_event(event).await
    }
}

We think cancellation is something that should be easy to do, and stop-token is a first attempt at implementing that. All credit to matklad for this.

Parallel Streams

a single stream becoming multiple streams becoming a single stream again

A common pattern when processing data is to create a parallel fan-out / fan-in pipeline. Say we have a stream of data; we'd like to process that data in parallel. And once it's done processing, we either collect it, or iterate over values one-by-one.

In synchronous Rust you can choose to hand-write such a pipeline, or perhaps use crossbeam::scope. But the nicest way of pipelining data is still rayon.

The way rayon works is by providing a parallel version of Iterator called ParallelIterator that, as its name implies allows processing iterators in parallel. Going from sequential execution to parallel execution usually means just replacing iter with par_iter:

use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- just change that!
        .map(|&i| i * i)
        .sum()
}

Similarly we should be able to apply this model to Stream by introducing a new ParallelStream trait that operates on items in parallel by calling task::spawn under the hood. All that would be needed would be an added call to par.

use parallel_stream::prelude::*;

async fn sum_of_squares(input: impl Stream<Item = u32>) -> i32 {
    input.par() // <-- just add that!
        .map(|&i| i * i)
        .sum()
        .await
}

Unfortunately despite being fairly convinced that this is possible to implement, we currently don't have an implementation of besides this sketch. If you'd like to help out on parallel-stream, let us know on GitHub or Discord and we'll gladly help. We really want to see this exist!

Concurrency in futures-rs

You may have noticed that we haven't mentioned the futures library much. In part that is because it doesn't provide many abstractions that work with executors. But let's take a quick look at the stream concurrency abstractions that are provided:

futures_unordered, for_each_concurrent, and friends occupy roughly the same space as ParallelStream. But ParallelStream has the benefit that it should be a bit more flexible, and make better use of system resources by leveraging an executor.

select! is an abstraction which is somewhat similar to match but operates on streams and futures directly. But it's not without its shortcoming. select! introduces a two new keywords: default and complete. It requires all futures and streams to be manually fused. And has also required changes in futures that have created deviations from the stdlib, which is a cost in itself.

let a_fut = async_identity_fn(1).fuse();
let b_fut = async_identity_fn(2).fuse();

let mut total = 0;
loop {
    select! {
        a = a_fut => total += a,
        b = b_fut => total += b,
        complete => break,
        default => panic!(), // never runs (futures run first, then complete)
    };
}
assert_eq!(total, 10);

It seems that as we're moving towards extending the stdlib with more of futures' core types, we'll eventually need to provide a solution on how to operate on streams. And because there's no precedent for including a macro as complex as select! in the stdlib, it seems likely it would either need to be upgraded to a language construct, or we would need to look at alternatives that can be included.

With async-std we've chosen to look for alternatives instead. And between Stream::merge for combining streams, and stop-token for cancelling them it seems we're well on our way. But we're not at the end of the road yet, and we'd like to keep experimenting and documenting stream adapters until we confidently can replace all uses of select!.

Looking Ahead: Language Support

A bit further down the line it might be interesting to consider what language support could look like for parallel Rust. If we can define parallel counterparts to Iterator and Stream, it begs the question if we could also define parallel counterparts to loop, for, and while.

Take for example the following TCP server. It listens for incoming requests, and processes them in sequence. It has a maximum concurrency of 1, but the code is really easy to follow.

let mut listener = TcpListener::bind("127.0.0.1:8080");
println!("Listening on {}", listener.local_addr()?);

for stream in listener.incoming() {
    let stream = stream?;
    println!("Accepting from: {}", stream.peer_addr()?);
    io::copy(&stream, &stream)?;
}

Now if we try and parallelize the server using async-std today we can suddenly handle thousands of requests concurrently, but the code itself is much less readable. And worse, we can no longer pass errors from the inner scope back up to the outer scope without calling .await on each JoinHandle, and making the request handler serial again.

let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

while let Some(stream) = listener.incoming().await {
    let stream = stream?;
    task::spawn(async move {
        println!("Accepting from: {}", stream.peer_addr().unwrap());
        io::copy(&stream, &stream).await.unwrap();
    });
}

An API such as ParallelStream would provide some alleviation here. It would allow us to implement a parallel server without losing the ability to return errors from scopes.

let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

listener
    .incoming()
    .par()
    .try_for_each(|stream| async move {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        io::copy(&stream, &stream).await?;
        Ok(())
    })
    .await?;

But this suddenly feels very different from the API we started with. Code is just as much about reading as it is about writing. And being able to spot loops at a glance is very useful. So instead of only having chained operators, wouldn't it be nice if we could write parallel loops instead? Perhaps something like this:

let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);

for par stream.await? in listener.incoming() {
    println!("Accepting from: {}", stream.peer_addr()?);
    io::copy(&stream, &stream).await?;
}

This again looks a lot like the synchronous TCP server we started with. But instead of processing requests serially, it now uses all available cores without blocking.

Now this is not so much a concrete proposal, as a sketch to share what things could look like if we designed them that way. I'd like us to think big on this; following the tradition of making seemingly complex concepts surprisingly accessible. Not only would we be free of data races in parallel code. It'd be incredibly convenient to write as well.

Conclusion

In this post we've looked at different variants of streams concurrency, discussed the current state of support they have in Async Rust, and shared avenues worth exploring to improve the status quo.

Personally I'm very excited about how this post has come together. I feel like Rust is currently on a path to achieve what no other mainstream language has achieved: to make writing parallel code not only correct and performant. But to make it as easy as sequential code.

This might seem like a tall order, but between the diagnostics efforts, language design, compiler work, and libraries it really feels like that's where we're heading. And I couldn't be more excited.


Thanks to: Irina Shestak for the illustrations. And Stjepan Glavina, Sunjay Varma, Niko Matsakis, Withoutboats, Aaron Turon, Aleksey Kladov, Ryan Levick, and Friedel Ziegelmayer.