confluent-kafka-go
confluent-kafka-go copied to clipboard
anything similar to endOffsets in confluent-kafka-go?
Description
Do we have anything similar to the below in Go to get the endOffsets? java public Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions)
I was able to get the end offsets by trying out the below method, invoking it using the setting times.Offset=OffsetEnd. Is this by design? If so, Can I use OffsetsForTimes to get the endOffsets by setting times.Offset to OffsetEnd ?
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
How to reproduce
Checklist
Please provide the following information:
- [ ] confluent-kafka-go and librdkafka version (
LibraryVersion()
): - [ ] Apache Kafka broker version:
- [ ] Client configuration:
ConfigMap{...}
- [ ] Operating system:
- [ ] Provide client logs (with
"debug": ".."
as necessary) - [ ] Provide broker log excerpts
- [ ] Critical issue
https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.QueryWatermarkOffsets
Thanks. This is good but one call per partition. If I have 100 topics with 10 partitions each, then I will end up making 1000 calls.
The behavior of OffsetsForTimes when passed offset as OffsetEnd to return the next offset for a partition-- was this by design or it just works by accident that way? I don't see it documented so was hesitant to use it.
I want to read from the last offset, but the seek function does not take the offset as input parameter.
I don't understand how do I seek to the latest offset I got from queryWatermarkOffsets
.
Can someone please advise on a way to either seek to the latest offset or set it in the partition or something?
Thanks
hey @evgeniyasti, one chance is to directly assign to an arbitrary offset on each partition. For this use Consumer.Assign(). If you want to assign to the latest offsets, use the "OffsetEnd" constant. Like this:
c.Assign([]kafka.TopicPartition{{Topic: &myTopicName, Partition: 0, Offset: kafka.OffsetEnd}})
In the latest versions, the AdminAPI can be used for this.
ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption)
For more information on the usage, see the example .