Resilient Server-sent Events (SSE)
Hello, I’m Kristof, a human being like you, and an easy to work with, friendly guy.
I've been a programmer, a consultant, CIO in startups, head of software development in government, and built two software companies.
Some days I’m coding Golang in the guts of a system and other days I'm wearing a suit to help clients with their DevOps practices.
Table of Contents
I'm a big fan of Server-sent Events (SSE), which are similar to Websockets. The main differences are:
- SSE works only in one direction: from the server to the client. (AJAX calls can easily provide for the other direction.)
- SSE is much simpler. You don't need a complex library on the server-side, nor on the client-side (I'm looking at you, Socket.io). You could do SSE in CGI with
perl
orbash
scripts, if you had to. - SSE is safer with proxies. Websockets has its own protocol (
ws:
), which is still not always supported by proxies and reverse proxies. - SSE is more resilient. It has embedded support for basic reconnection (and we will improve on it in this article).
- SSE has the drawback of hogging one browser connection out of six (if you're using HTTP/1.x) or of one hundred (if you are using HTTP/2 or higher).
Ably has a very good article comparing SSE and Websockets.
In a recent project, I had to create a very resilient stream, which:
- if disconnected, it keeps trying to reconnect indefinitely (even on errors), and
- on reconnection, it remembers the last message (time), and queries the server for any updates since.
(In this particular project, I was using SSE as a delivery mechanism for Turbo Streams, so some lines in the following examples call Turbo
or use Content-Type: vnd.turbo-stream.html
, but you don't have to. I'm marking these with XXX
.)
Wire protocol #
Here's how an SSE connection would look like in telnet
, receiving three messages:
The HTTP request:
GET /app/sse HTTP/1.1
Host: example.com
The HTTP response:
HTTP/1.1 200
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
data: first message
data: another message
data: with two lines
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
You can do extra things like named events with event:
and it has a built-in followup mechanism using id:
which automatically generates a Last-Event-ID
header on reconnect.
For more information on SSE protocol message format, read this.
Our approach to reconnection only differs in two things:
- Ours keeps reconnecting even if the client received an error from the server. SSE doesn't do this on his own.
- Ours is a time-based followup, not event-id based.
Client-side javascript #
This is the client-side Javascript:
// Installs the handler for the Turbo Stream Source.
let eventsource;
function InstallTurboStreamSource() {
// Avoid double installation
if (eventsource != null) {
return;
}
// Place to save the time of the last message
// NOTE: Instead of client-side JS, this initial value is even better when
// generated server-side.
let inactivitysince = Math.floor(new Date() / 1000);
// Create our event source
function NewEventsource() {
eventsource = new EventSource("/app/sse/");
fetch("/app/since/" + inactivitysince + "/")
.then((r) => r.text())
.then((html) => Turbo.renderStreamMessage(html)); // XXX Do your own stuff here
// Update the "latest time we got something" on every message
eventsource.addEventListener("message", (e) => {
inactivitysince = Math.floor(new Date() / 1000);
});
// If it ever disconnects, start a retry
eventsource.addEventListener("error", (e) => {
eventsource.close();
// Connect us again after a small wait
setTimeout(NewEventsource, 2000);
});
// Close SSE on page unload
window.addEventListener("beforeunload", (e) => {
eventsource.close();
});
// Connect the EventSource to Turbo
Turbo.session.connectStreamSource(eventsource); // XXX Do your own stuff here
}
NewEventsource();
}
Server-side golang #
Here is the golang function for the popular gin-gonic web framework for sending SSE events (no library required, just pure go
):
router.GET("/app/sse/", func(c *gin.Context) {
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Writer.Flush()
log.Println("Client connected to SSE")
recv := make(chan string)
Subscribe(recv) // Subscribe to your data source
clientGone := c.Writer.CloseNotify()
for {
select {
case <-clientGone:
Unsubscribe(recv)
log.Println("Client gone from SSE")
return
case message := <-recv:
log.Printf("Sending SSE stream to client for message %s", message)
// Get data from DB
data := [...]
// Render template
// NOTE: A bit hack-ish way to capture the output of the function that expects a ResponseWriter
w := httptest.NewRecorder()
err = router.HTMLRender.Instance("app-stream.html", gin.H{
"data": data,
}).Render(w)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
// format it as SSE data
ssedata := bytes.ReplaceAll(w.Body.Bytes(), []byte("\n"), []byte("\ndata: "))
// Send response
_, err = c.Writer.Write(ssedata)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
_, err = c.Writer.Write([]byte("\n\n"))
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
c.Writer.Flush()
}
}
})
And while it's fairly easy, for reference here is the go
function that handles the "catch me up on the events since..." call:
// Event stream by changed time
router.GET("/app/since/:unixtime/", func(c *gin.Context) {
// Get filters
unixtime_str := c.Param("unixtime")
unixtime, err := strconv.Atoi(unixtime_str)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
// Load data
data := ...
// Send response
c.HTML(http.StatusOK, "app-stream.html", gin.H{
"data": data,
})
})
Auth0 also has a very detailed article on SSE using React on the client and NodeJS on the server.
Germano has another good article using Python.