confluent-kafka-go

Proposal: Deprecate the channel-based Consumer API

Open agis opened this issue 6 years ago • 4 comments

Abstract

Currently there are two distinct Consumer APIs: channel-based and function-based.

Each implementation has a different set of problems. This very fact can be confusing for users and for the maintainers as well. In the end, I believe having two APIs does more harm than good.

Technical background

Some notable differences between the two APIs include:

  1. Poll() might properly re-prioritize messages, while the channel-based API does not (e.g. a channel-based Consumer might receive outdated messages)
  2. #65 & https://github.com/edenhill/librdkafka/issues/1778
  3. #55

The arguments for having a channel-based API to begin with are the following (as described in README):

  1. Possibly more idiomatic Go
  2. Makes reading from multiple channels easy
  3. Fast

Proposal

I believe we can get away with a single API: we can deprecate the channel-based consumer.

Addressing the arguments for the channel-based API:

  1. While it'd be nice to have a channel-based API, it is not worth the risk of receiving outdated events. librdkafka already does the job of purging outdated messages and not handing them to the application, so we should just rely on that. A function call to receive events is not much less idiomatic than receiving events from a channel.
  2. Answered in (1)
  3. I don't have any metrics here. Ideally we would get input from users on whether they're using the channel-based API for performance reasons and whether switching to the function-based API causes any performance regressions. My wild guess is that it doesn't.

We should note that this proposal doesn't prohibit the use of channels; it just moves this responsibility to the users. Anyone can wrap Poll() with their own channels. This way, a channel-based API may grow organically as a solution from the community and it could even replace the function-based API if it ever addresses all of its issues and is battle-tested and mature enough.

agis avatar May 23 '18 09:05 agis

Thank you for creating this proposal, I believe this makes total sense. Would be interested to hear what the wider community thinks of this proposal.

I'd also like to propose removing the channel-based producer, at least the produce interface. Delivery reports can still go on a channel (everyone will typically implement the same thing in a goroutine anyway).

edenhill avatar May 23 '18 09:05 edenhill

I agree that having the two internal modes is overly complex and error-prone.

However, I'd rather refactor to always using a channel internally:

  1. Change handle to always have a goroutine that perpetually polls the queue, one message at a time, with no timeout.
  2. Upon closing the handle, close the termChan and call rd_kafka_queue_yield to unblock the queue polling and stop the polling goroutine.
  3. When a user calls Poll, they should pass in a context, which would be idiomatic, give support for timeouts/deadlines, and gracefully handle cancelations.
  4. If we need a non-blocking method, we can introduce Pop

e.g.

func (h *handle) consumeEvents() {
    for {
        select {
        case <-h.termChan: // check if closed
            return
        default:
        }

        // single read logic from current handle#eventPoll
    }
}

func (h *handle) Poll(ctx context.Context) (Event, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-h.termChan:
        return nil, ErrClosed
    case e := <-h.events:
        return e, nil
    }
}

func (h *handle) Pop() (Event, error) {
    select {
    case <-h.termChan:
        return nil, ErrClosed
    case e := <-h.events:
        return e, nil
    default:
        return nil, ErrNoEvents
    }
}

lavoiesl avatar May 16 '22 15:05 lavoiesl

One problem with having the Go consumer pre-pull a message from librdkafka and then put it in a Go channel is that the message lives in a twilight zone:

  • from librdkafka's perspective the message has been delivered to the application - which will affect things like last poll time, number of pre-fetched message thresholds, etc.
  • from the application perspective the message does not yet exist
  • from the Go consumer's perspective it has no control of the message since it is on the channel which only the application may read.

This complicates things such as offset store and commit, as well as rebalances and termination.

What I'd like to see is a direct API that pulls a message directly from librdkafka and passes it to the application.

edenhill avatar May 16 '22 15:05 edenhill

Once a message has been polled and is in Go, is it automatically marked as delivered? Is there a way to keep it in a twilight state or unmark it?

Because if that's possible, the channel can be unbuffered and the handle can do something like:

func (h *handle) consumeEvents() {
    for {
        event := h.poll()

        select {
        case <-h.termChan: // check if closed
            // unack
            return
        case h.events <- event: // hang until it's consumed
            // ack
        }
    }
}

lavoiesl avatar May 16 '22 16:05 lavoiesl

Closing this, as the channel-based consumer is deprecated in the latest release (v2.0.2).

milindl avatar Feb 28 '23 04:02 milindl