confluent-kafka-go icon indicating copy to clipboard operation
confluent-kafka-go copied to clipboard

anything similar to endOffsets in confluent-kafka-go?

Open dev501-code opened this issue 5 years ago • 5 comments

Description

Do we have anything similar to the below in Go to get the endOffsets? java public Map<TopicPartition,Long> endOffsets(Collection<TopicPartition> partitions)

I was able to get the end offsets by trying out the below method, invoking it using the setting times.Offset=OffsetEnd. Is this by design? If so, Can I use OffsetsForTimes to get the endOffsets by setting times.Offset to OffsetEnd ?

func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

How to reproduce

Checklist

Please provide the following information:

  • [ ] confluent-kafka-go and librdkafka version (LibraryVersion()):
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: ConfigMap{...}
  • [ ] Operating system:
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

dev501-code avatar Nov 04 '19 05:11 dev501-code

https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.QueryWatermarkOffsets

mhowlett avatar Nov 04 '19 05:11 mhowlett

Thanks. This is good but one call per partition. If I have 100 topics with 10 partitions each, then I will end up making 1000 calls.

The behavior of OffsetsForTimes when passed offset as OffsetEnd to return the next offset for a partition-- was this by design or it just works by accident that way? I don't see it documented so was hesitant to use it.

dev501-code avatar Nov 04 '19 18:11 dev501-code

I want to read from the last offset, but the seek function does not take the offset as input parameter. I don't understand how do I seek to the latest offset I got from queryWatermarkOffsets. Can someone please advise on a way to either seek to the latest offset or set it in the partition or something? Thanks

evgeniyasti avatar Nov 13 '19 16:11 evgeniyasti

hey @evgeniyasti, one chance is to directly assign to an arbitrary offset on each partition. For this use Consumer.Assign(). If you want to assign to the latest offsets, use the "OffsetEnd" constant. Like this: c.Assign([]kafka.TopicPartition{{Topic: &myTopicName, Partition: 0, Offset: kafka.OffsetEnd}})

spikepanx avatar Feb 02 '20 14:02 spikepanx

In the latest versions, the AdminAPI can be used for this.

ListOffsets(
	ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
	options ...ListOffsetsAdminOption)

For more information on the usage, see the example .

milindl avatar Mar 05 '24 13:03 milindl