fs2-kafka
How do I commit offsets?
I found that after running a stream to get a nonempty collection of messages, the offsets weren't incremented. Is there a way in this client to commit offsets while the stream is running?
I used code something like the following, which threw NoOffsetForPartitionException. The groupIds on the clients are the same, and there was only one partition.
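    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import scala.collection.JavaConverters._

    val topicPartition = new TopicPartition(topic, 0)

    // Consume the expected number of messages through the fs2-kafka stream.
    val receivedMessages =
      t.getKafkaMessages[IO]().take(messages.size).compile.toVector.unsafeRunSync()

    // Open a plain Kafka consumer in the same group to inspect its offset.
    val consumerAfter =
      new KafkaConsumer(
        Map[String, AnyRef](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
          ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "1000",
          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
          ConsumerConfig.GROUP_ID_CONFIG -> "increments-offsets"
        ).asJava,
        new ByteArrayDeserializer(),
        Generic.kafkaDeserializer
      )
    consumerAfter.assign(List(topicPartition).asJava)

    // With auto.offset.reset = "none" and no committed offset for the group,
    // position() throws NoOffsetForPartitionException.
    val endOffset = consumerAfter.position(topicPartition)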
@shawjef3 we do not commit any offsets. The client from fs2-kafka is not durable, so you need to commit offsets to a separate persistent store; alternatively, you may want to use a compacted Kafka topic for that.
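As a rough illustration of the "separate persistent store" idea, here is a minimal sketch that keeps the next offset to read for each topic and partition in a small text file. `FileOffsetStore`, `markProcessed` and `nextOffset` are hypothetical names made up for this example; they are not part of fs2-kafka, and a real setup would more likely use a database or a compacted topic.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical helper, NOT part of fs2-kafka: persists the next offset to read
// for each (topic, partition) as tab-separated lines in a single file.
final class FileOffsetStore(path: Path) {

  // Load every stored entry; a missing or empty file means nothing was saved yet.
  private def readAll(): Map[(String, Int), Long] =
    if (!Files.exists(path)) Map.empty
    else
      new String(Files.readAllBytes(path), UTF_8)
        .split("\n")
        .filter(_.nonEmpty)
        .map(_.split("\t"))
        .collect { case Array(topic, partition, offset) =>
          (topic, partition.toInt) -> offset.toLong
        }
        .toMap

  // Record that the message at `offset` has been fully processed.
  // Stores offset + 1, following Kafka's convention that a committed
  // offset is the position of the next message to read.
  def markProcessed(topic: String, partition: Int, offset: Long): Unit = {
    val updated = readAll().updated((topic, partition), offset + 1)
    val body = updated.map { case ((t, p), o) => s"$t\t$p\t$o" }.mkString("\n")
    Files.write(path, body.getBytes(UTF_8))
  }

  // Offset to resume from, or None if nothing is stored for this partition.
  def nextOffset(topic: String, partition: Int): Option[Long] =
    readAll().get((topic, partition))
}
```

On restart, the value returned by `nextOffset` would be the position to resume reading from; with the plain Java consumer that could be passed to `KafkaConsumer.seek(topicPartition, offset)`. Rewriting the whole file on every message is only acceptable for a sketch like this one.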