Tide Channels
— 2020-01-29

WebSocket (WS) support for Tide has been a long-anticipated feature. More recently, requests for Server Sent Events (SSE) support have started to pop up as well. In this post we'll look at the motivation, requirements, and design for WS and SSE support in Tide. But to not keep you waiting, this is the TL;DR of what we've come up with:

let mut app = tide::new();
app.at("/sse").get(tide::sse()); // Endpoint to connect new SSE channels on.
app.at("/").get(async |req| {
    req.sse().send(b"hello Chashu").await?; // Send a message over the SSE channel.
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;

Note: unlike some of my other posts that document work that's been completed already, this post documents designs that are currently underway. The upside of this is that I can share my notes as I'm doing research! The downside is that the designs outlined here don't exist yet, and will likely change during implementation and stabilization.

Why channels?

Before diving into how WS and SSE support could be exposed it's worth asking why we want these features in the first place. Live chat is a pretty common example, but however fun that is to write, most uses are more boring. Examples include:

These features could all be implemented by periodically making HTTP requests to the server to check whether new data is available. But instead using WS or SSE streams allows that data to be sent to the client as soon as the data is available. This often results in this information being presented sooner in browsers, which often feels good 1.

1

It's hard to say definitively whether WS/SSE performs better than repeated polling. With HTTP/1.1 it probably does. But with HTTP/2 many of the downsides of making repeated requests are gone, and keeping a separate WS connection open might in fact be slower. Work is also being done on enabling WS support over HTTP/2, so it's hard to make any definitive statements about performance. It all depends.

I think the best way to think about WS and SSE is as networked channels. Messages are sent by one side and received by the other. SSE provides unidirectional channels. WS provides bidirectional channels (both sides of the channel can send messages to the other side). As we'll see later on, features such as broadcasting and multi-producer support can be enabled for either channel kind.
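To make the analogy a bit more concrete, here's a minimal sketch that uses the standard library's in-process `std::sync::mpsc` channels as a stand-in for the network. The names `sse_like`, `ws_like`, and `Duplex` are made up for this illustration and aren't part of any Tide API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// One-way channel: the server keeps the sender, the client the receiver.
/// This mirrors the direction of an SSE stream.
pub fn sse_like() -> (Sender<String>, Receiver<String>) {
    channel()
}

/// One end of a two-way channel: it can both send and receive.
pub struct Duplex {
    pub tx: Sender<String>,
    pub rx: Receiver<String>,
}

/// Two-way channel built from two one-way channels.
/// This mirrors a WebSocket, where either side may send at any time.
pub fn ws_like() -> (Duplex, Duplex) {
    let (a_tx, b_rx) = channel();
    let (b_tx, a_rx) = channel();
    (Duplex { tx: a_tx, rx: a_rx }, Duplex { tx: b_tx, rx: b_rx })
}
```

Seen this way, a WebSocket is just two one-way channels glued together, which is why most of the API surface can be shared between the two kinds.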

Which APIs do we need?

Implementations for async SSE and WS streams already exist on crates.io. Examples include async-tungstenite and sse-codec. But while these are great building blocks to build upon, exposing them from Tide without analyzing how people would want to use them would result in a poor end-user experience 2 3.

2

Many frameworks (of any kind) feel like they were designed by walking down a checklist of features. On paper these frameworks are great because they tick all boxes and are infinitely flexible. But in their flexibility they fail to provide meaningful abstractions, shifting the burden of using APIs correctly onto the user. This rarely leads to pleasant user experiences.

3

While talking about abstractions: finding the right layer to abstract on is a tedious process. Abstract too much, and unexpected uses that should be valid become impossible. Abstract too little, and APIs become hard to use and prone to error. The task of API design is to find the middle ground, and balance that with other forces such as consistency and performance. And getting things right takes time.

Channel implementations for HTTP servers can roughly be split up into two categories 4:

This calls for the introduction of two-ish APIs: one for sending messages from inside HTTP endpoints, and one for sending messages from outside of them. And in turn there should be variants for both SSE and WS.

Another consideration is that there should be a mechanism to easily access channels associated with the same peer. But it should also be possible to access channels associated with other peers.

Channel states

In theory any channel can exist in one of 3 states:

However we can't make any meaningful distinction at the framework layer between "currently not connected", and "will never connect" because we can't know for sure whether we've encountered a race condition, or if this is intended behavior. 5

The way to provide stricter guarantees around Tide's APIs is for application authors to encode stricter guarantees for their applications by writing Extension Traits. This is part of Tide's layered design.
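As a rough sketch of what such an extension trait could look like: the `Request` and `ServerSentEvents` types below are simplified stand-ins I've made up so the example compiles on its own, and `require_sse` is a hypothetical method name. The idea is that an application which knows every client must have an SSE connection can turn "not connected" into an error in one place.

```rust
use std::io;

/// Stand-in for the proposed SSE channel handle (not Tide's actual type).
#[derive(Debug)]
pub struct ServerSentEvents {
    pub open: bool,
}

impl ServerSentEvents {
    pub fn is_open(&self) -> bool {
        self.open
    }
}

/// Stand-in for Tide's `Request` (not Tide's actual type).
pub struct Request {
    pub sse_open: bool,
}

impl Request {
    pub fn sse(&self) -> ServerSentEvents {
        ServerSentEvents { open: self.sse_open }
    }
}

/// Application-level extension trait that encodes a stricter guarantee:
/// in this application a missing SSE connection is always an error.
pub trait RequestExt {
    fn require_sse(&self) -> io::Result<ServerSentEvents>;
}

impl RequestExt for Request {
    fn require_sse(&self) -> io::Result<ServerSentEvents> {
        let sse = self.sse();
        if sse.is_open() {
            Ok(sse)
        } else {
            Err(io::Error::new(io::ErrorKind::NotConnected, "no SSE channel"))
        }
    }
}
```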

5

If an API is public, clients may or may not provide WebSocket support as well. This is different from applications where the author controls both client and server impls, where that guarantee can be made. Tide needs to support both, so we can't codify any assumptions about what it means when a client isn't connected.

API design

The design we're going for in Tide is to have a managed pool of client connections inside the framework, and expose convenience methods to access them from endpoints and on registration.

Channels will be subdivided into two flavors: Server Sent Events (SSE), and WebSockets (WS). Their APIs will be mostly similar, with the main difference being that WS channels can receive messages and implement Stream<Item = Message>.

Registering a new channel

Establishing a new channel is similar for both SSE and WS:

let mut app = tide::new();
app.at("/sse").get(tide::sse());
app.at("/ws").get(tide::ws());

In the case of WS, an HTTP upgrade is performed, which involves a handshake procedure and some other details. In the SSE case, a response is started by sending the right response headers and then keeping the body stream open.
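For the SSE side, "the right response headers" mostly comes down to the `text/event-stream` content type. Here's a minimal sketch of what the head of such a response could look like on the wire over HTTP/1.1. The function name `sse_response_head` is made up for this example, and the `Cache-Control` header is common practice rather than something this design prescribes.

```rust
/// Builds the head of an HTTP/1.1 response that opens an SSE stream.
/// After this head is written, the connection is kept open and events
/// are written to the body as they become available.
pub fn sse_response_head() -> String {
    [
        "HTTP/1.1 200 OK",
        "Content-Type: text/event-stream",
        "Cache-Control: no-cache",
        "", // the blank line ends the header section
        "",
    ]
    .join("\r\n")
}
```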

In addition to shorthand functions, we should also allow configuration of both channel kinds. By default the shorthand function would provide things like timeouts, keepalive messages, and other values. But these should all be configurable through the constructors.

let mut app = tide::new();

let ws = tide::ws().timeout(Duration::from_secs(5));
app.at("/ws").get(ws);

Registering external channel handlers

Similar to async-std's channels, Tide's channels would be cloneable, so each request can get its own copy and we never have to worry about lifetimes.

On creation the channel structs should provide hooks so that when a new channel is instantiated, a callback is called providing the channel itself. Probably for good measure it'd also be useful to provide a copy of State.

let mut app = tide::new();

let ws = tide::ws_with(async |ws: WebSocket, _state| {
    println!("new websocket created");
});
app.at("/ws").get(ws);

The purpose of this API is to integrate with external resources that might want to send messages to peers outside of the regular scope of an HTTP request.

An open question here remains how to expose further metadata. Perhaps the WebSocket struct should hold information such as peer_addr, but that's still unclear.

Using channels inside endpoints

Using channels inside endpoints should have a convenient API. Both req.sse() and req.ws() would always return a valid channel instance associated with the current client address.

let mut app = tide::new();
app.at("/ws").get(tide::ws()); // endpoint to establish a websocket connection on
app.at("/").get(async |req| {
    let socket = req.ws(); // access the socket

    socket.send(b"hello chashu").await?;    // send a message
    let msg = socket.recv().await?;         // receive a message
    println!("message received: {}", msg);  // print a message

    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;

Connectivity states can be checked through the is_open method. (See Channel States for more on this). Say we don't want to send a message unless we have an SSE connection, we could express it as follows:

let mut app = tide::new();
app.at("/sse").get(tide::sse());
app.at("/").get(async |req| {
    let sse = req.sse();

    if sse.is_open() {
        sse.send(b"hello Nori").await?;
    }

    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;

Support for accessing channels associated with other peers would not be possible in the first iteration of these APIs. See group support for future directions we're exploring.

Matching channels with remote peers

Something to keep in mind is that we can't match remote peers with channels purely on incoming TCP request addresses since those could be routed by a proxy. Instead we should be aware of x-forwarded-for and forwarded headers.

It seems likely there is a set of APIs here that we need to define that exposes the "actual IP address" of the remote peers. Any bugs around this logic seem like they'd be a typical case of: "worked on my machine, but broke once in production because of proxies" so we should put some effort in to get this right.
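A minimal sketch of what such a helper might look like, assuming we only look at `x-forwarded-for` and take its left-most entry. The function name `client_ip` is hypothetical. Note that the left-most entry is supplied by whoever sent the request, so it's only meaningful when the proxy in front of the server is trusted to set or sanitize the header.

```rust
use std::net::IpAddr;

/// Picks the address to treat as the client's.
/// Uses the first entry of `x-forwarded-for` when it parses as an IP
/// address, and falls back to the TCP peer address otherwise.
pub fn client_ip(x_forwarded_for: Option<&str>, socket_ip: IpAddr) -> IpAddr {
    x_forwarded_for
        .and_then(|value| value.split(',').next())
        .and_then(|first| first.trim().parse::<IpAddr>().ok())
        .unwrap_or(socket_ip)
}
```

Calling `client_ip(Some("203.0.113.7, 10.0.0.1"), socket_ip)` would yield `203.0.113.7`, while a missing or malformed header yields `socket_ip`.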

Internally to the framework we should keep a hashmap that matches remote peers with channels, and shares the right channel with the provided Request.

update(2020/01/30): as was pointed out on twitter, in order to safely associate peers with channels we need to introduce a session mechanism first. Matching by IP or other params has dire security implications.
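With that update in mind, the internal map could be keyed by a session identifier rather than an address. The sketch below is one way that might look, using std channels as a stand-in for the real connections. `ChannelPool`, `SessionId`, `register`, and `get` are all names invented for this example, and how the session id is established (for instance a signed cookie) is left out entirely.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical session identifier, e.g. taken from a signed cookie.
pub type SessionId = String;

/// Shared map from a session to the sending half of that peer's channel.
#[derive(Clone, Default)]
pub struct ChannelPool {
    inner: Arc<Mutex<HashMap<SessionId, Sender<String>>>>,
}

impl ChannelPool {
    /// Called when a peer connects on the channel endpoint.
    /// Returns the receiving half, which would feed the network connection.
    pub fn register(&self, session: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().insert(session.to_owned(), tx);
        rx
    }

    /// Called from a regular endpoint to look up the caller's channel.
    pub fn get(&self, session: &str) -> Option<Sender<String>> {
        self.inner.lock().unwrap().get(session).cloned()
    }
}
```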

Disconnects

Another topic to consider is disconnects and retries. SSE and WS handle disconnects and retries differently, but the gist is that on disconnect we should assume that the client might reconnect for a given duration. And once that is over all pending channels should fail with an io::Error (since the remote is gone).

While the channel is awaiting reconnection, the is_open method should continue to return true. However, pending sends will keep waiting and might eventually fail with an error. This is acceptable behavior since if an SSE connection dropped midway through a session, the client itself likely dropped as well, at which point it seems acceptable to return an error from the endpoint.
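One way to picture the behavior described above is as a small state machine with a reconnect window. This is only a sketch: `Conn`, `Channel`, the `grace` field, and `is_open_at` are names I've made up here, and the actual grace period would be up to the implementation.

```rust
use std::time::{Duration, Instant};

/// Connection state as tracked by the framework.
pub enum Conn {
    Connected,
    /// The peer dropped at `since`, but may still reconnect.
    Reconnecting { since: Instant },
    Closed,
}

pub struct Channel {
    pub conn: Conn,
    /// Hypothetical window during which we still expect a reconnect.
    pub grace: Duration,
}

impl Channel {
    /// True while connected, or while still inside the reconnect window.
    /// Takes `now` as a parameter so the logic is easy to test.
    pub fn is_open_at(&self, now: Instant) -> bool {
        match self.conn {
            Conn::Connected => true,
            Conn::Reconnecting { since } => now.duration_since(since) < self.grace,
            Conn::Closed => false,
        }
    }
}
```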

If this proves to be insufficient, we may consider extending Tide's channel APIs. There's a fair chance this might be the case, so it's something to keep in mind.

Heartbeat

In order for channels to keep their respective connections open, they need to regularly send data over the TCP connection. Failure to do so usually results in the underlying operating system closing a connection and returning a timeout error. Sending messages at a set interval for this purpose is also known as a "heartbeat".

For Server Sent Events, heartbeats should be implemented using comments. These are empty messages that are ignored by client implementations.
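On the wire, an SSE comment is a line that starts with a colon, and an event is a set of `field: value` lines terminated by a blank line. A minimal sketch of both, with function names (`sse_event`, `sse_heartbeat`) that are invented for this example:

```rust
/// Formats an SSE event with an id and single- or multi-line data.
/// Each line of `data` gets its own `data:` field.
pub fn sse_event(id: u64, data: &str) -> String {
    let mut out = format!("id: {}\n", id);
    for line in data.lines() {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n'); // a blank line terminates the event
    out
}

/// Formats an SSE comment. Clients ignore lines starting with a colon,
/// which makes them usable as a heartbeat.
pub fn sse_heartbeat() -> &'static str {
    ":\n\n"
}
```

For example, `sse_event(1, "hello")` produces `"id: 1\ndata: hello\n\n"`.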

The WebSocket specification provides ping and pong frames that can be used to maintain a heartbeat with a peer.

Interface overview

These are the interfaces we're proposing for the various channel types:

use async_std::sync::{Sender, Receiver};

// The messages sent and received by Tide's channels.
pub enum Message {
    Text(String),
    Binary(Vec<u8>),
    Ping,
}

impl From<String> for Message {};
impl From<Vec<u8>> for Message {};
impl TryFrom<Message> for String {};
impl TryFrom<Message> for Vec<u8> {};

// The websocket interface.
pub struct WebSocket((Sender, Receiver));

impl WebSocket {
    fn is_open(&self) -> bool;
    async fn send(&self, impl Into<Message>) -> io::Result<()>;
    async fn send_json(&self, &impl Serialize) -> io::Result<()>;
    async fn recv(&self) -> io::Result<Message>;
    async fn recv_json<T: DeserializeOwned>(&self) -> io::Result<T>;
}

impl Stream<Item = Message> for WebSocket {};

// The server sent events interface.
pub struct ServerSentEvents(Sender);

impl ServerSentEvents {
    /// The id of the next message.
    fn id(&self) -> u64;
    fn is_open(&self) -> bool;
    async fn send(&self, impl Into<Message>) -> io::Result<()>;
    async fn send_json(&self, &impl Serialize) -> io::Result<()>;
}

// Impls on Tide's Request struct
impl Request {
    fn ws(&self) -> WebSocket;
    fn sse(&self) -> ServerSentEvents;
}

And these are the interfaces for the various endpoints and endpoint constructors:

/// Top-level functions to create an SSE endpoint
pub fn sse() -> ServerSentEventsEndpoint;
pub fn sse_with(f: impl AsyncFn<ServerSentEvents>) -> ServerSentEventsEndpoint;

/// Top-level functions to create a WebSocket endpoint
pub fn ws() -> WebSocketEndpoint;
pub fn ws_with(f: impl AsyncFn<WebSocket>) -> WebSocketEndpoint;

/// The SSE endpoint struct.
pub struct ServerSentEventsEndpoint(Option<AsyncFn<_>>);
impl ServerSentEventsEndpoint {
    pub fn new() -> Self;
    pub fn with(f: impl AsyncFn<ServerSentEvents>) -> Self;
    pub fn set_timeout(self) -> Self;
}
impl tide::Endpoint for ServerSentEventsEndpoint {};

/// The WebSocket endpoint struct.
pub struct WebSocketEndpoint(Option<AsyncFn<_>>);
impl WebSocketEndpoint {
    pub fn new() -> Self;
    pub fn with(f: impl AsyncFn<WebSocket>) -> Self;
    pub fn set_timeout(self) -> Self;
}
impl tide::Endpoint for WebSocketEndpoint {};
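The conversion impls on Message are only listed as signatures above. Here's a sketch of how they might be filled in. Two things in it are assumptions on my part: the error type for the fallible conversions (I return the original message so nothing is lost), and the choice to only convert `Text` into `String` rather than also attempting UTF-8 decoding of `Binary`.

```rust
use std::convert::TryFrom;

/// The messages sent and received by Tide's channels.
#[derive(Debug, PartialEq)]
pub enum Message {
    Text(String),
    Binary(Vec<u8>),
    Ping,
}

impl From<String> for Message {
    fn from(text: String) -> Self {
        Message::Text(text)
    }
}

impl From<Vec<u8>> for Message {
    fn from(bytes: Vec<u8>) -> Self {
        Message::Binary(bytes)
    }
}

impl TryFrom<Message> for String {
    // Assumption: hand the message back on failure.
    type Error = Message;

    fn try_from(msg: Message) -> Result<Self, Self::Error> {
        match msg {
            Message::Text(text) => Ok(text),
            other => Err(other),
        }
    }
}

impl TryFrom<Message> for Vec<u8> {
    // Assumption: hand the message back on failure.
    type Error = Message;

    fn try_from(msg: Message) -> Result<Self, Self::Error> {
        match msg {
            Message::Binary(bytes) => Ok(bytes),
            other => Err(other),
        }
    }
}
```

The `From` impls are what would let `send` accept `impl Into<Message>`, so callers can pass a `String` or a `Vec<u8>` directly.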

Future directions

This post currently only describes a design for channels in Tide. Logical next steps would include: implementation, feedback, and stabilization. But looking beyond that, there are some features that haven't been fleshed out yet.

Group support

The APIs we've described so far only operate on individual channels. Inside an HTTP request it's clear which channel is connected to the client. But what if you want to send messages over channels connected to different clients? This often means manually writing grouping logic to keep track of who belongs to which group.

In Phoenix channels, all channels are grouped under topics, which makes it easy to send messages to multiple channels. This is great for features such as group chat, or real-time collaboration.

For Tide we want it to be possible to create groups, but do that as an extension of singular APIs. Unlike Phoenix, a design goal here is to not require a specialized client library to interact with Tide's channels. Tide should be flexible enough to replace existing production servers, without requiring a redesign of the client logic. A rough API sketch:

let mut app = tide::new();
app.at("/wss").get(tide::ws());
app.at("/room/:name/join").post(async |req| {
    let group_name = format!("room:{}", req.params("name")?);
    req.ws().join_group(&group_name).await;
    req.ws().broadcast(&group_name, format!("{} has joined the room", req.peer_addr())).await?;
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;

In the example above all we provide is a mechanism to more conveniently address peers that are also connected to the same server. In real-world servers this would probably be abstracted into a set of shared logic, implemented as a RequestExt. This would allow abstracting the stringly typed APIs into a reusable interface throughout the application:

use my_app::RequestExt;

let mut app = tide::new();
app.at("/wss").get(tide::ws());
app.at("/room/:name/join").post(async |req| {
    req.room().join().await?;
    req.room().broadcast("test, test").await?;
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
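Under the hood, the grouping logic itself doesn't need to be complicated. As a rough sketch, with std channels standing in for the real connections and with `Groups`, `join`, and `broadcast` being hypothetical names that don't necessarily match what Tide would expose:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Maps a group name to the senders of every channel in that group.
#[derive(Default)]
pub struct Groups {
    members: HashMap<String, Vec<Sender<String>>>,
}

impl Groups {
    /// Adds a new member to `group` and returns the receiving end of
    /// its channel, which would feed that member's connection.
    pub fn join(&mut self, group: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.members.entry(group.to_owned()).or_default().push(tx);
        rx
    }

    /// Sends `msg` to every member of `group`, dropping members whose
    /// receiver is gone. Returns the number of members reached.
    pub fn broadcast(&mut self, group: &str, msg: &str) -> usize {
        match self.members.get_mut(group) {
            Some(senders) => {
                senders.retain(|tx| tx.send(msg.to_owned()).is_ok());
                senders.len()
            }
            None => 0,
        }
    }
}
```

Dropping dead members during broadcast keeps the map from growing without bound, though a real implementation would likely also want an explicit way to leave a group.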

HTTP/2 push streams

On the surface HTTP/2 push and SSE appear to have much in common. Both enable sending messages from the server to the client, without the client requesting them. However they're intended for slightly different purposes.

In practice HTTP/2 push is being designed to best handle static resources. Specs such as Cache Digests for HTTP/2 are making their way in, and packages such as node-h2-auto-push are turning HTTP/2 push into something that works really well for that.

In contrast, SSE support in devtools is improving, and it's becoming easier to inspect and debug small messages sent by the server. The different protocols are being optimized for different purposes, and that's valid 6.

6

For more on HTTP/2 push, I can recommend the article: "Node.js can HTTP/2 push!"

However if we were to expose an API for it, it'd probably be along these lines 7:

let mut app = tide::new();
app.at("/index.html").get(async |req| {
    let mut res = Response::new(200);
    res.set_body(Path::new("./assets/index.html"));

    let mut css = Response::new(200);
    css.set_body(Path::new("./assets/bundle.css"));
    res.push("/bundle.css", css);

    Ok(res)
});
app.listen("127.0.0.1:8080").await?;

7

The main consideration for any HTTP/2 push API is that we stay true to Tide's Request/Response model. This is analogous to the Node.js HTTP/2 response.createPushResponse() API that's part of the HTTP/1 compat layer. The step after this would probably be to build cache-aware auto-push middleware, but that comes later.

Summary

In this post we've shared the motivation, constraints, and proposed design for Tide's channels APIs. These APIs cover support for Server Sent Events, and WebSockets.

In addition we've looked ahead for ways we can extend these APIs to work with "groups", and shared a rough draft of an HTTP/2 push interface as well.

As I said at the start of this post, this one is a bit different than usual. Today is January 29th 2020, and the entirety of this post was drafted and published in a single day. This means there's a fair chance designs laid out here might have shortcomings, and will change over time. But I wanted to get ahead of the curve, and share the research and motivations underlying these designs.

My personal takeaway from researching the various channel APIs for Tide, is that channels present a really nice interface, and lots of network things can probably be expressed as channels! I've used both Server Sent Events and WebSockets before, but they never really felt right. I think with this API we might be onto something that could make this easier.

Either way that's about all I can share for now. It's back to writing different posts. Happy Wednesday!