Producer Send and SendAsync are blocked forever when pulsar is down
Producer Send and SendAsync are blocked forever when pulsar is down if MaxReconnectToBroker is set to unlimited retries. In a pulsar-down scenario, the call inside runEventsLoop in producer_partition.go enters reconnectToBroker and stays in an infinite loop until the connection to the pulsar broker is re-established. While it is stuck there, no more events are consumed from the eventsChan channel, so both Send and SendAsync block. As a consequence, the SendTimeout is also not honoured.
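For reference, this is roughly the shape of the current loop that causes the problem: reconnection is handled inline in the same select that drains eventsChan, so a long-running reconnectToBroker starves event processing. This is a simplified sketch of the pre-fix behaviour, not the exact code from producer_partition.go.

```go
// Simplified sketch of the pre-fix loop (not the exact library code):
// reconnectToBroker runs inline in the same select, so while it retries
// forever, no sendRequest is read from eventsChan and Send/SendAsync back up.
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case i := <-p.eventsChan:
			switch v := i.(type) {
			case *sendRequest:
				p.internalSend(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.connectClosedCh:
			// Blocks here until the broker is reachable again when
			// MaxReconnectToBroker is set to unlimited retries.
			p.reconnectToBroker()
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}
```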
Expected behavior
Producer Send must not block forever when the pulsar broker is down. It must honour the SendTimeout and return with an error. Producer SendAsync must never block when the pulsar broker is down. It must honour the SendTimeout and invoke the callback function.
Actual behavior
Due to the above-mentioned issue, Producer Send/SendAsync blocks forever when the pulsar broker is down.
Steps to reproduce
- Create a pulsar producer with MaxReconnectToBroker set to unlimited retries and SendTimeout set to a fixed value (see the repro sketch after this list)
- Send messages to pulsar using the Send or SendAsync API
- Bring down the pulsar broker or inject a connection error between the broker and the client
- In case of Send, the call is blocked forever. In case of SendAsync, the callback is never called and, once the pendingQueue is filled, the call is blocked forever.
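A minimal reproduction sketch along these lines (the broker URL, topic name, and payload are placeholders; it assumes the v0.4.0 ProducerOptions fields SendTimeout and MaxReconnectToBroker behave as described above, with an unset MaxReconnectToBroker meaning unlimited reconnect attempts):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Leaving MaxReconnectToBroker unset is assumed to mean unlimited
	// reconnect attempts; SendTimeout is expected to bound each send.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "my-topic", // placeholder topic
		SendTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Bring the broker down (or inject a connection error) while this runs.
	for i := 0; i < 100; i++ {
		// Expected: an error after roughly SendTimeout.
		// Observed: Send blocks forever once the broker is down.
		_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte("ping"),
		})
		if err != nil {
			log.Printf("send %d failed: %v", i, err)
		}
	}
}
```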
System configuration
Pulsar client version - v0.4.0
Proposed fix: In runEventsLoop, handle the connectClosedCh channel in a separate goroutine. This way eventsChan is never blocked.
```go
func (p *partitionProducer) runEventsLoop() {
	// Reconnection is handled in its own goroutine so that a long-running
	// reconnectToBroker no longer stalls the main event loop.
	go func() {
		for {
			select {
			case <-p.closeCh:
				return
			case <-p.connectClosedCh:
				p.log.Debug("runEventsLoop will reconnect")
				p.reconnectToBroker()
			}
		}
	}()

	for {
		select {
		case i := <-p.eventsChan:
			switch v := i.(type) {
			case *sendRequest:
				p.internalSend(v)
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.batchFlushTicker.C:
			if p.batchBuilder.IsMultiBatches() {
				p.internalFlushCurrentBatches()
			} else {
				p.internalFlushCurrentBatch()
			}
		}
	}
}
```
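With the reconnect handling moved into its own goroutine, the main loop keeps consuming from eventsChan while the broker is unreachable, so Send/SendAsync no longer block indefinitely and the SendTimeout can be honoured.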
This is a duplicate of #496
#496 seems to be an issue with the consumers. Also, does the proposed fix look good? If yes, I can create a PR for it.
I had the same issue with v0.14.0.