kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

Can a standalone consumer consume multiple topics?

Open ppd0705 opened this issue 2 years ago • 18 comments

I want to subscribe multi topics with a standalone consumer. Does it seem like both Reader and Conn don‘t support this situation?

ppd0705 avatar Jun 07 '22 06:06 ppd0705

see see this example

kscooo avatar Jun 13 '22 13:06 kscooo

see see this example

this example is for consumer group, not for standalone consumer

ppd0705 avatar Jun 13 '22 13:06 ppd0705

see see this example

this example is for consumer group, not for standalone consumer

try https://github.com/segmentio/kafka-go/blob/v0.4.32/conn.go#L962 ?

kscooo avatar Jun 13 '22 14:06 kscooo

try https://github.com/segmentio/kafka-go/blob/v0.4.32/conn.go#L962 ?

thank you. this method gets partition info for the given topics, not subscribes the given topics.

ppd0705 avatar Jun 13 '22 14:06 ppd0705

https://github.com/segmentio/kafka-go/blob/v0.4.32/reader.go#L363

kscooo avatar Jun 14 '22 08:06 kscooo

https://github.com/segmentio/kafka-go/blob/v0.4.32/reader.go#L363

// GroupTopics allows specifying multiple topics, but can only be used in // combination with GroupID, as it is a consumer-group feature.

ppd0705 avatar Jun 14 '22 10:06 ppd0705

Can't you set the GroupID?

kscooo avatar Jun 14 '22 12:06 kscooo

haha, I don‘t want use consumer group

ppd0705 avatar Jun 14 '22 12:06 ppd0705

I feel that kafka, unlike redis pub/sub, still places more emphasis on the concept of groups

kscooo avatar Jun 14 '22 12:06 kscooo

thank for your reply. I am using kafka for the first time

ppd0705 avatar Jun 14 '22 14:06 ppd0705

Hello @ppd0705

I believe you would have to create separate readers if you want to consume from multiple topics and manage the offsets yourself.

achille-roussel avatar Jun 17 '22 17:06 achille-roussel

Hi @achille-roussel

In my situation, because messages are time-sensitive, so I only need to consume messages from the latest offset.

ppd0705 avatar Jun 17 '22 23:06 ppd0705

Hi @achille-roussel

In my situation, because messages are time-sensitive, so I only need to consume messages from the latest offset.

I don't know how your code architecture is, it feels like you can open multiple goroutines to consume multiple topics in parallel, with each reader set to read from the latest offset, after all, goroutines are very lightweight

kscooo avatar Jun 18 '22 08:06 kscooo

@kscooo Hi! https://pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders
I have a long timeout(40s) before commit and more than one consumer(1 topic 2 partitions 2 consumers) and i have rebalancing is not correct. What should I do to fix the situation?

bundleman avatar Aug 17 '22 15:08 bundleman

@achille-roussel Hi. I have multiple topics with multiple partitions. What approach can I use so that both topics and their partitions are processed in parallel, given that the delay in processing and messages can be long?

bundleman avatar Aug 17 '22 15:08 bundleman

@kscooo Hi! pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders I have a long timeout(40s) before commit and more than one consumer(1 topic 2 partitions 2 consumers) and i have rebalancing is not correct. What should I do to fix the situation?

I am not for the maintainer, so I am not very clear about the code details. You might want to take a deeper look at the relevant configuration(like Balancers ), which may help you, on a side note, the comments on this library are written very well

kscooo avatar Aug 18 '22 03:08 kscooo

@kscooo Hi! pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders I have a long timeout(40s) before commit and more than one consumer(1 topic 2 partitions 2 consumers) and i have rebalancing is not correct. What should I do to fix the situation?

I am not for the maintainer, so I am not very clear about the code details. You might want to take a deeper look at the relevant configuration(like Balancers ), which may help you, on a side note, the comments on this library are written very well

Thanks for your reply. Yes I use this option with the RoundRobin balancer

bundleman avatar Aug 18 '22 06:08 bundleman

Hello @bundleman

Was the suggestion to use kafka.GroupBalancer on the consumer group useful to address the original issue you reported?

achille-roussel avatar Sep 16 '22 16:09 achille-roussel

I'm going to close this issue due to inactivity, assuming the original question has been answered. Feel free to reopen if it needs further discussion!

achille-roussel avatar Jan 27 '23 18:01 achille-roussel