Alternative SSE client

Open tmaxmax opened this issue 1 year ago • 11 comments

The current client API offers a great deal of flexibility:

  • with an sse.Client multiple Connections with the same configuration can be made
  • there can be multiple event listeners (for distinct or the same event)
  • event listeners can be added or removed after the connection is established
  • event listeners can listen to a single event type or all of them.

This doesn't come for free, though: both the user-facing API and the implementation code are complex, and the client uses a bit more resources, generates more garbage and must ensure serialized concurrent access to internal state. Moreover, the current client:

  • does not announce in any way what the connection status is – see Appendix 1 to this proposal for context and how it could be tackled.
  • is cumbersome for consuming OpenAI-style streams – see Appendix 2 for a utility specifically designed for reading such streams.

As of now, the instantiation of a connection with cancellation, some custom configuration and sending data on a channel looks as follows:

ctx, cancel := context.WithCancel(context.Background()) // or other means of creating a context, might come from somewhere
defer cancel()

client := sse.Client{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
conn := client.NewConnection(r)

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		fmt.Printf("%s\n\n", ev.Data)
	}
}()

conn.SubscribeMessages(func(event sse.Event) {
	ch <- event
})

if err := conn.Connect(); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

I've added the channel because from what I have observed, most users of the library create callbacks which mainly send the events on a channel to be consumed elsewhere.

I think this is quite a mouthful and I wonder whether enough use of the aforementioned flexibilities is made for them to justify the current API.

Here's another way in which I think the client could be designed. Instead of having a Client type and a Connection type with many methods, we could instead have the following:

package sse

// Connect does the HTTP request, receives the events from the server and sends them
// on the given channel.
//
// Returns errors if any of the parameters are invalid. Besides that it has the exact same
// behavior as `sse.Connection.Connect` has.
func Connect(req *http.Request, msgs chan<- Event, config *ConnectConfig) error

type ConnectConfig struct {
	/* the same stuff that is on sse.Client currently */
}

Usage would look as follows:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		if ev.Type == "" {
			fmt.Printf("%s\n\n", ev.Data)
		}
	}
}()

config := &sse.ConnectConfig{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
if err := sse.Connect(r, ch, config); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

It is not that much shorter, but assuming that the context comes from elsewhere and that the configuration is already defined, the code necessary for establishing a connection is significantly shorter – creating an http.Request and calling Connect. Connecting with the default configuration would also not need a separate top-level function – just pass nil instead of a ConnectConfig!
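To make the shape of this proposal more concrete, here is a heavily simplified sketch of what a function with this signature could do. It is not go-sse's implementation: the lowercase connect, the local Event type, stubTransport and collect are all hypothetical stand-ins, and reconnection, event IDs and retry handling are left out entirely. The config parameter is reduced to an *http.Client for brevity.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Event is a stand-in for sse.Event, reduced to the two fields used here.
type Event struct {
	Type string
	Data string
}

// connect is a hypothetical, simplified stand-in for the proposed sse.Connect.
// It does one request, parses "event:" and "data:" fields and sends each
// complete event on msgs. It blocks until the stream ends or the request
// context is cancelled.
func connect(req *http.Request, msgs chan<- Event, client *http.Client) error {
	if client == nil {
		client = http.DefaultClient
	}
	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status %d", res.StatusCode)
	}

	var ev Event
	var data []string
	sc := bufio.NewScanner(res.Body) // default Scanner line limit applies
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // a blank line terminates the current event
			if len(data) > 0 {
				ev.Data = strings.Join(data, "\n")
				select {
				case msgs <- ev:
				case <-req.Context().Done():
					return req.Context().Err()
				}
			}
			ev, data = Event{}, nil
		case strings.HasPrefix(line, "event:"):
			ev.Type = strings.TrimPrefix(strings.TrimPrefix(line, "event:"), " ")
		case strings.HasPrefix(line, "data:"):
			data = append(data, strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " "))
		}
	}
	return sc.Err()
}

// stubTransport answers every request with a fixed body, so the sketch
// runs without a server.
type stubTransport string

func (s stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"text/event-stream"}},
		Body:       io.NopCloser(strings.NewReader(string(s))),
		Request:    req,
	}, nil
}

// collect runs connect against a canned stream and gathers the events.
func collect(body string) ([]Event, error) {
	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://example.invalid/events", http.NoBody)
	if err != nil {
		return nil, err
	}
	msgs := make(chan Event)
	done := make(chan []Event)
	go func() {
		var got []Event
		for ev := range msgs {
			got = append(got, ev)
		}
		done <- got
	}()
	err = connect(req, msgs, &http.Client{Transport: stubTransport(body)})
	close(msgs) // the caller owns the channel in this sketch
	return <-done, err
}

func main() {
	events, err := collect("data: hello\n\nevent: a\ndata: x\ndata: y\n\n")
	fmt.Println(len(events), err)
}
```

Note that in this sketch the caller creates and closes the channel; whether the real function would close it is a design decision the proposal does not state.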

There are two important changes here, though:

  • checking that the received events are of the desired type is now the user's responsibility
  • new event listeners cannot be added (so easily) on the fly – the user would have to implement this themselves

For example, if we receive three different event types and we handle them differently, previously one could do:

conn.SubscribeEvent("a", func(e sse.Event) {
	aCh <- e
})
conn.SubscribeEvent("b", func(e sse.Event) {
	bCh <- e
})
conn.SubscribeEvent("c", func(e sse.Event) {
	cCh <- e
})
if err := conn.Connect(); err != nil {
	// handle error
}

With this change, it would look like this:

evs := make(chan sse.Event)
go func() {
	for e := range evs {
		switch e.Type {
		case "a":
			aCh <- e // or just handle them here instead of sending them on another channel
		case "b":
			bCh <- e
		case "c":
			cCh <- e
		}
	}
}()
if err := sse.Connect(req, evs, nil); err != nil {
	// handle error
}

On the flip side, simple requests would be easier to make. Consider a request to ChatGPT:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")
conn := sse.NewConnection(r)

conn.SubscribeMessages(func(ev sse.Event) {
	events <- ev // it is processed elsewhere
})

if err := conn.Connect(); err != nil {
	/* handle error */
}

This would be the new version:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")

if err := sse.Connect(r, events, nil); err != nil {
	/* handle error */
}

There are obvious benefits:

  • much less boilerplate – no more NewConnection -> SubscribeMessages -> Connect
  • it is not possible to connect without receiving the messages
  • given that the connection code is shorter, focus is moved on the creation of the request
  • handling the response data happens directly in user's code – there's no function boundary to separate business logic, no inversion of control

As an analogy, imagine if the net/http.Client would be used something like this:

conn := http.Client.NewConnection(req)
conn.HandleResponse(func(res *http.Response) {
	// do something with the response
})
if err := conn.Connect(); err != nil {
	// handle error
}

It would be painful to use.

The main advantage of the new API would be, I believe, that the control of the response is fully in the library user's hands. There are no callbacks one needs to reason about, and there is no need for the user to look up the source code to find out how the Connection behaves in various respects – for example, in what order the event listeners are called. Finally, and somewhat paradoxically, there would be a single way to do things: currently, if one wants to handle multiple event types, they can either register a separate callback for each event or write the same switch code as above inside a callback passed to SubscribeAll. Also, it would be much easier to maintain – this change would result in ~200 LOC and 6 public API entities being removed. This reduction in API surface reduces documentation and, in the end, how much the user must learn about the library in order to use it effectively.

Looking forward to your input regarding this API change!

tmaxmax avatar Feb 06 '24 00:02 tmaxmax

Just my 2c's as far as I understand this.

  • As a general point of view, the more code can be removed the better, especially if that code is not part of the problem that is being solved.
  • Connecting using a single Connect function with parameters that describe what is needed is more self-descriptive than multiple methods. The parameters are my guide, which is easier to discover than having to remember that multiple steps are needed.
  • Using an options struct in Connect() with optional configuration, such as max buffer size, would fit here nicely. It also matches the http connect approach.
  • I've been using SSE events with and without subscription and ended up not using it. The switch-case approach is just fine for all my use-cases and the code is easier to follow and debug. Subscribing to events adds the burden of unsubscribing with little benefit. Less mental load, so a plus in my book.
  • The only counterargument to providing an event handler on Connect is when there is a need to set the event handler after the connection is made, for example when the connection must be established before the event handler is instantiated. Perhaps this can still be accommodated with a 'SetHandler' method.

So in short, these look like good improvements.

hspaay avatar May 19 '24 03:05 hspaay

Thank you for the feedback!

As an observation, unsubscribing is not actually required for the correctness of the program. It is there to enable the user to stop receiving events on the given subscription. This reinforces that the API presents a set of unnecessary confusion points.

With regards to adding event handlers on the fly, this could still be achieved with some userland code. One possible solution could be something like this:

type handlerRegistration struct {
    eventType string
    handler   func(sse.Event)
}

handlers := map[string]func(sse.Event){} // the key is the event type
handlersCh := make(chan handlerRegistration)
ch := make(chan sse.Event)

go func() {
    for {
        select {
        case e := <-ch:
            if h := handlers[e.Type]; h != nil {
                h(e)
            }
        case r := <-handlersCh:
            handlers[r.eventType] = r.handler
        case <-ctx.Done():
            return
        }
    }
}()

// the call to sse.Connect

This is actually very similar to how it's implemented internally. The thing is this code would reside in the user's codebase, which means it can be adjusted to whatever their needs are – subsequent event handler registration could be implemented in a totally different manner, if that's more fit.

There could maybe be some type which would wrap the events channel and provide the handler registration functionality separately but honestly for the reasons above I'd rather just not have go-sse offer a solution for this. It seems as if the most flexibility is achieved by doing nothing, in this case.

To collect more data points, what's a concrete scenario where adding handlers after connecting was useful or necessary?

tmaxmax avatar May 19 '24 09:05 tmaxmax

Thanks for the example. It makes a lot of sense. No concerns there.

The use-case for adding handlers after connecting is when the connection is required as part of initialization of a service so the service can receive async responses. This also makes it easy to use different transports for testing or based on configuration.

If the client is an end-user (web browser) then it would be the other way around as the application starts before the connection is established or re-established.

This isn't a deal breaker as it is easy enough to create a helper that is added as SSE listener before the connection is established and passed to the service after the connection is successful. Just a bit more work. Not a big deal. There are many ways to cut this apple. :) I hope this explanation makes sense.

hspaay avatar May 20 '24 22:05 hspaay

Question: where does specifying a custom http.Client fit into the new API design?

One thing that is common throughout other codebases is using a subset of an *http.Client rather than the whole thing, e.g.:

// A Doer captures the Do method of an http.Client.
type Doer interface {
	Do(*http.Request) (*http.Response, error)
}

Unless go-sse needs a full *http.Client for some underlying reason, it might be nice to similarly have go-sse take this interface instead of a full http.Client. Just an idea :)
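As a rough illustration of the pattern, the sketch below shows a consumer that accepts only the Doer interface, together with a wrapper that adds behavior around Do. The names countingDoer, doerFunc, okDoer and fetchStatus are hypothetical and exist only for this example; none of them are part of go-sse.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Doer captures the Do method of an http.Client.
type Doer interface {
	Do(*http.Request) (*http.Response, error)
}

// Compile-time check: *http.Client already satisfies Doer.
var _ Doer = (*http.Client)(nil)

// countingDoer is a hypothetical wrapper that counts requests before
// delegating; tracing or metrics wrappers follow the same shape.
// It is not safe for concurrent use as written.
type countingDoer struct {
	next  Doer
	count int
}

func (d *countingDoer) Do(req *http.Request) (*http.Response, error) {
	d.count++
	return d.next.Do(req)
}

// doerFunc adapts a plain function to Doer.
type doerFunc func(*http.Request) (*http.Response, error)

func (f doerFunc) Do(req *http.Request) (*http.Response, error) { return f(req) }

// okDoer returns a fake Doer that always answers 200 with an empty body,
// standing in for a real client so the example needs no network.
func okDoer() Doer {
	return doerFunc(func(req *http.Request) (*http.Response, error) {
		return &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(strings.NewReader("")),
			Request:    req,
		}, nil
	})
}

// fetchStatus is a hypothetical consumer that only needs Do, so it works
// with an *http.Client and with any wrapper alike.
func fetchStatus(d Doer, url string) (int, error) {
	req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
	if err != nil {
		return 0, err
	}
	res, err := d.Do(req)
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()
	return res.StatusCode, nil
}

func main() {
	d := &countingDoer{next: okDoer()}
	status, err := fetchStatus(d, "http://example.invalid/events")
	fmt.Println(status, err, d.count)
}
```

In real code, next would typically be an *http.Client rather than the fake used here.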

emidoots avatar Jul 13 '24 20:07 emidoots

@slimsag the sse.ConnectConfig would have a Client *http.Client field.

When it comes to mocking the client, I think the canonical way to do it in Go is to actually use a mock http.RoundTripper for the client's Transport field. This is how I've been doing it and how I'm doing it for example in this library's tests.

I also wouldn't add another API entity just for mocking when the same can already be achieved with the standard library directly, in order to keep go-sse's footprint minimal.

With all these being said, it will be possible to mock the client.

tmaxmax avatar Jul 14 '24 14:07 tmaxmax

I actually don't mean for mocking (although it is useful for that, too) - I really mean for using other HTTP clients (typically wrappers of an *http.Client that add other behavior, e.g. tracing/metrics, logging, etc.)

emidoots avatar Jul 14 '24 14:07 emidoots

I understand now. I actually happen not to be very familiar with this pattern – are there some open-source projects which employ it or some libraries that help with creating such clients?

I'm trying to determine whether this pattern offers something that is either unachievable through http.Client.Transport or significantly advantageous over just using that, and how widespread it is.

At first glance, it seems to me that if one wishes to log stuff around a request, a wrapper http.RoundTripper could be used to that effect with equal success.
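For comparison, a wrapper http.RoundTripper that logs around a request could look roughly like this. The loggingTransport and roundTripperFunc types and the run helper are hypothetical names for this sketch; the inner transport is a stub so the example runs without a network, whereas real code would usually wrap http.DefaultTransport.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// loggingTransport is a hypothetical wrapper that logs the outcome of every
// request and otherwise delegates to next.
type loggingTransport struct {
	next http.RoundTripper
	logf func(format string, args ...any)
}

func (t *loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	res, err := t.next.RoundTrip(req)
	if err != nil {
		t.logf("%s %s failed: %v", req.Method, req.URL, err)
		return nil, err
	}
	t.logf("%s %s -> %d", req.Method, req.URL, res.StatusCode)
	return res, nil
}

// roundTripperFunc adapts a function to http.RoundTripper.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) }

// run sends one GET through a client whose Transport is wrapped with
// loggingTransport and returns the collected log lines.
func run(url string) ([]string, error) {
	var logs []string
	stub := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
		return &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(strings.NewReader("")),
			Request:    req,
		}, nil
	})
	client := &http.Client{Transport: &loggingTransport{
		next: stub, // real code would typically use http.DefaultTransport
		logf: func(format string, args ...any) {
			logs = append(logs, fmt.Sprintf(format, args...))
		},
	}}
	res, err := client.Get(url)
	if err != nil {
		return logs, err
	}
	res.Body.Close()
	return logs, nil
}

func main() {
	logs, err := run("http://example.invalid/events")
	fmt.Println(logs, err)
}
```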

tmaxmax avatar Jul 14 '24 17:07 tmaxmax

Here are some examples of other projects that do it:

  • https://github.com/aws/smithy-go/blob/main/transport/http/client.go#L12-L15
  • https://github.com/sourcegraph/sourcegraph/blob/660d6866b5d386e9ae9239ea432b4e126eaa4e7a/internal/metrics/metrics.go#L102-L105 (and many other places in sourcegraph codebase)
  • https://github.com/cilium/cilium/blob/main/vendor/github.com/Azure/go-autorest/autorest/adal/sender.go#L37-L42
  • https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/types/types.go#L31-L31
  • https://github.com/kedacore/keda/blob/main/pkg/util/http.go#L41-L41
  • https://github.com/olivere/elastic/blob/release-branch.v7/client.go#L110-L110
  • https://github.com/oapi-codegen/oapi-codegen/blob/main/examples/client/client.gen.go#L27-L27
  • https://github.com/prometheus/client_golang/blob/main/prometheus/push/push.go#L64

Anywho, it's totally just a suggestion/idea for compatibility with existing code. I think there are ways to work around it. Feel free to disregard the suggestion :)

emidoots avatar Jul 14 '24 18:07 emidoots

Thank you for the plethora of examples!

I find it interesting how in some of them, namely Sourcegraph and Cilium, the implementors of that interface in the respective files actually wrap an http.RoundTripper. I haven't explored usage in the other codebases or beyond the linked source files, so admittedly this is a very small sample size to draw conclusions from.

I'll most certainly revisit this once I start working on the client. If other people see this discussion and would be interested in having this, please leave a reaction to any of the relevant messages.

tmaxmax avatar Jul 14 '24 23:07 tmaxmax