go-nsq
consumer: sendRDY error not propagating
Hello,
first of all apologies if my terminology is a bit off, I'm not a regular Go programmer :)
We run a process reading from NSQ servers over an SSH tunnel. While debugging an issue where this connection breaks, we found a potential problem: an error from sendRDY will not fully propagate.
sendRDY can return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L950-L964):
func (r *Consumer) sendRDY(c *Conn, count int64) error
updateRDY, which calls sendRDY, can also return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L907):
func (r *Consumer) updateRDY(c *Conn, count int64) error
But that error isn't handled in its own recursive retry here (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L940):
r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		r.updateRDY(c, count)
	})
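For illustration, here's roughly the kind of change we had in mind: checking the retried call's error inside the callback so a repeated failure is at least surfaced. This is an untested sketch (we're assuming r.log is the right way to report it), and the same idea would apply to the startStopContinueBackoff call sites mentioned below:

```go
// untested sketch: handle the error from the retried updateRDY instead of dropping it
r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		if err := r.updateRDY(c, count); err != nil {
			r.log(LogLevelError, "(%s) error retrying RDY update - %s", c.String(), err)
		}
	})
```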
We think the failure of the error to fully propagate means our process doesn't pick up the loss of connection and doesn't know to attempt a mitigation.
We also found two other invocations of updateRDY that don't appear to handle errors, both in startStopContinueBackoff, which itself doesn't report that it can return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L761):
- https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L795
- https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L810
For a bit of extra context, this seems to require a pretty specific set of circumstances for us. When the tunnel drops, sometimes it's detected and a reconnect happens; other times we see this:
2016/11/01 10:19:07 ERR 1 [csym/create] (127.0.0.1:3001) IO error - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe
2016/11/01 10:19:07 ERR 1 [csym/create] (127.0.0.1:3001) error sending RDY 1 - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe
When we fall into this mode, we do not observe a reconnect (even though the tunnel would eventually come back up on its own, we'd need to reinitialize the connection to NSQ).
@armcknight @bmhatfield thanks for the detailed info, I'll try to take a look at this!
I just poked around at this.
Despite the returned errors from sendRDY / updateRDY not being handled, conn.WriteCommand calls c.delegate.OnIOError, which calls conn.Close, which should then trigger the reconnect cycle.
The only reason why it wouldn't is if messages are in flight and never "complete", meaning the rest of the cleanup logic doesn't execute. That is probably a poor assumption, though; perhaps we should bound this with some timeout.
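To spell out the chain I mean, here's a rough paraphrase (not the literal source; writeAndFlushLocked below is just a stand-in for the real write-and-flush under the connection mutex):

```go
// conn.go (paraphrased): a failed write is logged and handed to the delegate
// before the error is returned to the caller, even if the caller ignores it.
func (c *Conn) WriteCommand(cmd *Command) error {
	err := c.writeAndFlushLocked(cmd) // stand-in for the real write+flush logic
	if err != nil {
		c.log(LogLevelError, "IO error - %s", err)
		c.delegate.OnIOError(c, err)
	}
	return err
}

// consumer.go (paraphrased): the consumer's delegate callback closes the
// connection, which starts the cleanup that eventually leads to a reconnect,
// but only once in-flight messages drain.
func (r *Consumer) onConnIOError(c *Conn, err error) {
	c.Close()
}
```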
Thoughts?
We resolved this recently on our end; your explanation is pretty spot-on for what we were experiencing. We had messages still in flight when the RDY count was getting redistributed, which caused the connection with the in-flight messages to close prematurely. We fixed this by upping low_rdy_idle_timeout to 2 minutes in our NSQ client configuration.
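For anyone who hits the same thing, our change amounts to something like the sketch below (trimmed down; the topic, channel, and address are just the ones from the logs above, and the handler is a placeholder):

```go
package main

import (
	"log"
	"time"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	cfg := nsq.NewConfig()
	// the default low_rdy_idle_timeout is 10s; we raised it to 2 minutes so
	// connections still holding in-flight messages aren't closed out from
	// under them when RDY gets redistributed
	cfg.LowRdyIdleTimeout = 2 * time.Minute

	consumer, err := nsq.NewConsumer("csym", "create", cfg)
	if err != nil {
		log.Fatal(err)
	}
	consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		// ... handle the message ...
		return nil
	}))
	// we connect to nsqd via the SSH tunnel's local endpoint
	if err := consumer.ConnectToNSQD("127.0.0.1:3001"); err != nil {
		log.Fatal(err)
	}
	<-consumer.StopChan
}
```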
I don't understand the inner workings of this package well enough to comment on whether there should be a timeout on this operation in the client, so I'll leave that up to you, but hopefully the way we resolved this internally can help inform that decision.