kafka-go

TopicReader: A reader for all partitions of a topic without using a consumer group

Open fvosberg opened this issue 4 years ago • 12 comments

Describe the solution you'd like

Now and then, I need to read messages from a topic without a consumer group, because we want to:

  • have all instances of a service consume all partitions
  • build a small in-memory projection of the topic while the service boots (so no persistent offset management is needed)

Most of the time, we are implementing something like this:

func InitTopicReader(cfg kafka.ReaderConfig) (TopicReader, error) {
  // connect to the first broker
  // get all partitions for the topic
  // init a reader for each partition
  // fan in messages and errors
}

My questions regarding this:

  • Are you interested in this feature? I would be happy to help
  • How should rebalancing be handled in this scenario?
  • What functions should it support?

I think these would be useful:

func (r *TopicReader) FetchMessage(context.Context) (Message, error)
func (r *TopicReader) ReadLag(context.Context) (int64, error)
func (r *TopicReader) Lag() int64
func (r *TopicReader) ReadPartitionLag(context.Context, int) (int64, error)
func (r *TopicReader) PartitionLag(int) int64
func (r *TopicReader) Partitions() ([]Partition, error)
func (r *TopicReader) Stats() ReaderStats
func (r *TopicReader) Close() error

Cheers Fredi

fvosberg, Dec 14 '19 14:12

Hi there, I'm probably missing some context but is there a reason you want to avoid reading from a consumer group?

nlsun, Sep 30 '20 01:09

Hi,

Yes, I tried to explain the reasons above, but I'm not a native speaker, so let me try again:

We use Kafka as our primary data store. Services that don't need separate persistence read the data they need on startup. This is fast, and data schema migrations are easy to deploy. The services use an ephemeral database or just keep the data in memory. So:

  • every instance of the service has to read all partitions (so with consumer groups, every instance would need its own group)
  • the offsets should not be persisted, as a restart should reset the offsets

Am I missing another option? I don't want a consumer group for every lifecycle of an instance, which would leave many stale consumer groups on my brokers.

Cheers

fvosberg, Sep 30 '20 05:09

One option is to pin a consumer group to each instance and reset the consumer group's offsets when the instance starts. This has other complications, though, such as how to associate a consumer group with an instance without an external datastore or hardcoding.

We don't currently see enough immediate value in using Kafka as a database for us to support it here. However, if you're interested, you could publish your implementation (https://github.com/segmentio/kafka-go/pull/522) as a public repo, and we could advertise it in the README to gauge interest. How does that sound?

nlsun, Sep 30 '20 18:09

Hi nlsun,

Thanks for the reply. So just to be sure: you don't see the benefit of this feature as big enough to invest in maintaining it? I have some other ideas as well, so implementing it as a wrapper for kafka-go would be a nice option, and if there was enough interest, we would be able to merge it later, right? Or are you saying that you, as segment.io, don't need this feature and so are not interested in having it as part of kafka-go?

Anyway, if I implement it in a separate library, it would be very nice to have it promoted, but feedback on my implementation would be even nicer. I could do the heavy lifting, but getting some feedback on edge cases, internals and implications would be very nice.

Thanks a lot

fvosberg, Sep 30 '20 20:09

Yup, I was speaking about the feature, not Segment. We don't have much insight into this use case. If it turns out to be common, we could consider merging it to avoid having users take a second dependency.

nlsun, Sep 30 '20 21:09

@fvosberg Hi there, any updates on this?

nlsun, Jan 15 '21 19:01

Quoting nlsun: "Yup, I was speaking about the feature, not Segment. We don't have much insight into this use case. If it turns out to be common, we could consider merging it to avoid having users take a second dependency."

For example, we want to re-consume the messages between two points in time, so I need to call the SetOffsetAt method, but this method does not work when using a consumer group. I think this is a common use case. :)

rockxsj, Apr 27 '21 07:04

This is a very common use case :) I'd love to see this available.

calvinbrown085, Jul 21 '21 15:07

Hi @rockxsj @calvinbrown085, I was thinking about this more. Is there a reason a consumer group is not sufficient for this use case?

It appears to me that this would be similar to manually maintaining a consumer group, with additional effort required around handling adding/removing partitions (which would normally be handled by a consumer group).

nlsun, Jul 28 '21 22:07

@nlsun It seems this is like running 'sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning'. Is there a plan to implement it? Thank you.

dairongpeng, Mar 17 '23 03:03

Hi @dairongpeng, have you tried using a consumer group for this use case?

nlsun, Mar 20 '23 19:03