eventsource
eventsource copied to clipboard
Client will hang there if the server side crashed after the connection has already been established.
Let's see the example below:
Server side:
package main
import (
"fmt"
"net"
"net/http"
"time"
"github.com/donovanhide/eventsource"
)
type TimeEvent time.Time
func (t TimeEvent) Id() string { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string { return time.Time(t).String() }
const (
TICK_COUNT = 5
)
func TimePublisher(srv *eventsource.Server) {
start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
ticker := time.NewTicker(time.Second)
for {
select {
case <- ticker.C:
}
srv.Publish([]string{"time"}, TimeEvent(start))
start = start.Add(time.Second)
}
}
func main() {
srv := eventsource.NewServer()
srv.Gzip = true
defer srv.Close()
l, err := net.Listen("tcp", "127.0.0.1:8099")
if err != nil {
return
}
defer l.Close()
http.HandleFunc("/time", srv.Handler("time"))
go http.Serve(l, nil)
go TimePublisher(srv)
fmt.Println("event source started.")
select {}
}
Client side:
package main
import (
"fmt"
"github.com/donovanhide/eventsource"
)
func main() {
stream, err := eventsource.Subscribe("http://127.0.0.1:8099/time", "")
if err != nil {
return
}
for ev := range stream.Events{
fmt.Println(ev.Id(), ev.Event(), ev.Data())
}
}
You'll find that after the connection has been extablished, and the client side has print logs like this:
1356998406000000000 Tick 2013-01-01 00:00:06 +0000 UTC
1356998407000000000 Tick 2013-01-01 00:00:07 +0000 UTC
1356998408000000000 Tick 2013-01-01 00:00:08 +0000 UTC
1356998409000000000 Tick 2013-01-01 00:00:09 +0000 UTC
1356998410000000000 Tick 2013-01-01 00:00:10 +0000 UTC
1356998411000000000 Tick 2013-01-01 00:00:11 +0000 UTC
But, if you kill the server side process now, you can find that client just hang there, and no errors occurs.
And there is no retry actions.
How to solve this, guys ?
The client hangs on https://github.com/donovanhide/eventsource/blob/3ed64d21fb0b6bd8b49bcfec08f3004daee8723d/stream.go#L193
I've been playing with this to find a fix but I'm not entirely sure my approach is correct yet. It seems that there's no error detection so I set the stream.isClosed if I encounter an error during Decode().
Let me share what I've done so far. If someone can finish or correct what I've tried if, please do so.
My comments are prefixed with WC
func (stream *Stream) receiveEvents(r io.ReadCloser) {
dec := NewDecoder(r)
for {
ev, err := dec.Decode()
if stream.isStreamClosed() {
return
}
if err != nil {
// WC Not sure if this is correct but let's close the stream so we know to reconnect later
stream.markStreamClosed()
// WC putting this err on the channel seems to be cause of Issue #38
//stream.Errors <- err
return
}
pub := ev.(*publication)
if pub.Retry() > 0 {
stream.retry = time.Duration(pub.Retry()) * time.Millisecond
}
if len(pub.Id()) > 0 {
stream.lastEventId = pub.Id()
}
stream.Events <- ev
}
}
func (stream *Stream) retryRestartStream() {
backoff := stream.retry
for {
time.Sleep(backoff)
if stream.isStreamClosed() {
stream.Logger.Println("while retry stream is closed")
// WC not sure why we'd return if stream is closed, probably my misunderstanding of what should invoke a restart
// WC but I'm going to comment this out for now as I want the for loop to continue until it can restart/reconnect
//return
}
// NOTE: because of the defer we're opening the new connection
// before closing the old one. Shouldn't be a problem in practice,
// but something to be aware of.
// WC added this to only reconnect to stream if it's closed
if stream.isStreamClosed() {
if stream.Logger != nil {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}
r, err := stream.connect()
if err == nil {
stream.Logger.Println("reconnected")
// WC set isStreamClosed to false (I copied markStreamClosed up top)
stream.markStreamOpen()
go stream.stream(r)
return
} else {
stream.Logger.Printf("retryRestart couldn't connect: %v\n", err)
}
// WC I don't see where we're using this stream.Errors channel so commenting out for now
//stream.Errors <- err
backoff *= 2
}
}
}
if I shutdown server and then restart it, I can reconnect and receive events but I'm getting errors which I suspect are lingering http.Requests from the failed connect attempts.
net/http: request canceled (Client.Timeout exceeded while reading body)
For future reference: It seems the client blocks on the errors channel. All we need to do is flush that channel to let the client continue:
stream, err := eventsource.SubscribeWith("", client, request)
if err != nil {
log.Fatal(err)
}
go func() {
for true {
streamError := <-stream.Errors
log.Debug(streamError)
}
}()