rust streams
— 2019-06-20

As Rust's async story is evolving, so is Rust's streaming story. In this post we'll take a look at how Rust's streaming model works, how to use it effectively, and where things are heading in the future.

The stream traits

In synchronous Rust, the core streaming abstraction is that of Iterator. It provides a way of yielding items in a sequence, and blocks in between. Composition is done by passing iterators into the constructors of other iterators, allowing us to plumb things together without much fanfare.

In asynchronous Rust the core streaming abstraction is Stream. It behaves very similarly to Iterator, but instead of blocking between each item it yields, it allows other tasks to run while it waits.
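To make the comparison with Iterator a bit more concrete, here's a minimal sketch that only uses the standard library. The Stream trait below is a local stand-in that mirrors the shape of the trait in the futures crate; Counter, NoopWake and drain are made-up names for illustration, and the busy-polling loop stands in for a real executor.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for the `Stream` trait (assumption: same shape as in `futures`).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream that yields 1..=limit and pretends to wait before each item.
pub struct Counter {
    next: u32,
    limit: u32,
    ready: bool,
}

impl Counter {
    pub fn new(limit: u32) -> Self {
        Counter { next: 1, limit, ready: false }
    }
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next > self.limit {
            return Poll::Ready(None); // the stream is finished
        }
        if !self.ready {
            // Not ready yet: ask to be polled again and hand control back.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        let item = self.next;
        self.next += 1;
        Poll::Ready(Some(item))
    }
}

/// Waker that does nothing; enough for a loop that simply polls again.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a stream to completion. Returns the items and how often it saw `Pending`.
pub fn drain<S: Stream + Unpin>(mut stream: S) -> (Vec<S::Item>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pending = 0;
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, pending),
            Poll::Pending => pending += 1, // a real executor would run other tasks here
        }
    }
}
```

Calling drain(Counter::new(3)) collects the three items. Every Pending in between is a point where an Iterator would have blocked the thread instead.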

In addition, async Rust has counterparts to the synchronous Read and Write in the form of AsyncRead and AsyncWrite. The purpose of these traits is to represent unparsed bytes, often coming directly from the IO layer (such as from sockets or files).

use futures::prelude::*;
use runtime::fs::File;

let mut f = File::create("foo.txt").await?; // create a file
f.write_all(b"hello world").await?;         // write data to the file (AsyncWrite)

let mut f = File::open("foo.txt").await?; // open a file
let mut buffer = Vec::new();              // init the buffer to read the data into
f.read_to_end(&mut buffer).await?;        // read the whole file (AsyncRead)

Rust streams have some of the best features of other languages. For example: they sidestep inheritance problems as seen in Node.js's Duplex streams by leveraging Rust's trait system. But they also implement backpressure and lazy iteration, improving their efficiency. And on top of that, Rust streams allow asynchronous iteration using the same type.

There's a lot to like about Rust streams, even though there are still some kinks to be sorted out.

Streams and roles

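Let's start off by enumerating the kinds of streams that can be expressed in a typical system:

Source: a stream that produces data.
Sink: a stream that consumes data.
Through: a stream that consumes data, operates on it, and produces new data.
Duplex: a stream that can produce data and, separately, consume data.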

Establishing common terminology is useful because Rust's stream traits don't map 1:1 to these roles. In fact, each of Rust's stream traits can be used to fill many different roles. Here's an overview of which roles each trait can take part in:

             Source  Sink  Through  Duplex
AsyncRead    Yes     No    Yes      Yes
AsyncWrite   No      Yes   No       Yes
Stream       Yes     No    Yes      No

There's quite a bit to unpack here. Let's dig in!

duplex

duplex is always implemented using AsyncRead + AsyncWrite. This is not unlike other languages. A key difference, however, is that using Rust's trait system we can evade multiple inheritance problems that plague some other languages. Examples of duplex streams include sockets and files.

through

through streams are implemented using either AsyncRead or Stream. Data flows from one stream to the next by passing the upstream stream into the constructor of the through stream.

In Rust, the only difference between source and through is in how the traits are used, not in the trait definitions themselves. An example:

let s = &b"hello planet"[..];           // source  (AsyncRead)
let s = gzip::compress(s).await?;       // through (AsyncRead)
let s = my_protocol::parse(s).await?;   // through (Stream)

asyncread vs stream

Another point of interest is the distinction between AsyncRead and Stream. Both kinds of streams are allowed to operate on bytes. The difference is that AsyncRead is a byte stream that operates on borrowed data, while Stream is an object stream that operates on owned data. This is to say that Stream can operate on any kind of data, not only bytes.

Even when both operate on bytes, AsyncRead yields unparsed data, while Stream yields parsed data. With Stream, each item yielded can generally be turned into a valid message on its own. With AsyncRead, we may need to request more data before we have a complete message.

Examples of AsyncRead include files, sockets, and HTTP bodies. Examples of Stream include ndjson lines, and protobuf messages.

The relationship between AsyncRead and Stream is equivalent to the relationship between stdlib's Read and Iterator traits. In the following example we convert an arbitrary amount of bytes into separate lines of bytes using split. We've marked each line with the traits and yield types:

use std::fs::File;
use std::io::{self, BufRead};

let f = File::open("foo.txt")?;     // Read<[u8]>
let f = io::BufReader::new(f);      // Read<[u8]>
for buf in f.split(b'\n') {         // Iterator<[u8]>
    println!("{:?}", buf?);         // each `buf` is a `Vec<u8>` holding one line
}

Same data types. Different traits.

Unfortunately AsyncRead.split does something radically different, so this example can't be directly copied over yet (more on what split does later). So don't try and write this in async Rust quite yet.

sinks

The way streams work is that at the end of a stream pipeline, there's a sink or iterator requesting items from the streams. This means that the stream pipeline will only yield data when it's requested. This is commonly referred to as "lazy iteration", or "streams with backpressure".
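The same pull-based behaviour can be observed with the synchronous Iterator from the standard library, which is the counterpart Stream is modeled after. Here's a small sketch; the function name pull_two is made up for illustration. The source is endless, but it only ever produces as many items as the sink at the end asks for.

```rust
use std::cell::Cell;

/// Runs a small pipeline and returns
/// (the items the sink received, the number of items the source produced).
pub fn pull_two() -> (Vec<u32>, u32) {
    let produced = Cell::new(0);

    // Source: an endless sequence that records every item it produces.
    let source = (1u32..).inspect(|_| produced.set(produced.get() + 1));

    // Through: doubles each item. Nothing has run yet at this point.
    let through = source.map(|n| n * 2);

    // Sink: requests exactly two items, so the source produces exactly two.
    let received: Vec<u32> = through.take(2).collect();

    (received, produced.get())
}
```

Until collect starts requesting items, no stage does any work at all.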

Currently there's no dedicated syntax to loop through streams. Instead it's recommended to use a while let Some loop:

let mut stream = my_protocol::parse(f).await?;
while let Some(item) = stream.next().await {
    println!("{:?}", item);
}

Now that we have a slightly better picture of how Rust's traits relate to streaming concepts, we're ready to take a look at how to create streaming pipelines.

Pipelines

One of the staples of streams-based programming is being able to compose streams together. In shell you can pipe programs together using |, and in Node.js you can do the same using .pipe. A typical shell example looks like this:

$ cat foo.txt | gzip > foo.txt.gz

The example above reads data from foo.txt, pipes it through gzip to compress the data, and writes the result back out to a new file.

Rust streams have a very similar model. In fact we could imagine the same code being written in Rust as:

use runtime::fs::File;

File::open("foo.txt")
    .and_then(|s| gzip::compress(s))
    .and_then(|s| word_count::bytes(s))
    .and_then(|s| s.copy_into(File::create("foo.txt.gz")))
    .await?;

This code example won't run today because not all packages exist yet. But it illustrates quite well how Rust's streams work in practice. We can express the pipeline abstractly as follows:

┌───────────┐   ┌───────────┐   ┌────────────┐
│ AsyncRead │──>│ AsyncRead │──>│ AsyncWrite │
└───────────┘   └───────────┘   └────────────┘

Data goes from the source file, through the compressor into the destination file. Different pipelines will use different combinations of AsyncRead and Sink. But in all patterns it's going to be common to pass the last stream down to the next constructor, until we reach a sink.
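The "pass the last stream into the next constructor" pattern can already be tried out with the synchronous Read and Write traits from the standard library. Below is a sketch under that assumption: Uppercase is a made-up through stage standing in for something like a compressor, and std::io::copy plays the role of the final copy step.

```rust
use std::io::{self, Read, Write};

/// Hypothetical through stage: wraps any reader and uppercases ASCII bytes.
pub struct Uppercase<R> {
    inner: R,
}

impl<R: Read> Uppercase<R> {
    /// The upstream reader is passed into the constructor.
    pub fn new(inner: R) -> Self {
        Uppercase { inner }
    }
}

impl<R: Read> Read for Uppercase<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pull from upstream only when downstream asks for data.
        let n = self.inner.read(buf)?;
        buf[..n].make_ascii_uppercase();
        Ok(n)
    }
}

/// source -> through -> sink. Returns the number of bytes copied.
pub fn run_pipeline(source: impl Read, mut sink: impl Write) -> io::Result<u64> {
    let mut through = Uppercase::new(source);
    io::copy(&mut through, &mut sink)
}
```

For example, run_pipeline(&b"hello planet"[..], &mut out) with out being a Vec<u8> leaves the uppercased bytes in out.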

Piping duplex streams

When duplex streams are involved, the streaming model gets a little trickier. Let's pretend we're opening a socket that implements AsyncRead + AsyncWrite:

let mut sock = Socket::new("localhost:3000");
dbg!(sock) // implements AsyncRead + AsyncWrite

We want to read data from the socket, operate on each value, and write data back to the socket. In Rust this would get us in trouble because we can't hold a mutable reference to the same value in two places. So duplex streams have a convenient split method to split the socket into a reader and writer half:

let mut sock = Socket::new("localhost:3000");
let (reader, writer) = &mut sock.split();

Piping AsyncRead to AsyncWrite

In the example above, the Socket duplex is both a source and a sink. Neither half wraps another stream, and sometimes we're only interested in the read or the write half. This is why it's uncommon for duplex streams to take other streams in their constructor.

So how do we write data to it?

Well, Rust conveniently has a copy_into combinator for this exact purpose. It takes data from an AsyncRead, and writes it to an AsyncWrite:

let mut sock = Socket::new("localhost:3000");
let (reader, writer) = &mut sock.split();
reader.copy_into(writer).await?;

Piping Stream to AsyncWrite

If we want to write data from a Stream to an AsyncWrite, things become quite a bit trickier. First off, our Stream should output bytes (&[u8] or Vec<u8>), because IO devices only accept bytes.

But more importantly: there's currently no copy_into combinator available! But we can work around that by converting from Stream into AsyncRead, and then calling copy_into on that:

stream
    .map(io::Result::Ok)  // convert each `Vec<u8>` to `Result<Vec<u8>>`
    .into_async_read()    // convert the stream to `AsyncRead`
    .copy_into(writer)    // copy the data to the sink
    .await?;              // start the pipeline

Currently this code does suffer from a double buffering bug, which makes it less efficient than it could be. But what would likely work best here is if copy_into would work for Stream too:

stream.copy_into(writer).await?;

Handling errors

One of the biggest mistakes Node.js made when it introduced streams was that pipe doesn't forward errors. Luckily in Rust streams this is solved because of how streams are wrapped in constructors: each stage pulls from the stream it wraps, so an error from an inner stream is returned through the outer one. This means that streams automatically forward errors, and pipelines handle them.

The only difficulty with error handling is that the error kinds need to line up. This can be particularly tricky when creating pipelines that include errors other than io::Error. But the ecosystem is still young, and patterns are still emerging, so it shouldn't be surprising not everything is streamlined quite yet.
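One way to make error kinds line up is a shared error type that every stage's error converts into. The sketch below uses the synchronous standard library traits to keep it self-contained; PipelineError and sum_lines are made-up names, and this is just one possible pattern rather than an established convention.

```rust
use std::io::{self, BufRead};
use std::num::ParseIntError;

/// Hypothetical error type that both pipeline stages can convert into.
#[derive(Debug)]
pub enum PipelineError {
    Io(io::Error),
    Parse(ParseIntError),
}

impl From<io::Error> for PipelineError {
    fn from(err: io::Error) -> Self {
        PipelineError::Io(err)
    }
}

impl From<ParseIntError> for PipelineError {
    fn from(err: ParseIntError) -> Self {
        PipelineError::Parse(err)
    }
}

/// Reads one number per line and sums them.
/// The first error from either stage stops the pipeline and is forwarded to the caller.
pub fn sum_lines(reader: impl BufRead) -> Result<i64, PipelineError> {
    let mut total = 0i64;
    for line in reader.lines() {
        let line = line?; // io::Error -> PipelineError
        total += line.trim().parse::<i64>()?; // ParseIntError -> PipelineError
    }
    Ok(total)
}
```

The ? operator does the conversion through the From impls, so the reading stage and the parsing stage never need to know about each other's error types.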

writing codecs

It's common for protocol parsers to be split into an encoder and a decoder half. Encoders convert structs to sequences of bytes, and decoders convert bytes into structs. This can easily be modeled in Rust:

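use futures::prelude::*;
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};

/// The type we're converting to and from.
pub struct MyFrame;

/// Convert frames to bytes.
pub struct Encoder;
impl Encoder {
    /// Take a stream of frames, and return a stream of bytes.
    pub fn encode(stream: impl Stream<Item = MyFrame>) -> Self {
        todo!("store `stream` so `poll_next` can pull frames from it")
    }
}
impl Stream for Encoder {
    type Item = Result<Vec<u8>, Error>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        todo!("poll the inner stream and serialize the next frame")
    }
}

/// Convert bytes to frames.
pub struct Decoder;
impl Decoder {
    /// Take a reader of bytes, and return a stream of frames.
    pub fn decode(reader: impl AsyncRead) -> Self {
        todo!("store `reader` so `poll_next` can read bytes from it")
    }
}
impl Stream for Decoder {
    type Item = Result<MyFrame, Error>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        todo!("read bytes from the inner reader and parse the next frame")
    }
}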

There exist specialized crates that are meant to assist in the creation of codecs. But in practice codecs are mostly a design pattern, and the easiest way to write them is using the standard stream traits directly.

note: depending on your use case you might need to perform some internal buffering when writing decoders. But all that requires is a good (ring)buffer abstraction, and there's a variety on crates.io.
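To show the codec pattern with working bodies, here's a sketch of its synchronous analog using Read and Iterator from the standard library. The wire format (one length byte followed by the payload) and the names Frame, encode and Decoder are assumptions made up for this example, not an existing protocol or crate.

```rust
use std::io::{self, Read};

/// Hypothetical frame type: an opaque payload of at most 255 bytes.
#[derive(Debug, PartialEq)]
pub struct Frame(pub Vec<u8>);

/// Encoder half: one length byte followed by the payload.
pub fn encode(frame: &Frame) -> Vec<u8> {
    assert!(frame.0.len() <= 255, "payload too large for a one-byte length prefix");
    let mut bytes = Vec::with_capacity(1 + frame.0.len());
    bytes.push(frame.0.len() as u8);
    bytes.extend_from_slice(&frame.0);
    bytes
}

/// Decoder half: wraps a byte source and yields parsed frames.
pub struct Decoder<R> {
    reader: R,
}

impl<R: Read> Decoder<R> {
    pub fn new(reader: R) -> Self {
        Decoder { reader }
    }
}

impl<R: Read> Iterator for Decoder<R> {
    type Item = io::Result<Frame>;

    fn next(&mut self) -> Option<Self::Item> {
        // Read the length prefix. A clean end of input ends the iteration.
        let mut len = [0u8; 1];
        match self.reader.read(&mut len) {
            Ok(0) => return None,
            Ok(_) => {}
            Err(err) => return Some(Err(err)),
        }
        // Read exactly `len` payload bytes. A truncated frame is an error.
        let mut payload = vec![0u8; len[0] as usize];
        match self.reader.read_exact(&mut payload) {
            Ok(()) => Some(Ok(Frame(payload))),
            Err(err) => Some(Err(err)),
        }
    }
}
```

An async version would have the same structure, with Decoder wrapping an AsyncRead and implementing Stream instead of Iterator.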

ad-hoc streams using combinators

Sometimes you want to quickly operate on the output of a stream, whether it's filtering out results you're not interested in, concatenating items, or doing a quick count. Stream combinators allow you to perform these tasks with little overhead.

Say we wanted to read data from a socket, and split it by newline. The lines combinator provides that:

let mut sock = Socket::new("localhost:3000");
let (reader, _) = &mut sock.split();

// This returns a stream of `String`
let lines = reader.lines();

Now what if we wanted to parse those lines using serde? Cue the map combinator:

let mut sock = Socket::new("localhost:3000");
let (reader, _) = &mut sock.split();

#[derive(Deserialize)]
struct Pet {
    name: String,
}

// This returns a stream of `Result<Pet>`
let pet_stream = reader
    .lines()
    .map(|line| serde_json::from_str::<Pet>(&line));

Another interesting fact to point out is that Vec<u8> implements AsyncWrite (and byte slices such as &[u8] implement AsyncRead), which means that if you want to concatenate all values of a stream, it's possible to use a buffer directly for that.

There are probably many more combinators that could be added, and patterns to be explored. But the core mechanics of Rust's streams feel really solid, and more combinators can be added as we grow the ecosystem.

Why we do not talk about the sink trait

Surprise! There's another trait you should know about. Its name is Sink, and it's the odd one out in the lot. It's not just confusing to say out loud (are we talking about Sync or Sink?), but the trait itself is quite out there. Take a look at the definition:

pub trait Sink<Item> {
    type SinkError;
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context
    ) -> Poll<Result<(), Self::SinkError>>;
    fn start_send(
        self: Pin<&mut Self>,
        item: Item
    ) -> Result<(), Self::SinkError>;
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context
    ) -> Poll<Result<(), Self::SinkError>>;
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context
    ) -> Poll<Result<(), Self::SinkError>>;
}

That's right. Whenever you implement Sink you need to implement 4 methods, 1 associated type, and 1 generic parameter. Oh and also a mandatory internal buffer. Because all those methods in the trait definition are hooks into a very specific lifecycle. Where the only way to move data through that cycle is by temporarily storing data internally, and yielding it again at a later point.

Maybe you've caught on to it, but Sink is not simple. Its raison d'être is to be a typed counterpart to AsyncWrite. It usually wraps a writer in its constructor, and then serializes types into it.

On paper this might sound appealing. But in practice nobody dares write this monster of a trait without heavy-handed help from crates.io. Which raises the question of whether this amount of complexity is actually worth it. And the answer increasingly seems to be a resounding "no".

Sink doesn't bring anything to the table that can't be solved more elegantly and with less ceremony using the 3 standard stream traits. So save yourself some trouble, and don't bother with Sink.

What's next?

async iteration syntax

Async iteration of streams is currently possible, but it isn't necessarily nice to use. Most userland iteration of streams is done using while let Some loops:

let mut listener = TcpListener::bind("127.0.0.1:8081")?;
let mut incoming = listener.incoming();
while let Some(conn) = incoming.next().await {
    let conn = conn?;
    /* handle connection */
}

It'd be nicer if we could write this as a for await loop instead:

let mut listener = TcpListener::bind("127.0.0.1:8081")?;
for conn.await? in listener.incoming() {
    /* handle connection */
}

It's unclear when this will happen. But it's definitely something worth looking forward to!

async trait streams

Speaking of improvements, the stream traits themselves could use some work. Currently the traits are quite similar to the Future trait:

pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8]
    ) -> Poll<io::Result<usize>>;
}

What makes this especially tricky is the definition of self: Pin<&mut Self>. This means the method can only be called on instances of Self that are pinned. I don't want to bore you with all the reasons why this is tricky, but instead I want to mention that lately I've been hearing conversations about a possible simplification of these traits.

In principle the stream traits don't have anything async about them. The only reason why they're async is because they return futures, and might need to wait on other futures internally. This is important, because once async is allowed in traits directly, it seems like it would be possible to simplify the traits significantly.

pub trait AsyncRead {
    async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

This would be particularly nice because it would mean AsyncRead, AsyncWrite and Stream would be defined the exact same way as std Read, Write, and Iterator with the only difference being the async keyword in front of the methods.

pub trait Read {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

Nothing about this is sure though. But I'm cautiously optimistic about the possibilities here.
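For anyone who wants to experiment with that shape: newer compilers (Rust 1.75 and later) accept async fn in trait definitions, so a local trait can be written this way. Whether the ecosystem's stream traits end up looking like this is a separate question. In the sketch below AsyncReadSketch, Bytes, block_on and read_prefix are all made-up names, and block_on is a deliberately naive busy-polling executor that only uses the standard library.

```rust
use std::future::Future;
use std::io;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait with the simplified shape. Not the trait from `futures`.
#[allow(async_fn_in_trait)]
pub trait AsyncReadSketch {
    async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

/// In-memory byte source used to try the trait out.
pub struct Bytes<'a>(pub &'a [u8]);

impl AsyncReadSketch for Bytes<'_> {
    async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.0.len().min(buf.len());
        buf[..n].copy_from_slice(&self.0[..n]);
        self.0 = &self.0[n..]; // advance past the bytes we handed out
        Ok(n)
    }
}

/// Waker that does nothing; enough for a loop that simply polls again.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor: polls the future in a loop until it completes.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Reads up to three bytes from an in-memory source.
pub async fn read_prefix() -> io::Result<Vec<u8>> {
    let mut src = Bytes(b"hello");
    let mut buf = [0u8; 3];
    let n = src.read(&mut buf).await?;
    Ok(buf[..n].to_vec())
}
```

Note that there's no Pin and no Context in the trait or its implementation; both only show up in the executor.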

anonymous streams using yield

Speaking of improvements to how we define streams, another thing that has been talked about is adding syntax for generators. Generators would likely use the yield keyword, and we could imagine a stream essentially being a generator of futures. And just like async/await allows us to skip the boilerplate around constructing futures, yield would give us the same for streams:

async fn keep_squaring(mut val: u64) -> yield u64 {
   loop {
       val *= val;
       yield val;
   }
}

for val.await in keep_squaring(4) {
    dbg!(val);
}

This one might be a lot further out, but it seems like it has the potential to provide some welcome workflow improvements.

zero-copy reads and writes

A nice feature AsyncRead and AsyncWrite have is support for vectored IO through poll_read_vectored and poll_write_vectored. This allows optimizing performance for specific applications.

Similar methods that might be useful to add in the future are poll_read_vec and poll_write_vec (perhaps under a less confusing name). These methods would allow passing buffers directly into the methods and, using a mem::swap trick, avoid one extra memcpy on every operation. This would allow us to increase performance in certain APIs significantly, without needing to modify the end-user API at all.

This is particularly relevant when wrapping synchronous APIs (which currently means: almost every filesystem operation). But more importantly: it would allow us to remove the extra overhead Rust currently has for futures based IO, compared to using the OS APIs directly.
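One possible reading of the mem::swap trick is sketched below, stripped of everything async. ChunkSource and read_vec are hypothetical names, and the assumption is that the source already holds its next chunk in an internal Vec<u8> (for example because a blocking call on another thread filled it). Instead of copying bytes into the caller's buffer, the two buffers trade places.

```rust
use std::mem;

/// Hypothetical source that already holds its next chunk in an internal buffer.
pub struct ChunkSource {
    filled: Vec<u8>,
}

impl ChunkSource {
    pub fn new(data: &[u8]) -> Self {
        ChunkSource { filled: data.to_vec() }
    }

    /// Swapping read: hands the internal buffer to the caller and keeps the
    /// caller's (cleared) buffer as the new internal one. No bytes are copied.
    pub fn read_vec(&mut self, buf: &mut Vec<u8>) -> usize {
        buf.clear();
        mem::swap(&mut self.filled, buf);
        buf.len()
    }
}

/// Returns the bytes read and whether they still live in the original allocation.
pub fn demo() -> (Vec<u8>, bool) {
    let mut src = ChunkSource::new(b"hello");
    let before = src.filled.as_ptr();

    let mut buf = Vec::new();
    src.read_vec(&mut buf);

    let same_allocation = buf.as_ptr() == before;
    (buf, same_allocation)
}
```

A slice-based read would have to copy the bytes out of the internal buffer on every call, which is the extra memcpy the swap avoids.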

Conclusion

In this post we've talked about the different kinds of async streams Rust has, discussed common patterns and pitfalls, and looked towards a possible future of streams.

The future of Rust streams is incredibly exciting! If we can nail the ergonomics of piping streams together, we'll be one step closer to making Rust a great option for the space traditionally held by scripting languages, but with Rust's reliability guarantees.

We hope you enjoyed reading about streams. Have a great week!

Thanks to Irina Shestak, Nemo157, David Barsky, Stjepan Glavina, and Hugh Kennedy for reading and providing feedback, ideas, and input on the many iterations of this post.