Async Iteration Semantics
— 2020-09-18

In a recent lang team meeting we discussed the upcoming Stream RFC. We covered both the steps required to land it, and the steps we'd want to take after that. One of the things we'd eventually want for streams is "async iteration syntax". Just like for x in y works for Iterator, we'd want something similar to work for Stream. For example, JavaScript supports both sync and async iteration through for..of loops:

// sync
for (let num of iter) {
  console.log(num);
}

// async
for await (let event of emitter) {
    console.log(event);
}

However when talking about async iteration syntax in Rust one question has come up repeatedly:

Should async iteration be parallel or sequential by default?

In this post I'd like to cover the challenges I see with parallel async iteration, and make the case for, spoilers, defaulting to sequential iteration semantics.


However before we continue, the usual disclaimers: I'm not on the lang team. I do not hold any direct decision making power about the Rust language. I am a member of the Async Foundations WG, but I do not speak for anyone on the team other than myself. This post is from my perspective only.

The problem space

So the core question we're trying to answer is: "should async iteration default to sequential or parallel?" Putting this into code, it's a question of what some hypothetical future "async iteration" syntax should compile down to, and which semantics are provided:

// This code...
for event.await in channel {
    dbg!(event);
}

// ... should it lower into this? (sequential)
while let Some(event) = channel.next().await {
    dbg!(event);
}

// ... or should it lower into this? (parallel)
channel.into_par_stream().for_each(async |event| dbg!(event));

The "lowering" we're describing here is not the precise code we'd expect the compiler to output. But hopefully it gets the point across. In the "sequential" case we'd expect async iteration loops to behave much the same as async while let Some() loops do today. And in the parallel case, we'd expect something to work with semantics similar to those of parallel-stream.

The main argument I've heard in favor of parallel async iteration semantics is that in most cases it's what people want anyway. For example take the following code:

use async_std::io;
use async_std::net::TcpListener;
use async_std::prelude::*;

let listener = TcpListener::bind("127.0.0.1:8080").await?;

for stream.await? in listener.incoming() {
    io::copy(&stream, &stream).await?;
}

Under sequential iteration semantics this loop would process at most one TCP connection at a time, regardless of the system resources available. But under parallel iteration semantics, it would have access to all available resources. The idea is that parallel iteration semantics provide a language-level solution to this problem.

Control flow operators

The first issue that arises with parallel async iteration is that the break and continue control flow operators will have different semantics, or need to be disallowed altogether. Take the following code:

use async_std::io;
use async_std::net::TcpListener;
use async_std::prelude::*;

let listener = TcpListener::bind("127.0.0.1:8080").await?;

// Let's pretend this is valid syntax..
for (i, stream?).await in listener.incoming().enumerate() {
    if i == 1000 {
        break;
    }
    io::copy(&stream, &stream).await?;
}

Say we have a few dozen parallel requests open when we hit break. We should probably stop accepting new requests. What should happen to the still open connections?

There is no right answer. We may want to drop them. We may want to keep them running. More likely the desired behavior may depend on what you're doing. But the semantics will always be different than in regular for loops because of parallelism.

Adding new semantics to a well-established concept is tricky, and may actually lead to subtle translation issues when converting existing algorithms to async Rust. The safest option will be to disallow these keywords in this context altogether.
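To make the ambiguity concrete, here is a minimal sketch in non-async Rust that uses one OS thread per item as a stand-in for spawned tasks. The names accept_until and handle are made up for this illustration, and waiting for the in-flight work after break is only one of several possible policies, not a recommendation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for "handle one connection".
fn handle(_conn: usize, completed: &AtomicUsize) {
    completed.fetch_add(1, Ordering::SeqCst);
}

/// Accept items until index `limit`, handling each one on its own thread.
/// Returns how many handlers ran to completion.
pub fn accept_until(limit: usize, incoming: impl Iterator<Item = usize>) -> usize {
    let completed = Arc::new(AtomicUsize::new(0));
    let mut in_flight = Vec::new();

    for (i, conn) in incoming.enumerate() {
        if i == limit {
            // `break` only stops us from accepting new items.
            // It says nothing about the work already in `in_flight`.
            break;
        }
        let completed = Arc::clone(&completed);
        in_flight.push(thread::spawn(move || handle(conn, &completed)));
    }

    // Policy chosen for this sketch: wait for everything still running.
    // Dropping the handles instead would detach the threads.
    for task in in_flight {
        task.join().expect("handler panicked");
    }
    completed.load(Ordering::SeqCst)
}
```

Calling accept_until(3, 0..10) handles exactly three items, but only because this sketch chose to join after the loop. A sequential for loop never has to make that choice.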

Consistency with non-async Rust

We may want to think of "parallel iteration semantics" as something unique to async Rust, but in fact non-async Rust faces very similar challenges. To give a very literal example, the TCP example we used earlier would not be parallelized in non-async Rust either:

use std::io;
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:8080")?;

for stream in listener.incoming() {
    let stream = stream?;
    let (reader, writer) = &mut (&stream, &stream);
    io::copy(reader, writer)?;
}

Networking in non-async Rust is not the best match, but for example a data processing pipeline would be. And if we forget to parallelize at the right points there, we'll face very similar issues.

The point being: I don't think "parallelism" is a problem unique to async Rust. And because of that I don't think we should look for a solution that's specific to async Rust, but instead look for a solution that can be applied across domains. I think RFC 2930 (read_buf) is a great example of an issue that was found in async Rust, but was determined to apply to non-async Rust as well, and for which a solution was found that can be applied to both domains.

How would tasks be spawned?

In order for tasks to be parallelized, they would need to be spawned on a thread pool somehow. Which means a crucial step in enabling parallel iteration at the language level is to have a notion of an executor as part of the language.

C++ is in the process of adding executors to the language (the estimate is C++23), so it's not unthinkable for Rust to do the same. In fact Boats has written about adding a #[global_executor] before.

// The lowered code from the previous example...
channel.into_par_stream().for_each(async |event| dbg!(event));

// ... would in turn lower into this
while let Some(event) = channel.next().await {
    task::spawn(async move {
        dbg!(event);
    });
}

As you can see we're calling task::spawn for each event. That functionality would need to come from somewhere, and because async iteration is a language level concern this would mean that functionality would need to become part of the language.
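As a rough illustration of what "coming from somewhere" means, here is a toy executor built only on the standard library. It is a sketch under heavy assumptions: block_on and spawn are names local to this example, and running one OS thread per task is nothing like how a real runtime schedules work. It only shows that the spawning machinery is library code today, not something the language provides.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

/// Wakes a parked thread when the future it is polling makes progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive a single future to completion on the current thread.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until `wake` is called, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Toy `task::spawn`: one OS thread per task (an assumption of this sketch).
pub fn spawn<F>(fut: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}
```

Note the Send + 'static bounds on spawn: they fall out of moving the future to another thread, which is the same constraint any parallel lowering would impose on the loop body.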

I think having a way to perform parallel iteration at the language level is a great idea. And having a #[global_executor] hook with a default implementation would solve a lot of issues the ecosystem currently has.

But if history is any indicator this would all take quite a while to shake out. And I think making async iteration syntax depend on this landing (even if unstable) would mean it might take a lot longer.

Parallelizing !Send streams

Not all streams are Send, and async iteration should be able to work with this as well. We can't quite propagate !Send the way we do for async blocks; the global executor would need to support spawning !Send futures as well. This is not a major hurdle, as for example async-std provides a spawn_local function. But it's part of the design space to consider.

Backpressure & concurrency control

For all its warts something that Node.js streams got right was backpressure. If for example writing to a file is slower than reading from a file, the pipeline won't keep reading data if the writer isn't available. This ensures your RAM doesn't fill up with bytes that are waiting to be written.

const fs = require('fs');
const zlib = require('zlib');

const reader = fs.createReadStream('file.txt');
const compress = zlib.createGzip();
const writer = fs.createWriteStream('file.txt.gz');

reader.pipe(compress).pipe(writer);

In Rust sequential async iteration applies backpressure by default. A translation of the above Node.js code would be somewhat like this:

let reader = fs::open("file.txt").await?;
let writer = fs::open("file.txt.gz").await?;

for buf in GzipEncoder::new(reader) {
    writer.write(buf).await?;
}

async-compression's Stream types already work somewhat like this. Because we're performing sequential async iteration here, reading data from the stream is constrained by the speed at which we can write. But also vice-versa: the stream is paused if no new data is available to be read.

If async iteration syntax was parallel by default, we would lose that backpressure. With no constraints specified, parallel async iteration would keep reading data even if there is no consumer ready to process it. Limits would instead always need to be set manually:

for buf in GzipEncoder::new(reader).limit(5) {
    writer.write(buf).await?;
}

There is a third solution to backpressure we haven't discussed yet: set some limit by default. Instead of async iteration having unbounded parallelism by default, we could say, enable 1000 items by default. This limit will however almost always be wrong, and really should be set with knowledge of the workload.

name         max throughput   backpressure?
sequential   1                yes
bounded      n                yes
unbounded    unlimited        no
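The difference between the bounded and unbounded cases can be sketched with the standard library's channels, which are a non-async analogy and not the proposed stream API. The function names accepted_bounded and accepted_unbounded are made up for this example. A producer pushes items while nothing is consuming them: the bounded channel pushes back once it is full, the unbounded one keeps buffering.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Try to queue `attempts` items into a channel holding at most `bound`
/// items while no consumer is reading. Returns how many were accepted.
pub fn accepted_bounded(bound: usize, attempts: usize) -> usize {
    // `_rx` stays alive so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<usize>(bound);
    let mut accepted = 0;
    for item in 0..attempts {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // Backpressure: the buffer is full, the producer has to wait.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Same experiment with an unbounded channel: every item is buffered.
pub fn accepted_unbounded(attempts: usize) -> usize {
    let (tx, _rx) = channel::<usize>();
    (0..attempts).filter(|&item| tx.send(item).is_ok()).count()
}
```

With a bound of 5 and 1000 attempts, the bounded producer is stopped after 5 items, while the unbounded one queues all 1000 in memory.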

How common is parallel iteration?

A large part of the argument in favor of parallel semantics by default for async iteration is that it is the most commonly desired semantics. Perhaps that may be true today within certain domains; but how true do we expect that to be in the future, across all the domains async Rust will cover?

If we want to get a sense of where async Rust might be headed we can take a look at Node.js streams: they've been around for a decade longer than Rust streams have. A common kind of stream in the Node.js ecosystem is the transform stream: a stream that takes an input stream and produces an output stream. Take for example the csv-parser package: it takes a stream of CSV text, and produces an output stream of parsed elements:

NAME,AGE
Daffy Duck,24
Bugs Bunny,22

const csv = require('csv-parser')
const fs = require('fs')
const results = [];

fs.createReadStream('data.csv')
  .pipe(csv())
  .on('data', (data) => results.push(data))
  .on('end', () => {
    console.log(results);
    // [
    //   { NAME: 'Daffy Duck', AGE: '24' },
    //   { NAME: 'Bugs Bunny', AGE: '22' }
    // ]
  });

Translating this to async Rust (with async iteration syntax) we could have something like this:

use async_std::fs::File;
use csv_parser::Csv;
use std::io;

async fn main() -> io::Result<()> {
    let file = File::open("data.csv").await?;
    for item.await? in Csv::new(file) {
        println!("name: {}, age: {}", item["NAME"], item["AGE"]);
    }
    Ok(())
}

In this case throughput is likely going to be limited by how fast we can read from disk, which means spawning tasks for the inner logic is likely not going to speed this up. In fact, spawning a task per item may slow down the parser, since the logic of "read the file" and "operate on the file" can no longer be compiled down into a single state machine 1. While tasks may be cheap compared to threads, they're by no means without a cost.

1

Inlining performance really depends on the workload. But as a rule of thumb: if something can't be inlined we're likely going to miss out on optimizations. Obviously "println" isn't a good example for something that can be inlined, but you get the gist.

The example above could be parallelized, if say, we were processing a directory of files. We could then spawn a task per file. But each file itself would still be processed as a sequential stream.
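A minimal sketch of that split, written in non-async Rust with scoped threads standing in for spawned tasks: parallel across files, sequential within each file. The helper names parse_csv and parse_all are made up for this example, the "files" are in-memory strings to keep it self-contained, and the parsing is deliberately naive (no quoting or escaping).

```rust
use std::thread;

/// Sequentially parse one CSV document into (name, age) rows.
/// The header line is skipped and malformed rows are dropped.
fn parse_csv(text: &str) -> Vec<(String, u32)> {
    text.lines()
        .skip(1)
        .filter_map(|line| {
            let (name, age) = line.split_once(',')?;
            Some((name.trim().to_string(), age.trim().parse().ok()?))
        })
        .collect()
}

/// Parse every document on its own thread. Each document is still
/// processed line by line, in order.
pub fn parse_all(files: &[&str]) -> Vec<Vec<(String, u32)>> {
    thread::scope(|scope| {
        // Spawn everything first, so the files are parsed concurrently...
        let handles: Vec<_> = files
            .iter()
            .map(|text| scope.spawn(move || parse_csv(text)))
            .collect();
        // ...then collect the results in the original file order.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("parser panicked"))
            .collect()
    })
}
```

The parallelism lives at the file boundary only. Inside parse_csv there is nothing to gain from spawning per line, since each row is cheap and the rows arrive in order anyway.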

Another example of sequential stream semantics is the HTTP/1.1 protocol: the stream of incoming TCP connections is parallelized, but converting the TCP stream into a sequence of HTTP requests is sequential. And within the request itself, each HTTP body is sequential as well. Even a multiplexing protocol such as HTTP/2 eventually breaks down into request / response pairs which have sequential semantics:

async fn main() -> io::Result<()> {
    // parallel
    for stream.await? in TcpListener::accept("localhost:8080") {
        // parallel
        for req.await? in Http2Listener::connect(stream) {
            // sequential
            for (i, line?).await in req.lines().enumerate() {
                println!("line: {}, value: {}", i, line);
            }
        }
    }
    Ok(())
}

While parallel iteration may be common, it doesn't seem likely it will ever be used so overwhelmingly that sequential iteration is made redundant. It seems most likely people will want to mix both in projects, and we should seek to make both parallel and sequential iteration convenient.

Compiler hints

So far much of this post has been about why "parallel semantics for async iteration" are not the right default. But this doesn't mean that the issue it intends to address isn't worth addressing. Defaulting to parallel semantics is only one tool.

What if we could use different tools? For example, one issue raised is: "people might forget to parallelize certain streams, missing out on performance". Could we solve this issue through a lint instead? I think we could, for the most part!

For example clippy could detect known instances of core APIs that should probably be parallelized, and warn when they're not. For example handling TCP connections, or processing files in a directory, are things you probably want to do in parallel.

Or if we wanted to allow anyone to add it to their APIs: Rust could introduce a #[must_par_spawn] hint that could be added to structs, much like #[must_use]. It could come with a counterpart tag that marks a function as "this spawns tasks". And the compiler could warn if something wasn't spawned 2.

2

There is an RFC being written for the must_not_await hint; going in this direction would not be without precedent.

#[must_par_spawn]
struct HttpStream;

#[par_spawn]
async fn spawn<T, F: Future<Output = T> + 'static>(f: F) -> T;

Both solutions would need work before implementing; but they present alternate directions that can be explored instead of resorting to parallel async iteration syntax.

Tool-assisted optimization

In the post Futures and Segmented Stacks Boats talks about techniques to make futures more efficient by moving their stack onto the heap. Locating where to segment your stack is hard to do by hand, but could likely become trivial with the right tooling.

Parallelism has many similarities: finding the right places to parallelize can be tricky. Having a tool that can locate where tasks should be spawned, and which tasks aren't needed would be great. Runtime profiling for async programs seems like a generally underexplored domain, and I suspect more and better tooling could get us really far.

Parallel iteration syntax

So far in this post we've talked about why async iteration shouldn't default to parallel semantics. But I do agree that it would be great if we had some syntax for parallel iteration. In the "future directions" section of my parallel-stream post, I laid out what that could look like (using par for):

let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
par for stream.await? in listener.incoming() {
    io::copy(&stream, &stream).await?;
}

In par for loops keywords such as break and continue would be invalid, and each iteration would be spawned on a task. This syntax could be made to work for both async and non-async Rust, backed by different execution strategies. And finally, being able to talk about parallelism as a first-class language concept would enable much better diagnostics, documentation, and tooling to be implemented.

For example, accidentally writing for x.await instead of par for x.await on a struct tagged as #[must_par_spawn] could be met with the following diagnostic:

warning: loop is sequential
 --> src/main.rs:2:9
  |
2 |     for stream.await? in listener.incoming() {
  |     ^ help: use `par for stream.await? in listener.incoming()` to parallelize
  |
  = note: `#[warn(sequential_loops)]` on by default

I think parallel iteration syntax in some form would be fantastic. I just would like to see it exist in a similar way for both async and non-async Rust.

Addendum: lending streams

Scottmcm pointed out on Zulip that parallel iteration semantics also wouldn't work well with lending streams (also known as "streaming" or "attached" streams).

In a lending stream the Item that gets returned by Stream may be borrowed from self. It can only be used as long as the self reference remains live. For example a Stream where:

type Item<'iter> = &'iter mut State;

...would fail to compile, which seems rather limiting.
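The same constraint can be sketched in non-async Rust with a lending iterator built on generic associated types. The LendingIterator trait, the PairsMut type, and prefix_sums below are all made up for this illustration and are not part of any proposed Stream API. Each item borrows from the iterator itself, so it has to be dropped before next can be called again. That works fine in a sequential loop, but it rules out handing items to tasks that outlive the current iteration.

```rust
/// Hypothetical lending iterator: each item may borrow from `self`.
trait LendingIterator {
    type Item<'iter>
    where
        Self: 'iter;

    fn next(&mut self) -> Option<Self::Item<'_>>;
}

/// Yields overlapping mutable pairs: [0..2], [1..3], [2..4], ...
struct PairsMut<'s> {
    state: &'s mut [u32],
    pos: usize,
}

impl<'s> LendingIterator for PairsMut<'s> {
    type Item<'iter> = &'iter mut [u32]
    where
        Self: 'iter;

    fn next(&mut self) -> Option<Self::Item<'_>> {
        // Returns None once fewer than two elements remain.
        let pair = self.state.get_mut(self.pos..self.pos + 2)?;
        self.pos += 1;
        Some(pair)
    }
}

/// Sequential iteration works: each `pair` is dropped before the next call.
pub fn prefix_sums(data: &mut [u32]) {
    let mut pairs = PairsMut { state: data, pos: 0 };
    while let Some(pair) = pairs.next() {
        pair[1] += pair[0];
    }
    // Holding two items at once (collecting them into a Vec, or moving
    // each one into a spawned task) would be rejected by the borrow checker.
}
```

Because the pairs overlap, two of them alive at the same time would be aliasing mutable borrows, which is exactly what a parallel lowering would need.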

Conclusion

Parallel iteration semantics for Stream are an incredible concept with much potential, but they seem unlikely to be the right fit for all cases.

By defaulting to sequential iteration semantics for Stream, parity with Iterator is preserved. This not only ensures keywords such as break and continue work in a comparable way, it also enables looking at the design of "parallel iteration" as a design that can be shared between async and non-async Rust.

We still have a fair way to go before we can add async iteration syntax to Rust. The Stream RFC draft still needs to be submitted and accepted. There are still questions about migrations and interop with the stream adapters in the ecosystem. And we still need a design that covers everything from IntoStream to syntax 3. But even though all of that may still be a while off, hopefully this post can serve to anchor discussion on whether iterating over a Stream should default to sequential or parallel 4.

3

All async iteration syntax in this post has been made up, and is by no means an indication of what the lang team will decide on. I just needed to make something up for the purpose of this post.

4

I really tried making a joke about "If a stream would wear iteration, would it wear it this way, or that way" but couldn't get it to work. My biggest regret in publishing this post.


Thanks to Friedel Ziegelmayer and Florian Gilcher for proofreading.