Async Iteration I: Iteration Semantics
— 2020-09-18
- the problem space
- control flow operators
- consistency with non-async rust
- how would tasks be spawned?
- parallelizing !send streams
- backpressure & concurency control
- how common is parallel iteration?
- compiler hints
- tool-assisted optimization
- parallel iteration syntax
- addendum: lending streams
- conclusion
This post is part of the Async Iteration series:
- Async Iteration I: Semantics (this post)
- Async Iteration II: Async Iterator Crate
- Async Iteration III: Async Iterator Trait
In a recent lang team meeting we discussed
the upcoming
Stream
RFC. We covered both the steps required to land it, and the steps we'd want
to take after that. One of the things we'd want eventually for streams is:
"async iteration syntax". Just like for x in y
works for Iterator
, we'd
want something similar to work for Stream
. For example JavaScript support
both
sync
and
async
iteration through for..of
loops:
// sync
for (let num of iter) {
console.log(i);
}
// async
for await (let event of emitter) {
console.log(event);
}
However when talking about async iteration syntax in Rust one question has come up repeatedly:
Should async iteration be parallel or sequential by default?
In this post I'd like to cover the challenges I see with parallel async iteration, and make the case for, spoilers, defaulting to sequential iteration semantics.
However before we continue, the usual disclaimers: I'm not on the lang team. I do not hold any direct decision making power about the Rust language. I am a member of the Async Foundations WG, but I do not speak for anyone on the team other than myself. This post is from my perspective only.
The problem space
So the core question we're trying to answer is: "should async iteration default to sequential or parallel?" Putting this into code, it's a question of what some hypothetical future "async iteration" syntax should compile down to, and which semantics are provided:
// This code...
for event.await in channel {
dbg!(event);
}
// ... should it lower into this? (sequential)
while let Some(event) = channel.next().await {
dbg!(event);
}
// ... or should it lower into this? (parallel)
channel.into_par_stream().for_each(async |event| dbg!(event));
The "lowering" we're describing here is not the precise code we'd expect
the compiler to output. But hopefully it gets the point across. In the
"sequential" case we'd expect async iteration loops to behave much the same
as async while let Some()
loops do today. And in the parallel case, we'd
expect something to work with semantics similar to those of
parallel-stream.
The main argument I've heard in favor of parallel async iteration semantics is that in most cases it's what people want anyway. For example take the following code:
use async_std::io;
use async_std::net::TcpListener;
use async_std::prelude::*;
let listener = TcpListener::bind("127.0.0.1:8080").await?;
for stream.await? in listener.incoming() {
io::copy(&stream, &stream).await?;
}
Under sequential iteration semantics this loop would process at most 1 TCP connection at the time, irregardless of system resources available. But under parallel iteration semantics, this will have access to all resources available. The idea is that parallel iteration semantics provide a language level solution to this problem.
Control flow operators
The first issue that arises with parallel async iteration is that the break
and continue
control flow operators will have different semantics, or need
to be disallowed altogether. Take the following code:
use async_std::io;
use async_std::net::TcpListener;
use async_std::prelude::*;
let listener = TcpListener::bind("127.0.0.1:8080").await?;
// Let's pretend this is valid syntax..
for (i, stream?).await in listener.incoming().enumerate() {
if i === 1000 {
break;
}
io::copy(&stream, &stream).await?;
}
Say we have a few dozen parallel requests open when we hit break
. We should
probably stop accepting new requests. What should happen to the still open
connections?
There is no right answer. We may want to drop them. We may want to keep them running. More likely the desired behavior may depend on what you're doing. But the semantics will always be different than in regular for loops because of parallelism.
Adding new semantics to a well-established concept is tricky, and may actually lead to subtle translation issues when converting existing algorithms to async Rust. The safest option will be to disallow these keywords in this context altogether.
Consistency with non-async Rust
We may want to think of "parallel iteration semantics" as something unique to async Rust, but in fact non-async Rust faces very similar challenges. To give a very literal example, the TCP example we used earlier would not be parallelized in non-async Rust as well:
use std::io;
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:8080").await?;
for stream in listener.incoming() {
let stream = stream?;
let (reader, writer) = &mut (&stream, &stream);
io::copy(reader, writer)?;
}
Networking in non-async Rust is not the best match, but for example a data processing pipeline would be. And if we forget to parallelize at the right points there, we'll face very similar issues.
The point being: I don't think "parallelism" is a problem unique to async
Rust. And because of that I don't think we should look for a solution that's
specific to async Rust, but instead look for a solution that can be applied
across domains. I think RFC
2930 (buf_read
) is a great example of an
issue that was found in async Rust, but was determined to apply to non-async
Rust as well, and for which a solution was found that can be applied to both
domains.
How would tasks be spawned?
In order for tasks to be parallelized, they would need to be spawned on a thread pool somehow. Which means a crucial step in enabling parallel iteration at the language level is to have a notion of an executor as part of the language.
C++ is in the process of
adding
to the language (estimate is C++23), so it's not unthinkable for Rust to do
the same. In fact Boats has written about adding a #[global_executor]
before.
// The lowered code from the previous example...
channel.into_par_stream().for_each(async |event| dbg!(event));
// ... would in turn lower into this
while let Some(event) = channel.next() {
task::spawn(async move {
dbg!(event);
});
}
As you can see we're calling task::spawn
for each event. That functionality
would need to come from somewhere, and because async iteration is a language
level concern this would mean that functionality would need to become part of
the language.
I think having a way to perform parallel iteration at the language level is a
great idea. And having a #[global_executor]
hook with a default
implementation would solve a lot of issues the ecosystem currently has.
But if history is any indicator this would all take quite a while to shake out. And I think making async iteration syntax depend on this landing (even if unstable) would mean it might take a lot longer.
Parallelizing !Send streams
Not all streams are Send
, and async iteration should be able to work with
this as well. We can't quite propagate !Send
the way we do for async
blocks; the global executor would need to support spawning !Send
futures as
well. This is not a major hurdle as for example async-stc provides
spawn_local
function. But it's part of the design space to consider.
Backpressure & concurency control
For all its warts something that Node.js streams got right was backpressure. If for example writing to a file is slower than reading from a file, the pipeline won't keep reading data if the writer isn't available. This ensures your RAM doesn't fill up with bytes that are waiting to be written.
const fs = require('fs');
const reader = fs.createReadStream('file.txt');
const compress = zlib.createGzip();
const writer = fs.createWriteStream('file.txt.gz');
reader.pipe(compress).pipe(writer);
In Rust sequential async iteration applies backpressure by default. A translation of the above Node.js code would be somewhat like this:
let reader = fs::open("file.txt").await?;
let writer = fs::open("file.txt.gz").await?;
for buf in GzipEncoder::new(reader) {
writer.write(buf).await?;
}
async-compression
's Stream
types
already work somewhat like this. Because we're performing sequential async
iteration here, reading data from the stream is constrained by the speed at
which we can write. But also vice-versa: the stream is paused if no new data
is available to be read.
If async iteration syntax was parallel by default we would lose backpressure by default. Which means if no constraints are specified, by default parallel async iteration will keep reading data even if there is no consumer ready to process the data. Instead limits will always need to be manually set:
for buf in GzipEncoder::new(reader).limit(5) {
writer.write(buf).await?;
}
There is a third solution to backpressure we haven't discussed yet: set
some limit by default. Instead of async iteration having unbounded
parallelism by default, we could say, enable 1000
items by default. This
limit will however almost always be wrong, and really should be set with
knowledge of the workload.
name | max throughput | backpressure? |
---|---|---|
sequential | 1 | yes |
bounded | n | yes |
unbounded | unlimited | no |
How common is parallel iteration?
A large part of the argument in favor of parallel semantics by default for async iteration is that it is the most commonly desired semantics. Perhaps that may be true today within certain domains; but how true do we expect that to be in the future for across all domains async Rust will cover?
If we want to get a sense of where async Rust might be headed we can take a look at Node.js streams: they've been around for a decade longer than Rust streams have. A common kind of stream in the Node.js ecosystem is the transform stream: a stream that takes an input stream and produces an output stream. Take for example the csv-parser package: it takes a stream of CSV text, and produces an output stream of parsed elements:
NAME,AGE
Daffy Duck,24
Bugs Bunny,22
const csv = require('csv-parser')
const fs = require('fs')
const results = [];
fs.createReadStream('data.csv')
.pipe(csv())
.on('data', (data) => results.push(data))
.on('end', () => {
console.log(results);
// [
// { NAME: 'Daffy Duck', AGE: '24' },
// { NAME: 'Bugs Bunny', AGE: '22' }
// ]
});
Translating this to async Rust (with async iteration syntax) we could have something like this:
use async_std::fs::File;
use csv_parser::Csv;
use std::io;
async fn main() -> io::Result<()> {
let file = File::open("data.csv").await?
for item.await? in Csv::new(file) {
println!("name: {}, age: {}", item["NAME"], item["AGE"]);
}
}
In this case throughput is likely going to be limited by how fast we can read from disk, which means spawning tasks for the inner logic is likely not going to speed this up. In fact, spawning a task per item may in fact slow down the parser since the logic of "read the file" and "operate on the file" can no longer be compiled down into a single state machine 1. While tasks may be cheap compared to threads, they're by no means without a cost.
Inlining performance really depends on the workload. But as a rule of thumb: if something can't be inlined we're likely going to miss out on optimizations. Obviously "println" isn't a good example for something that can be inlined, but you get the gist.
The example above could be parallelized, if say, we were processing a directory of files. We could then spawn a task per file. But each file itself would still be processed as a sequential stream.
Another example of sequential stream semantics is the HTTP/1.1 protocol: the stream of incoming TCP connections is parallelized, but converting the TCP stream into a sequence of HTTP requests is sequential. And within the request itself, each HTTP body is sequential as well. Even a multiplexing protocol such as HTTP/2 eventually breaks down into request / response pairs which have sequential semantics:
async fn main() -> io::Result<()> {
// parallel
for stream.await? in TcpListener::accept("localhost:8080") {
// parallel
for req.await? in Http2Listener::connect(stream) {
// sequential
for (i, line?).await in req.lines() {
println!("line: {}, value: {}", i, line);
}
}
}
}
While parallel iteration may be common, it doesn't seem likely it will ever be used in such an overwhelming amount that sequential iteration is made redundant. It seems most likely people will want to mix both in projects, and we should seek to make both parallel and async iteration convenient.
Compiler hints
So far much of this post has been about why "parallel semantics for async iteration" are not the right default. But this doesn't mean that the issue that intends to address isn't worth addressing. Defaulting to parallel semantics is only one tool.
What if we could use different tools? For example, one issue raised is: "people might forget to parallelize certain streams, missing out on performance". Could we solve this issue through a lint instead? I think we could, for the most part!
For example clippy could detect known instances of core APIs that are should probably be parallelized, and warn when they're not. For example handling TCP connections, or processing files in a directory are things you probably want to do in parallel.
Or if we wanted to allow anyone to add it to their APIs: Rust could introduce
a #[must_par_spawn]
hint that could be added to structs, much like #[must_use]
.
It could come with a counterpart tag that marks a function as "this spawns
tasks". And the compiler could warn if something wasn't spawned 2.
There is an RFC being written for the
must_not_await
hint; going in this direction would not be without precedent.
#[must_par_spawn]
struct HttpStream;
#[par_spawn]
async fn spawn<T, F: Future<Output = T> + 'static>(f: F) -> T;
Both solutions would need work before implementing; but they present alternate directions that can be explored instead of resorting to parallel async iteration syntax.
tool-assisted optimization
In the post Futures and Segmented Stacks Boats talks about techniques to make futures more efficient by moving their stack onto the heap. Locating where to segment your stack is hard to do by hand, but could likely become trivial with the right tooling.
Parallelism has many similarities: finding the right places to parallelize can be tricky. Having a tool that can locate where tasks should be spawned, and which tasks aren't needed would be great. Runtime profiling for async programs seems like a generally underexplored domain, and I suspect more and better tooling could get us really far.
Parallel iteration syntax
So far in this post we've talked about why async iteration shouldn't default
to parallel semantics by default. But I do agree that it would be great if
we had some syntax for parallel iteration. In the "future directions"
section of my
parallel-stream
post, I laid out what that could look like (using par for
):
let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
par for stream.await? in listener.incoming() {
io::copy(&stream, &stream).await?;
}
In par for
loops keywords such as break
and continue
would be invalid,
and each task would be spawned on a task. This syntax could be made to work
for both sync and non-async Rust, backed by different execution strategies.
And finally, being able to talk about parallelism as a first-class language
concept would enable much better diagnostics, documentation, and tooling to
be implemented.
For example, accidentally writing for x.await
instead of par for x.await
on a struct tagged as #[must_par_spawn]
could be met with the following
diagnostic:
warning: loop is sequential
--> src/main.rs:2:9
|
2 | for stream.await? in listener.incoming() {
| ^ help: use `par for stream.await? in listener.incoming()` to parallelize
|
= note: `#[warn(sequential_loops)]` on by default
I think parallel iteration syntax in some form would be fantastic. I just would like to see it exist in a similar way for both async and non-async Rust.
Addendum: lending streams
Scottmcm pointed out on Zulip that parallel iteration semantics also wouldn't work well with lending streams (also known as "streaming" or "attached" streams).
In a lending stream the Item
that gets returned by Stream
may be borrowed
from self. It can only be used as long as the self reference remains live.
For example a Stream
where:
type Item<'iter> = &'iter mut State;
...would fail to compile, which seems rather limiting.
Conclusion
Parallel iteration semantics for Stream
are an incredible concept that has
with much potential, but it seems unlikely to be the right fit for all cases.
By defaulting to sequential iteration semantics for Stream
parity with
Iterator
is preserved. This not only ensures keywords such as break
and
continue
work in a comparable way, it also enables looking at the design of
"parallel iteration" as a design that can be shared between async and
non-async Rust.
We still have a fair bit to go before we can add async iteration syntax to
Rust. The Stream
RFC draft still needs to be submitted and accepted. There
are still questions about migrations and interop with the stream adapters in
the ecosystem. And a design that covers everything from IntoStream
to
syntax 3. But even though all that may still be a while; hopefully
this post can serve to anchor discussion on whether iterating over a Stream
should default to sequential or parallel 4.
All async iteration syntax in this post has been made up, and is by no means an indication of what the lang team will decide on. I just needed something up for the purpose of this post.
I really tried making a joke about "If a stream would wear iteration, would it wear it this way, or that way" but couldn't get it to work. My biggest regret in publishing this post.
Thanks to Friedel Ziegelmayer and Florian Gilcher for proof reading.