franz-go

fetch using topic id

Open · arjunnair1997 opened this issue 1 year ago · 3 comments

Hello,

I'm trying to use the franz-go Kafka client to consume records from a source cluster. I'm using a direct consumer, because the workload is to periodically consume from an arbitrary list of (topic ID, partition ID, offset) tuples.

I'm trying to achieve this by keeping a single client alive. When it receives a new list of (topic ID, partition ID, offset) tuples, it removes the current consume partitions using RemoveConsumePartitions (or even PurgeTopicsFromClient) and adds the new ones using AddConsumePartitions.
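Because the workload arrives keyed by topic ID while AddConsumePartitions is keyed by topic name, each batch has to be translated before it is handed to the client. This is a hedged, stdlib-only sketch of that step: `TopicID`, `Target`, `buildAssignment`, and the `names` lookup are hypothetical, and offsets are plain `int64`s rather than `kgo.Offset` values.

```go
package main

import "fmt"

// TopicID is a 16-byte Kafka topic ID (a UUID). Hypothetical local type.
type TopicID [16]byte

// Target is one (topic ID, partition, offset) tuple from the workload.
type Target struct {
	ID        TopicID
	Partition int32
	Offset    int64
}

// buildAssignment converts ID-keyed targets into a name-keyed
// topic -> partition -> offset map, the shape AddConsumePartitions takes.
// names is a hypothetical ID-to-name lookup (e.g. built from metadata).
// Targets with an unknown ID are returned separately so the caller can
// refresh metadata and retry instead of guessing.
func buildAssignment(targets []Target, names map[TopicID]string) (map[string]map[int32]int64, []Target) {
	assign := make(map[string]map[int32]int64)
	var unknown []Target
	for _, t := range targets {
		name, ok := names[t.ID]
		if !ok {
			unknown = append(unknown, t)
			continue
		}
		if assign[name] == nil {
			assign[name] = make(map[int32]int64)
		}
		assign[name][t.Partition] = t.Offset
	}
	return assign, unknown
}

func main() {
	foo := TopicID{1}
	names := map[TopicID]string{foo: "foo"}
	targets := []Target{{foo, 0, 10}, {foo, 3, 42}, {TopicID{2}, 1, 5}}
	assign, unknown := buildAssignment(targets, names)
	fmt.Println(assign["foo"][3], len(unknown)) // 42 1
}
```

With the real client, each offset would presumably be wrapped as `kgo.NewOffset().At(off)` before calling `AddConsumePartitions`. A name lookup done at assignment time cannot, on its own, detect a topic that is deleted and recreated under the same name before the fetch happens.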

However, these APIs identify topics by name, not by topic ID. I think I can make sure I'm fetching from the correct topic if the FetchTopic struct also carries the topic ID, and PollRecords returns FetchTopic values with that ID set.
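If FetchTopic carried the ID, the consumer could compare it against the ID it resolved when assigning partitions. This is a minimal sketch of that check using a local stand-in for the proposed struct (not kgo's actual type, and with Partitions omitted); `checkTopic`, `idCheck`, and the `expected` map are hypothetical. It treats a zero ID as "unknown" rather than a mismatch, assuming a zero value means no ID was supplied.

```go
package main

import "fmt"

// FetchTopic is a local stand-in for the proposed kgo.FetchTopic shape.
type FetchTopic struct {
	Topic   string
	TopicID [16]byte
}

type idCheck int

const (
	idMatch    idCheck = iota // fetched ID equals the expected ID
	idMismatch                // same name, different ID: likely a recreated topic
	idUnknown                 // nothing to compare: zero ID or untracked topic
)

// checkTopic compares a fetched topic's ID with the ID expected for its name.
func checkTopic(ft FetchTopic, expected map[string][16]byte) idCheck {
	want, tracked := expected[ft.Topic]
	if !tracked || ft.TopicID == ([16]byte{}) {
		return idUnknown
	}
	if ft.TopicID == want {
		return idMatch
	}
	return idMismatch
}

func main() {
	expected := map[string][16]byte{"orders": {0xaa}}
	fmt.Println(checkTopic(FetchTopic{Topic: "orders", TopicID: [16]byte{0xaa}}, expected) == idMatch)    // true
	fmt.Println(checkTopic(FetchTopic{Topic: "orders", TopicID: [16]byte{0xbb}}, expected) == idMismatch) // true
	fmt.Println(checkTopic(FetchTopic{Topic: "orders"}, expected) == idUnknown)                           // true
}
```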

I think it's possible to do this by:

  1. Adding a topic ID field to FetchTopic.
  2. Adding a config option to indicate that the topic ID should be set in FetchTopic.
  3. Setting the topic ID in FetchTopic when handling fetch responses (I think this is possible for request versions >= 13?).
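Item 3 rests on the fetch protocol change: from Fetch v13 on, request and response topics are identified by ID instead of by name, so the client must map the ID back to a name itself and already has the ID in hand. This simplified sketch assumes v13 is the cutoff; `respTopic`, `resolved`, `resolveTopic`, and the `idToName` metadata lookup are hypothetical stand-ins, not kmsg or kgo types.

```go
package main

import "fmt"

// respTopic is a simplified fetch response topic: before v13 the broker
// returns the topic name; from v13 on it returns the topic ID instead.
type respTopic struct {
	Name string
	ID   [16]byte
}

// resolved is what a consumer would surface: both the name and the ID.
type resolved struct {
	Topic   string
	TopicID [16]byte
}

// resolveTopic fills in both fields for one response topic. idToName is a
// hypothetical lookup built from metadata. It returns false for an ID the
// lookup does not know (e.g. stale metadata).
func resolveTopic(version int16, rt respTopic, idToName map[[16]byte]string) (resolved, bool) {
	if version < 13 {
		// Name-based fetch: no ID in the response, so TopicID stays zero.
		return resolved{Topic: rt.Name}, true
	}
	name, ok := idToName[rt.ID]
	if !ok {
		return resolved{}, false
	}
	return resolved{Topic: name, TopicID: rt.ID}, true
}

func main() {
	id := [16]byte{7}
	names := map[[16]byte]string{id: "events"}
	r, _ := resolveTopic(12, respTopic{Name: "events"}, names)
	fmt.Println(r.Topic, r.TopicID == [16]byte{}) // events true
	r, ok := resolveTopic(13, respTopic{ID: id}, names)
	fmt.Println(r.Topic, ok) // events true
	_, ok = resolveTopic(13, respTopic{ID: [16]byte{9}}, names)
	fmt.Println(ok) // false
}
```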

What do you think? If you're open to it, I can implement this.

arjunnair1997, Jul 24 '24 18:07

I'm on board, though I'd skip (2).

I think this is the full patch:

```diff
diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go
index 4f1ebe6f..55315644 100644
--- a/pkg/kgo/record_and_fetch.go
+++ b/pkg/kgo/record_and_fetch.go
@@ -274,6 +274,8 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) {
 type FetchTopic struct {
 	// Topic is the topic this is for.
 	Topic string
+	// TopicID is the ID of the topic, if your cluster supports topic IDs.
+	TopicID [16]byte
 	// Partitions contains individual partitions in the topic that were
 	// fetched.
 	Partitions []FetchPartition
@@ -560,6 +562,7 @@ func (fs Fetches) EachTopic(fn func(FetchTopic)) {
 	for topic, partitions := range topics {
 		fn(FetchTopic{
 			topic,
+			[16]byte{},
 			partitions,
 		})
 	}
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 7a55ef0c..cddb4189 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -979,6 +979,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 		fetchTopic := FetchTopic{
 			Topic:      topic,
+			TopicID:    rt.TopicID,
 			Partitions: make([]FetchPartition, 0, len(rt.Partitions)),
 		}
 
```

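One detail in the patch: `EachTopic` merges partitions from multiple fetches by topic name and sets `TopicID` to the zero value. If a caller wanted to keep IDs through that kind of merging, one option is to group by the (name, ID) pair, so a topic recreated between fetches shows up as two entries instead of being silently combined. This is a hypothetical sketch over local stand-in types, with fetches flattened to a `[]FetchTopic`; it is not the library's `EachTopic`.

```go
package main

import "fmt"

// Local stand-ins for the kgo types, reduced to what the sketch needs.
type FetchPartition struct{ Partition int32 }

type FetchTopic struct {
	Topic      string
	TopicID    [16]byte
	Partitions []FetchPartition
}

// topicKey groups by name and ID together.
type topicKey struct {
	name string
	id   [16]byte
}

// eachTopicByID merges partitions for identical (name, ID) pairs and calls
// fn once per pair, in first-seen order.
func eachTopicByID(topics []FetchTopic, fn func(FetchTopic)) {
	merged := make(map[topicKey]*FetchTopic)
	var order []topicKey
	for _, t := range topics {
		k := topicKey{t.Topic, t.TopicID}
		m, ok := merged[k]
		if !ok {
			m = &FetchTopic{Topic: t.Topic, TopicID: t.TopicID}
			merged[k] = m
			order = append(order, k)
		}
		m.Partitions = append(m.Partitions, t.Partitions...)
	}
	for _, k := range order {
		fn(*merged[k])
	}
}

func main() {
	a, b := [16]byte{1}, [16]byte{2}
	in := []FetchTopic{
		{Topic: "t", TopicID: a, Partitions: []FetchPartition{{0}}},
		{Topic: "t", TopicID: a, Partitions: []FetchPartition{{1}}},
		{Topic: "t", TopicID: b, Partitions: []FetchPartition{{0}}}, // recreated topic
	}
	eachTopicByID(in, func(ft FetchTopic) {
		fmt.Println(ft.Topic, ft.TopicID[0], len(ft.Partitions))
	})
	// t 1 2
	// t 2 1
}
```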
I can push this after I cut a patch release. There aren't many other new features at the moment, so I may hold off on releasing this for a while, potentially a month. Is that alright?

twmb, Jul 29 '24 04:07

Hey, that's great, thanks. If you merge it now, I can pin franz-go to master and start using it. Otherwise, I can use a fork of franz-go until you merge it.

arjunnair1997, Jul 29 '24 14:07

I've been burned in the past by merging a feature and then having to revert it to issue a quick bugfix release, so if you're open to using the branch for now, please do that 😅

I'll leave this issue open; it will be closed once I merge the PR, which I'll do right before I cut a release.

twmb, Jul 29 '24 14:07