Protocol adapter shutdown: Cannot perform [send] operation after producer has been closed
When using Kafka-based messaging: On protocol adapter shutdown, there may be errors like these getting logged:
WARN org.eclipse.hono.tracing.TracingHelper
An unexpected error occurred! [logged on span [8df2f9a8fbb2799c:33ca4d94d9776346:8ceb03ecb09127ef:1 - forward Event]]
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:892)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:901)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:76)
at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:159)
at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:157)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
As far as I see, before sending a record via KafkaProducer hono checks that lifecycle status is 'started' - hence the component hasn't received stopped event yet and the Hono hasn't called KafkaProducer.close. However, it seems to me that, the thread model of vertx KafkaProducer implies that if hono component call send-then-close the send won't fail (at least not with that stack trace). I see that Vertx kafka producer uses to register close hooks for vertx/context close/remove. So, maybe these hooks close producers on shutdown even before Hono components have got stop event.
just fyi running:
final Vertx vertx = Vertx.vertx();
((VertxInternal)vertx).addCloseHook(p -> {
System.out.println("[vertx hook before] vert closed");
p.complete();;
});
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void stop(final Promise<Void> stopPromise) throws Exception {
System.out.println("[stop] stop verticle");
super.stop(stopPromise);
}
}).onComplete(v -> {
((VertxInternal)vertx).addCloseHook(p -> {
System.out.println("[vertx hook after] vert closed");
p.complete();;
});
vertx.close();
});
produces for me:
[vertx hook before] vert closed
[vertx hook after] vert closed
[stop] stop verticle
so it seems that Vertx close handlers are called before stopping verticles. For context close hooks it seems that verticle stop is called before them.