Streams Concurrency
— 2019-12-21
- primer: data modeling
- one-to-one
- many-to-one
- one-to-many
- collecting streams
- creating streams from collections
- cancelling streams
- parallel streams
- concurrency in futures-rs
- looking ahead: language support
- conclusion
In our last post about Async Rust we looked at Futures concurrency, and before that we looked at Rust streams. In this post we bring the two together, and will take a closer look at concurrency with Rust streams.
Primer: Data Modeling
Much of async programming is about modeling relationships in code. So before we dig into streams concurrency, it's worth talking about the relationships we're trying to model first.
In the wild you may see relationships expressed as "M:1" or "MPSC", but all they are is descriptors of relationships. I've written about these relationships before if you're interested in learning more. But for the sake of this post, here's a brief overview:
Relationship | Kind | Graph Kind | Channel Kind | Example |
---|---|---|---|---|
one-to-one | 1:1 | List | SPSC | Iterating over stream output in a loop. |
one-to-many | 1:N | Tree | SPMC | Writing stream output to multiple destination channels. |
many-to-one | N:1 | Tree | MPSC | Combining multiple streams into a single stream. |
many-to-many | M:N | Graph | MPMC | An event bus where each node can both emit and listen. |
In theory any of the relationships can be expressed in terms of any other relationship. But in practice using constructs that were actually designed for the relationship is much nicer.
one-to-one
The most basic way of processing a stream is in a sequence. For each element produced in the stream we perform an operation. We guaranteed a new operation is run only after the previous operation has ended.
In today's Rust this is usually written using a while let Some / await
loop or
for_each
stream adapter:
let a = stream::repeat(1u8);
while let Some(num) = a.next().await {
println!("{:?}", num);
}
let b = stream::repeat(1u8)
.for_each(|num| println!("{:?}", num))
.await;
for_each
and while let Some
loops are mostly comparable. The only difference
is that the for_each
cannot use break
or continue
statements the way
while let Some
loops can.
Serial parsing of streams is most comparable to synchronous loops. But instead of blocking between iterations of the loop, it waits for the task without blocking any threads through an executor. Just like serial processing of iterators is a common operation, so is serial processing of streams.
many-to-one
Sometimes you want to combine multiple streams into a single stream. For example
there might be a stream of events coming from a network connection, and a series
of events coming from another part of the system through a local channel. You
may want to combine these into a single stream of events. In async-std
we
expose 3 ways of combining streams:
- chain: exhaust "a", then exhaust "b".
- zip: combine "a" and "b" into a stream of tuples.
- merge: combine "a" and "b" into a stream that outputs items from either as they become ready.
Both chain
and zip
have counterparts in std::Iterator
. However merge
is
a novel API we've introduced as part of async_std
. It allows awaiting two
streams in parallel, as if they were a single stream. To see why let's take a
look at how we might solve this without Stream::merge
:
// Define shared inner loop logic.
fn print(num: u8) {
println!("num: {}", num);
}
// Exhaust the first stream in one thread.
let a = task::spawn(async {
let nums = stream::repeat(1u8).take(100);
while let Some(num) = nums.next().await {
print(num);
}
});
// Exhaust the second stream in another thread.
let b = task::spawn(async {
let nums = stream::repeat(2u8).take(100);
while let Some(num) = nums.next().await {
print(num);
}
});
// Wait until both streams have been exhausted
a.join(b).await;
However with Stream::merge
we can greatly reduce this logic by creating a
single stream that yields items from both streams as soon as they become
available, and spawning fewer tasks in the process:
let a = stream::repeat(1u8).take(100);
let b = stream::repeat(2u8).take(100);
let nums = a.merge(b);
while let Some(num) = nums.next().await {
println!("num: {}", num);
}
one-to-many
Going from a single stream to multiple streams is a bit trickier than what we've
seen before. But luckily Rust makes this easy through the channel
abstraction. And as part of async-std
we've built a channel impl that's both
easy and performant.
Channels always come in pairs. When you create a new channel, you get back a
tuple of (Sender, Receiver)
. Whenever a value is written from a Sender
, it's
received by a Receiver
. Both structs implement Clone
, Send
, and Sync
so
they can freely be shared around between as many threads and tasks as you want.
Receiver
also implements Stream
which makes it easy to read values with.
In the following example we split a stream of numbers into two streams: one of
even numbers, and one of uneven numbers. We pass both Sender
s on one side,
which means that from the other side we're now free to read values from either
stream.
use async_std::stream;
use async_std::sync;
let (even_reader, even_writer) = sync::channel(1);
let (uneven_reader, uneven_writer) = sync::channel(1);
let num_stream = stream::repeat(10u8).take(20);
while let Some(num) = num_stream.next().await {
match num % 2 {
0 => even_writer.send(num).await;
_ => uneven_writer.send(num).await;
}
}
// We can now asynchronously read from `even_reader`
// and `uneven_reader` in separate tasks.
Channels in async-std
are incredibly versatile. They're very similar to event
emitters in other languages, and can often be used in similar scenarios. The
only difference is that channels in async-std
currently don't support sending
values from a single reader to all receivers; but that's something we're
considering adding. This would allow our streams to model many-to-many
relationships as well.
Collecting Streams
Often times when a computation is over you'd like to store the results
somewhere. An example is printing to stdout, but also collecting all output in a
vector. In std this is done through Iterator::collect
. Similarly with
streams this can be done through Stream::collect
.
The trick to making collect
work is the FromStream
trait. This converts a
stream into a type asynchronously. In async-std
we've implemented this for a
lot of std's types. When combined with some of the other stream types this
allows for creating really nice pipeline patterns.
// Create a stream of tuples, and collect into a hashmap.
let a = stream::once(1u8);
let b = stream::once(0u8);
let s = a.zip(b);
let map: HashMap<u8, u8> = s.collect().await;
assert_eq!(map.get(&1), Some(&0u8));
There are even more interesting patterns possible; for example if you have a
stream of Result<T, E>
you could collect into a Result<Vec<T>, E>
. This
allows short-circuiting the stream as soon as an error occurs.
use async_std::prelude::*;
use async_std::stream;
let v = stream::from_iter(vec![1, 2]);
let res: Result<Vec<u32>, &'static str> = v.map(|x: u32|
x.checked_add(1).ok_or("Overflow!")
).collect().await;
assert_eq!(res, Ok(vec![2, 3]));
Creating streams from collections
The
IntoStream
trait implements how to convert any type into a stream. It's the asynchronous
counterpart to
IntoIterator
:
// sync
vec![1, 2, 3].iter().for_each(|n| dbg!(n));
// async, proposed
vec![1, 2, 3].stream().for_each(|n| dbg!(n)).await;
Unfortunately due to the orphan rules we're currently not able to implement
IntoStream
for any of the std::collection
types. This would need to happen
as part of futures-core
because that is where the Stream
trait is defined.
We'd also love to propose adding FromStream
to futures-core
, but
unfortunately it's hard to implement without language support for async fn
in
traits. In async-std
we require a
Pin<Box<T>>
as the return type, which is less efficient than what we'd want it to be. And
for collect
to work, we really need both traits.
Cancelling streams
Often it's desirable to stop a stream remotely. If we want to shut down gracefully often the first step is to stop processing new data. And that's something that needs to be initialized remotely.
For that we have the stop-token
crate. An experimental stream built on top of channels that provides remote
cancellation of streams:
use stop_token::StopToken;
async fn do_work(work: impl Stream<Item = Event>, stop_token: StopToken) {
// The `work` stream will end early: as soon as `stop_token` is cancelled.
let mut work = stop_token.stop_stream(work);
while let Some(event) = work.next().await {
process_event(event).await
}
}
We think cancellation is something that should be easy to do, and stop-token
is a first attempt at implementing that. All credit to
matklad for this.
Parallel Streams
A common pattern when processing data is to create a parallel fan-out / fan-in
pipeline. Say we have a
stream of data; we'd like to process that data in parallel. And once it's done
processing, we either collect
it, or iterate over values one-by-one.
In synchronous Rust you can choose to hand-write such a pipeline, or perhaps use
crossbeam::scope
.
But the nicest way of pipelining data is still rayon
.
The way rayon
works is by providing a parallel version of
Iterator
called
ParallelIterator
that, as its name implies allows processing iterators in parallel. Going from
sequential execution to parallel execution usually means just replacing iter
with par_iter
:
use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
input.par_iter() // <-- just change that!
.map(|&i| i * i)
.sum()
}
Similarly we should be able to apply this model to Stream
by introducing a new
ParallelStream
trait that operates on items in parallel by calling
task::spawn
under the hood. All that would be needed would be an added call to
par
.
use parallel_stream::prelude::*;
async fn sum_of_squares(input: impl Stream<Item = u32>) -> i32 {
input.par() // <-- just add that!
.map(|&i| i * i)
.sum()
.await
}
Unfortunately despite being fairly convinced that this is possible to implement,
we currently don't have an implementation of besides this
sketch.
If you'd like to help out on parallel-stream
, let us know on
GitHub or
Discord and we'll gladly help. We really want to
see this exist!
Concurrency in futures-rs
You may have noticed that we haven't mentioned the
futures
library much. In part that
is because it doesn't provide many abstractions that work with executors. But
let's take a quick look at the stream concurrency abstractions that are provided:
stream::futures_unordered
: aSet
of futures that can resolve in any order..StreamExt::for_each_concurrent
: a concurrent version ofStream::for_each
.StreamExt::try_for_each_concurrent
: a concurrent version ofStream::try_for_each
.select!
: Polls multiple futures and streams simultaneously, executing the branch for the future that finishes first.
futures_unordered
, for_each_concurrent
, and friends occupy roughly the same
space as ParallelStream
. But ParallelStream
has the benefit that it should
be a bit more flexible, and make better use of system resources by leveraging an
executor.
select!
is an abstraction which is somewhat similar to match
but operates
on streams and futures directly. But it's not without its shortcoming. select!
introduces a two new keywords: default
and complete
. It requires all futures
and streams to be manually fused. And has also required changes in
futures
that have created deviations from the stdlib, which is a cost in
itself.
let a_fut = async_identity_fn(1).fuse();
let b_fut = async_identity_fn(2).fuse();
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => panic!(), // never runs (futures run first, then complete)
};
}
assert_eq!(total, 10);
It seems that as we're moving towards extending the stdlib with more of
futures
' core types, we'll eventually need to provide a solution on how to
operate on streams. And because there's no precedent for including a macro as
complex as select!
in the stdlib, it seems likely it would either need to be
upgraded to a language construct, or we would need to look at alternatives that
can be included.
With async-std
we've chosen to look for alternatives instead. And between
Stream::merge
for combining streams, and stop-token
for cancelling them
it seems we're well on our way. But we're not at the end of the road yet, and
we'd like to keep experimenting and documenting stream adapters until we
confidently can replace all uses of select!
.
Looking Ahead: Language Support
A bit further down the line it might be interesting to consider what language
support could look like for parallel Rust. If we can define parallel
counterparts to Iterator
and Stream
, it begs the question if we could also
define parallel counterparts to loop
, for
, and while
.
Take for example the following TCP server. It listens for incoming requests, and processes them in sequence. It has a maximum concurrency of 1, but the code is really easy to follow.
let mut listener = TcpListener::bind("127.0.0.1:8080");
println!("Listening on {}", listener.local_addr()?);
for stream in listener.incoming() {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
io::copy(&stream, &stream)?;
}
Now if we try and parallelize the server using async-std
today we can suddenly
handle thousands of requests concurrently, but the code itself is much less
readable. And worse, we can no longer pass errors from the inner scope back up
to the outer scope without calling .await
on each JoinHandle
, and making the
request handler serial again.
let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
while let Some(stream) = listener.incoming().await {
let stream = stream?;
task::spawn(async move {
println!("Accepting from: {}", stream.peer_addr().unwrap());
io::copy(&stream, &stream).await.unwrap();
});
}
An API such as ParallelStream
would provide some alleviation here. It would
allow us to implement a parallel server without losing the ability to return
errors from scopes.
let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
listener
.incoming()
.par()
.try_for_each(|stream| async move {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
io::copy(&stream, &stream).await?;
Ok(())
})
.await?;
But this suddenly feels very different from the API we started with. Code is just as much about reading as it is about writing. And being able to spot loops at a glance is very useful. So instead of only having chained operators, wouldn't it be nice if we could write parallel loops instead? Perhaps something like this:
let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
for par stream.await? in listener.incoming() {
println!("Accepting from: {}", stream.peer_addr()?);
io::copy(&stream, &stream).await?;
}
This again looks a lot like the synchronous TCP server we started with. But instead of processing requests serially, it now uses all available cores without blocking.
Now this is not so much a concrete proposal, as a sketch to share what things could look like if we designed them that way. I'd like us to think big on this; following the tradition of making seemingly complex concepts surprisingly accessible. Not only would we be free of data races in parallel code. It'd be incredibly convenient to write as well.
Conclusion
In this post we've looked at different variants of streams concurrency, discussed the current state of support they have in Async Rust, and shared avenues worth exploring to improve the status quo.
Personally I'm very excited about how this post has come together. I feel like Rust is currently on a path to achieve what no other mainstream language has achieved: to make writing parallel code not only correct and performant. But to make it as easy as sequential code.
This might seem like a tall order, but between the diagnostics efforts, language design, compiler work, and libraries it really feels like that's where we're heading. And I couldn't be more excited.
Thanks to: Irina Shestak for the illustrations. And Stjepan Glavina, Sunjay Varma, Niko Matsakis, Withoutboats, Aaron Turon, Aleksey Kladov, Ryan Levick, and Friedel Ziegelmayer.