fs2-kafka

How do I commit offsets?

Open • shawjef3 opened this issue 6 years ago • 1 comment

I found that after running a stream to get a nonempty collection of messages, the offsets weren't incremented. Is there a way in this client to commit offsets while the stream is running?

I used code something like the following, which threw NoOffsetForPartitionException. The groupIds on the clients were the same, and there was only one partition.

      val topicPartition = new TopicPartition(topic, 0)

      val receivedMessages = t.getKafkaMessages[IO]().take(messages.size).compile.toVector.unsafeRunSync()

      val consumerAfter =
        new KafkaConsumer(
          Map[String, AnyRef](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
            ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "1000",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
            ConsumerConfig.GROUP_ID_CONFIG -> "increments-offsets"
          ).asJava,
          new ByteArrayDeserializer(),
          Generic.kafkaDeserializer
        )
      consumerAfter.assign(List(topicPartition).asJava)
      val endOffset = consumerAfter.position(topicPartition)

shawjef3 avatar Mar 12 '18 16:03 shawjef3

@shawjef3 we do not commit any offsets. The client from fs2-kafka is not durable, so you need to commit offsets to a separate persistent store, or alternatively you may want to use a compacted Kafka topic to do so.

pchlupacek avatar Oct 07 '18 17:10 pchlupacek
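For readers wondering what "commit offsets to a separate persistent store" might look like, here is a minimal sketch in plain Scala. It is not fs2-kafka API: `Record`, `OffsetStore`, `InMemoryOffsetStore`, and `OffsetTracking.consume` are hypothetical names made up for illustration, and the in-memory map stands in for a real database, file, or compacted topic. It follows Kafka's usual convention of storing the offset of the next record to read, so a restarted consumer skips everything it has already handled.

```scala
import scala.collection.mutable

// Hypothetical record shape for illustration; not a type from fs2-kafka.
final case class Record(partition: Int, offset: Long, value: String)

// Hypothetical interface for an external offset store.
trait OffsetStore {
  def load(topic: String, partition: Int): Option[Long]
  def save(topic: String, partition: Int, nextOffset: Long): Unit
}

// In-memory stand-in. A durable implementation would write to a database,
// a file, or a compacted Kafka topic keyed by (topic, partition).
final class InMemoryOffsetStore extends OffsetStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]

  def load(topic: String, partition: Int): Option[Long] =
    offsets.get((topic, partition))

  def save(topic: String, partition: Int, nextOffset: Long): Unit =
    offsets.update((topic, partition), nextOffset)
}

object OffsetTracking {
  // Handles every record at or past the stored position, then records
  // the offset of the next record to read for that partition.
  def consume(topic: String, log: Seq[Record], store: OffsetStore)(handle: Record => Unit): Unit =
    log.foreach { r =>
      val start = store.load(topic, r.partition).getOrElse(0L)
      if (r.offset >= start) {
        handle(r)
        store.save(topic, r.partition, r.offset + 1)
      }
    }
}
```

Saving after `handle` gives at-least-once behaviour: if the process dies between handling a record and saving its offset, that record is handled again on restart. With a real stream, the stored value would be used as the starting offset when subscribing, rather than filtering an in-memory sequence as this sketch does.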