scalatest-embedded-kafka icon indicating copy to clipboard operation
scalatest-embedded-kafka copied to clipboard

Ask for committed offsets

Open Flamma opened this issue 6 years ago • 0 comments

I want to test if an offset is committed or not (because only commit in some situations). I have not found any way to test that.

I'm very newbie at this, so probably there's something I just don't get.

I'm using akka kafka streams. Here's an example of a test that is working.

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }

publishAndRequestRetry uses publishToKafka to put a message and wait for the source to get it.

I want to add a test to know if the offset has been commited or not. Is this possible with EmbeddedKafka?

Flamma avatar Dec 21 '18 10:12 Flamma