fiber icon indicating copy to clipboard operation
fiber copied to clipboard

🤗 [Question]: Setup of SSE Fiber with fasthttp.StreamWriter - event source is pending / never connects ...

Open michealroberts opened this issue 5 months ago • 8 comments

Question Description

Versions:

Go 1.21.5 github.com/gofiber/fiber/v2 v2.52.0 github.com/valyala/fasthttp v1.51.0


Issue

I have the following logic inside of an SSE handler:

// Peak at the incoming Accept request header:
accept := c.Request().Header.Peek("Accept")

// Check whether the Accept header is set to text/event-stream:
if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
  ctx := c.Context()

  ctx.SetContentType("text/event-stream")

  ctx.Response.Header.Set("Cache-Control", "no-cache")
  ctx.Response.Header.Set("Connection", "keep-alive")
  ctx.Response.Header.Set("Transfer-Encoding", "chunked")
  ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
  ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")

  ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
    defer func() {
      if r := recover(); r != nil {
        fmt.Println("Recovered in SSE writer:", r)
      }
    }()

    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
      select {
      case <-ticker.C:
        status, err := GetStatus(telescope)

        if err != nil {
          fmt.Printf("Error while getting status: %v\n", err)
          return
        }

        data, err := json.Marshal(status)

        if err != nil {
          fmt.Printf("Error while marshaling JSON: %v\n", err)
          return
        }

        fmt.Fprintf(w, "data: %s\n\n", string(data))

        fmt.Println(string(data))

        if err := w.Flush(); err != nil {
          fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
          return
        }
      case <-c.Context().Done():
        fmt.Println("Client disconnected. Closing connection.")
        return
      }
    }
  }))

  return nil
}

Which to me, looks good. I can log the message without issue ...

However, when connecting from a browser side client ... the connection is stuck in the "connecting" phase of the event source connection.

I can see, that when the endpoint is called from my client, the server is logging correctly:

CleanShot 2024-02-08 at 11 50 20

But the connection remains as pending:

CleanShot 2024-02-08 at 11 49 24@2x

Also, no errors when requesting application/json (so for me, it isn't a case that the underlying code has an issue):

CleanShot 2024-02-08 at 12 03 50@2x

The front end JS code is standard for the EventSource API.


Headers

CleanShot 2024-02-08 at 11 54 39@2x

Reproduction

I can also provide access to the repository for a minimal reproduction if the issue isn't apparent from what I have supplied if needed, please just request access for your username and I can provide it (as long as you are listed as a core maintainer of this repo).

Code Snippet (optional)

/*****************************************************************************************************************/

//	@author		Michael Roberts <[email protected]>
//	@package	@observerly/nox/telescope
//	@license	Copyright © 2021-2023 observerly

/*****************************************************************************************************************/

package telescope

/*****************************************************************************************************************/

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/observerly/alpacago/pkg/alpacago"
	"github.com/valyala/fasthttp"

	"nox/internal/common"
	"nox/internal/middleware"
)

/*****************************************************************************************************************/

type GetStatusHandlerResponse struct {
	Connected bool `json:"connected"`
}

/*****************************************************************************************************************/

type GetStatusChannels struct {
	Connected chan bool `json:"connected"`
}

/*****************************************************************************************************************/

func GetStatus(telescope *alpacago.Telescope) (GetStatusHandlerResponse, error) {
	// Create channels for the status values:
	channels := GetStatusChannels{}

	// Create a wait group for the status values:
	wg, channels, errors := common.SetupWaitGroupForStruct(channels)

	// Get the connection status:
	go func() {
		defer wg.Done()
		common.RetrieveAndSendToChannel(telescope.IsConnected, channels.Connected, errors)
	}()

	go func() {
		// Wait for all the goroutines to finish:
		wg.Wait()
		// Close the channels:
		common.CloseChannelsForStruct(channels)
	}()

	status := &GetStatusHandlerResponse{}

	// Extract the values from the channels:
	err := common.ExtractValueFromChannelStruct(channels, status)

	// Check if we encountered any errors while extracting the values:
	if len(errors) > 0 {
		return *status, fmt.Errorf("encountered errors while retrieving status values: %v", errors)
	}

	// If we encounter an error, return the error:
	if err != nil {
		return *status, err
	}

	// Return the status values:
	return *status, nil
}

/*****************************************************************************************************************/

func GetStatusHandler(c *fiber.Ctx) error {
	// Get the telescope client from the context middleware:
	telescope, ok := c.Locals("telescope").(*alpacago.Telescope)

	// If the telescope client is not available in the context, return an error:
	if !ok {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": "Telescope is not available in context",
		})
	}

	// Peak at the incoming Accept request header:
	accept := c.Request().Header.Peek("Accept")

	// Check whether the Accept header is set to text/event-stream:
	if c.Accepts("text/event-stream") == "text/event-stream" && strings.Contains(string(accept), "text/event-stream") {
		ctx := middleware.TextEventStream(c)

		ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("Recovered in SSE writer:", r)
				}
			}()

			ticker := time.NewTicker(1 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-ticker.C:
					status, err := GetStatus(telescope)

					if err != nil {
						fmt.Printf("Error while getting status: %v\n", err)
						return
					}

					data, err := json.Marshal(status)

					if err != nil {
						fmt.Printf("Error while marshaling JSON: %v\n", err)
						return
					}

					fmt.Fprintf(w, "data: %s\n\n", string(data))

					fmt.Println(string(data))

					if err := w.Flush(); err != nil {
						fmt.Printf("Error while flushing: %v. Closing connection.\n", err)
						return
					}
				case <-c.Context().Done():
					fmt.Println("Client disconnected. Closing connection.")
					return
				}
			}
		}))

		return nil
	}

	// Get the telescope status
	status, err := GetStatus(telescope)

	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(
			common.ErrorResponse{
				Error: err.Error(),
			},
		)
	}

	// Return the telescope status:
	return c.JSON(status)
}

/*****************************************************************************************************************/

Checklist:

  • [X] I agree to follow Fiber's Code of Conduct.
  • [X] I have checked for existing issues that describe my questions prior to opening this one.
  • [X] I understand that improperly formatted questions may be closed without explanation.

michealroberts avatar Feb 08 '24 12:02 michealroberts

@michealroberts is your endpoint working if you remove

case <-c.Context().Done():
    fmt.Println("Client disconnected. Closing connection.")
    return

efectn avatar Feb 09 '24 15:02 efectn

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: https://github.com/observerly/nox/pull/48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

michealroberts avatar Feb 09 '24 15:02 michealroberts

@efectn Yeh, so it still doesn't work when removing that unfortunately 😞

I have a branch here: observerly/nox#48 for a full reproduction that you should be able to access. In that code, I've added the basic example given in the gofiber/examples repo for SSE setup, line for line, and unfortunately it still doesn't work ...

Are you able to work with SSE on the latest versions of Fiber and fasthttp?

This one works for me https://paste.laravel.io/8c6a1464-4f52-46c1-b362-ab49f5ad60cf

2024-02-09_18-40

efectn avatar Feb 09 '24 15:02 efectn

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers 🤔

What are your thoughts on this? I should be able to get a minimal reproduction.

michealroberts avatar Feb 09 '24 15:02 michealroberts

@efectn Aye aye aye, ok. I feel like I have narrowed it down to be able to replicate it.

I have the ETag middleware installed from github.com/gofiber/fiber/v2/middleware/etag ... I guess this somehow causes issues in terms of headers 🤔

What are your thoughts on this? I should be able to get a minimal reproduction.

Yes it seems. You can disable etag for specific path like:

app.Use(etag.New(etag.Config{
	Next: func(c *fiber.Ctx) bool {
		return c.Path() == "/sse"
	},
}))

efectn avatar Feb 09 '24 15:02 efectn

@efectn I think I will disable ETags globally for now, I have quite a number of SSE routes.

I wonder if I should open up a separate minimal, reproducible, example of the SSE + ETag issue, and maybe start to work on a possible fix ...

michealroberts avatar Feb 09 '24 16:02 michealroberts

I have the same problem.

sdaduanbilei avatar Mar 22 '24 01:03 sdaduanbilei