confluent-kafka-go
Proposal: Deprecate the channel-based Consumer API
Abstract
Currently there are two distinct Consumer APIs: channel-based and function-based.
Each implementation has a different set of problems. This very fact can be confusing for users and for the maintainers as well. In the end, I believe having two APIs does more harm than good.
Technical background
Some notable differences between the two APIs include:
- Poll() might properly re-prioritize messages while the channel-based API does not (e.g. the channel-based Consumer might receive outdated messages) - #65 & https://github.com/edenhill/librdkafka/issues/1778
- #55
The arguments for having a channel-based API in the first place are the following (as described in the README):
1. Possibly more idiomatic Go
2. Makes reading from multiple channels easy
3. Fast
Proposal
I believe we can get away with a single API: we can deprecate the channel-based consumer.
Addressing the arguments for the channel-based API:
1. While it'd be nice to have a channel-based API, the risk of receiving outdated events is not worth the effort. librdkafka already does the job of purging outdated messages and not handing them to the application, so we should simply rely on that. A function call to receive events is not much less idiomatic than receiving events from a channel.
2. Answered in (1).
3. I don't have any metrics here. Ideally we would get input from users on whether they use the channel-based API for performance reasons and whether switching to the function-based API causes any performance regressions. My wild guess is that it doesn't.
We should note that this proposal doesn't prohibit the use of channels; it just shifts that responsibility to users. Anyone can wrap Poll() with their own channels. This way, a channel-based API may grow organically as a community solution, and it could even replace the function-based API once it addresses all of these issues and is battle-tested and mature enough.
Thank you for creating this proposal, I believe this makes total sense. Would be interested to hear what the wider community thinks of this proposal.
I'd also like to propose removing the channel-based producer, at least the produce interface - delivery reports can still go on a channel (everyone typically implements the same thing in a goroutine anyway).
I agree that having the two internal modes is overly complex and error prone.
However, I'd rather refactor to always using a channel internally:
- Change handle to always have a goroutine that perpetually polls the queue, one message at a time, with no timeout.
- Upon closing the handle, close the termChan and call rd_kafka_queue_yield to unblock the queue polling and stop the polling goroutine.
- When a user calls Poll, they should pass in a context, which would be idiomatic, support timeouts/deadlines, and gracefully handle cancellations.
- If we need a non-blocking method, we can introduce Pop, e.g.
func (h *handle) consumeEvents() {
for {
select {
case <-h.termChan: // check if closed
return
default:
}
// single read logic from current handle#eventPoll
}
}
func (h *handle) Poll(ctx context.Context) (Event, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-h.termChan:
return nil, ErrClosed
case e := <-h.events:
return e, nil
}
}
func (h *handle) Pop() (Event, error) {
select {
case <-h.termChan:
return nil, ErrClosed
case e := <-h.events:
return e, nil
default:
return nil, ErrNoEvents
}
}
One problem with having the Go consumer pre-pull a message from librdkafka and then put it on a Go channel is that the message lives in a twilight zone:
- from librdkafka's perspective the message has been delivered to the application - which will affect things like last poll time, number of pre-fetched message thresholds, etc.
- from the application perspective the message does not yet exist
- from the Go consumer's perspective it has no control of the message since it is on the channel which only the application may read.
This complicates things such as offset store&commit, as well as rebalances and termination.
What I'd like to see is a direct API that pulls a message directly from librdkafka and passes it to the application.
Once a message has been polled and is in Go, is it automatically marked as delivered? Is there a way to keep it in a twilight state or unmark it?
Because if that's possible, the channel can be unbuffered and do something like:
func (h *handle) consumeEvents() {
for {
event := h.poll()
select {
case <-h.termChan: // check if closed
// unack
return
case h.events <- event: // hang until it's consumed
// ack
}
}
}
Closing this, as the channel-based consumer is deprecated in the latest release (v2.0.2).