Skip to main content

Resilient Server-sent Events (SSE)

·4 mins
Kristof Kovacs
Author
Kristof Kovacs
Software Architect & DevOps Consultant

Hello, I’m Kristof, a human being like you, and an easy to work with, friendly guy.

I've been a programmer, a consultant, CIO in startups, head of software development in government, and built two software companies.

Some days I’m coding Golang in the guts of a system and other days I'm wearing a suit to help clients with their DevOps practices.

I'm a big fan of Server-sent Events (SSE), which are similar to Websockets. The main differences are:

  • SSE works only in one direction: from the server to the client. (AJAX calls can easily provide for the other direction.)
  • SSE is much simpler. You don't need a complex library on the server-side, nor on the client-side (I'm looking at you, Socket.io). You could do SSE in CGI with perl or bash scripts, if you had to.
  • SSE is safer with proxies. Websockets has its own protocol (ws:), which is still not always supported by proxies and reverse proxies.
  • SSE is more resilient. It has embedded support for basic reconnection (and we will improve on it in this article).
  • SSE has the drawback of hogging one browser connection out of six (if you're using HTTP/1.x) or of one hundred (if you are using HTTP/2 or higher).

Ably has a very good article comparing SSE and Websockets.

In a recent project, I had to create a very resilient stream, which:

  1. if disconnected, it keeps trying to reconnect indefinitely (even on errors), and
  2. on reconnection, it remembers the last message (time), and queries the server for any updates since.

(In this particular project, I was using SSE as a delivery mechanism for Turbo Streams, so some lines in the following examples call Turbo or use Content-Type: vnd.turbo-stream.html, but you don't have to. I'm marking these with XXX.)

Wire protocol #

Here's how an SSE connection would look like in telnet, receiving three messages:

The HTTP request:

GET /app/sse HTTP/1.1
Host: example.com

The HTTP response:

HTTP/1.1 200
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked

data: first message

data: another message
data: with two lines

data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

You can do extra things like named events with event: and it has a built-in followup mechanism using id: which automatically generates a Last-Event-ID header on reconnect.

For more information on SSE protocol message format, read this.

Our approach to reconnection only differs in two things:

  1. Ours keeps reconnecting even if the client received an error from the server. SSE doesn't do this on his own.
  2. Ours is a time-based followup, not event-id based.

Client-side javascript #

This is the client-side Javascript:

// Installs the handler for the Turbo Stream Source.
let eventsource;
function InstallTurboStreamSource() {
    // Avoid double installation
    if (eventsource != null) {
        return;
    }

    // Place to save the time of the last message
    // NOTE: Instead of client-side JS, this initial value is even better when
    // generated server-side.
    let inactivitysince = Math.floor(new Date() / 1000);

    // Create our event source
    function NewEventsource() {
        eventsource = new EventSource("/app/sse/");
        fetch("/app/since/" + inactivitysince + "/")
            .then((r) => r.text())
            .then((html) => Turbo.renderStreamMessage(html)); // XXX Do your own stuff here

        // Update the "latest time we got something" on every message
        eventsource.addEventListener("message", (e) => {
            inactivitysince = Math.floor(new Date() / 1000);
        });
        // If it ever disconnects, start a retry
        eventsource.addEventListener("error", (e) => {
            eventsource.close();
            // Connect us again after a small wait
            setTimeout(NewEventsource, 2000);
        });
        // Close SSE on page unload
        window.addEventListener("beforeunload", (e) => {
            eventsource.close();
        });
        // Connect the EventSource to Turbo
        Turbo.session.connectStreamSource(eventsource); // XXX Do your own stuff here
    }
    NewEventsource();
}

Server-side golang #

Here is the golang function for the popular gin-gonic web framework for sending SSE events (no library required, just pure go):

router.GET("/app/sse/", func(c *gin.Context) {
    c.Writer.Header().Set("Content-Type", "text/event-stream")
    c.Writer.Header().Set("Cache-Control", "no-cache")
    c.Writer.Header().Set("Connection", "keep-alive")
    c.Writer.Header().Set("Transfer-Encoding", "chunked")
    c.Writer.Flush()

    log.Println("Client connected to SSE")

    recv := make(chan string)
    Subscribe(recv) // Subscribe to your data source

    clientGone := c.Writer.CloseNotify()
    for {
        select {
        case <-clientGone:
            Unsubscribe(recv)
            log.Println("Client gone from SSE")
            return
        case message := <-recv:
            log.Printf("Sending SSE stream to client for message %s", message)

            // Get data from DB
            data := [...]

            // Render template
            // NOTE: A bit hack-ish way to capture the output of the function that expects a ResponseWriter
            w := httptest.NewRecorder()
            err = router.HTMLRender.Instance("app-stream.html", gin.H{
                "data": data,
            }).Render(w)
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }

            // format it as SSE data
            ssedata := bytes.ReplaceAll(w.Body.Bytes(), []byte("\n"), []byte("\ndata: "))

            // Send response
            _, err = c.Writer.Write(ssedata)
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }
            _, err = c.Writer.Write([]byte("\n\n"))
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }
            c.Writer.Flush()
        }
    }
})

And while it's fairly easy, for reference here is the go function that handles the "catch me up on the events since..." call:

// Event stream by changed time
router.GET("/app/since/:unixtime/", func(c *gin.Context) {
		// Get filters
		unixtime_str := c.Param("unixtime")
		unixtime, err := strconv.Atoi(unixtime_str)
		if err != nil {
				c.AbortWithError(http.StatusBadRequest, err)
				return
		}

		// Load data
		data := ...

		// Send response
		c.HTML(http.StatusOK, "app-stream.html", gin.H{
				"data": data,
		})
})

Auth0 also has a very detailed article on SSE using React on the client and NodeJS on the server.

Germano has another good article using Python.