scalatest-embedded-kafka

Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic

Open gyoho opened this issue 7 years ago • 3 comments

First, thanks for developing such a nice tool; it's very useful for Kafka development.

I'm using the withRunningKafka method to test publishing to an embedded Kafka instance using ReactiveKafka.

  "KafkaEventSink" should {
    "streams event to kafka" in withRunningKafka {
      val kafkaConfig = EmbeddedKafkaConfig()
      val brokerList = s"localhost:${kafkaConfig.kafkaPort}"
      val topic = "abc"
      val sink = Sink.fromSubscriber(new ReactiveKafka().publish(ProducerProperties(brokerList, topic, new StringEncoder())))

      Source(1 to 10).map(_.toString).to(sink).run()
      consumeFirstStringMessageFrom(topic) shouldBe "1"
    }
  }

The above test logs this error:

  15:08:12.857 ERROR k.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: abc

According to this SO post, this seems to be a host/port issue.

Is this a bug or am I using it wrong?

gyoho, Feb 25 '18 23:02

If you swap localhost with 127.0.0.1 what happens?

manub, Feb 26 '18 10:02

Unfortunately, same error.

gyoho, Feb 26 '18 17:02

If you look at the post, the OP managed to fix it by adding the advertised.port property. You can add custom properties to brokers/producers/consumers by using the EmbeddedKafkaConfig class. Give it a go.

manub, Mar 05 '18 21:03
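
For readers following the suggestion above, here is a minimal sketch of passing custom broker properties through EmbeddedKafkaConfig. It is untested against the setup in this issue and rests on several assumptions: the spec class name KafkaEventSinkSpec is hypothetical, the ports are the library defaults (6001 for Kafka, 6000 for ZooKeeper), and the advertised.host.name / advertised.port values are guesses based on the linked post, not a confirmed fix. The ReactiveKafka sink is replaced by the library's own publishStringMessageToKafka helper to keep the example small.

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

// Hypothetical spec name, used only for illustration.
class KafkaEventSinkSpec extends WordSpec with Matchers with EmbeddedKafka {

  // Assumption: these broker properties mirror the fix described in the
  // linked post; whether they resolve this particular error is unverified.
  implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
    kafkaPort = 6001,
    zooKeeperPort = 6000,
    customBrokerProperties = Map(
      "advertised.host.name" -> "localhost",
      "advertised.port"      -> "6001"
    )
  )

  "KafkaEventSink" should {
    "stream events to Kafka" in withRunningKafka {
      // withRunningKafka picks up the implicit kafkaConfig declared above.
      publishStringMessageToKafka("abc", "1")
      consumeFirstStringMessageFrom("abc") shouldBe "1"
    }
  }
}
```

Declaring the config as an implicit at class level means withRunningKafka and the publish/consume helpers all see the same ports and broker properties, rather than a separate default instance created inside the test body.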