scalatest-embedded-kafka icon indicating copy to clipboard operation
scalatest-embedded-kafka copied to clipboard

can't connect to embedded server message

Open darthmiguel opened this issue 7 years ago • 1 comments

Hi I am using this code to test that my Kafka Producer is correctly writing to kafka

describe("publish message to kafka topic") { it("should publish the message to kafka topic") {

  implicit val config = EmbeddedKafkaConfig(kafkaPort = 6666)
  implicit  val deserializer = new StringDeserializer
  val message = "this is a test message"
  withRunningKafka {
    KP.configure("localhost:6666")
    KP.send("test", message)
    consumeFirstMessageFrom("test") should be (message)
  }
}

}

My KP class has a method that configures the producer, in this case bootstrapserver: localhost, port 6666. And the send method receives the topic and the message

The test passes but in the console log I get this error

17:32:04.512 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) at org.apache.kafka.common.network.Selector.poll(Selector.java:398) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at java.lang.Thread.run(Thread.java:748)

Any insights on why is this happening? When I changed the port on my KP.configure("localhost:12345" then the test really failed.

darthmiguel avatar Feb 21 '18 22:02 darthmiguel

if you try using EmbeddedKafka.start() and .stop() do you have the same result? I think the issue here is that there's an active connection while the server shuts down.

manub avatar Mar 05 '18 21:03 manub