reactor-kafka
Add JVM shutdown hook to KafkaReceiver to close KafkaConsumer
When the JVM receives a SIGINT, the KafkaReceiver does NOT close the underlying KafkaConsumer, which can lead to resource leaks.
Is that a typical arrangement, @garyrussell @artembilan?
If the kafka-client (KafkaConsumer) doesn't install such a hook and puts the responsibility on the user, doesn't it make sense that the same would be required for KafkaSender (and its #close() method)?
Well, the Spring for Apache Kafka project fully relies on the ApplicationContext lifecycle.
So, whatever calls its close() would initiate the underlying KafkaConsumer.close().
Doesn't look like Spring adds such a hook by default: https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-factory-shutdown. But at least there is such an API for end-user consideration.
Looks like Spring Boot has its own opinion: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/SpringApplication.java#L203.
But anyway. If we expose something like registerShutdownHook() on the KafkaReceiver and KafkaSender, that should be enough for end-user requirements.
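In the meantime, an application can already register such a hook itself by disposing the subscription to receive(); a rough sketch (receiverOptions is assumed to be configured elsewhere, and process() is just a placeholder):

KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

// Keep the Disposable so the pipeline can be cancelled on shutdown
Disposable subscription = receiver.receive()
        .subscribe(record -> process(record));   // process() is a placeholder for application logic

// On JVM shutdown, cancel the subscription; the receiver then runs its
// cleanup, which closes the underlying KafkaConsumer
Runtime.getRuntime().addShutdownHook(new Thread(subscription::dispose));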
I am using Reactor Kafka in a Spring Boot WebFlux environment.
So, I am expecting that Spring-managed KafkaReceiver beans should be closed on SIGINT, like it happens for JDBC or MongoDB connections.
But Spring only closes KafkaSender connections. Maybe if KafkaReceiver implemented the Closeable interface or a close() method, Spring's shutdown hook might kick in.
OK, I see. Right, neither the KafkaReceiver nor its implementation has a close() API, unlike KafkaSender.
At the same time I see that ConsumerHandler has one, but does not delegate to its Consumer.close().
ConsumerHandler invokes consumerEventLoop.stop() which schedules a CloseEvent which closes the consumer.
It looks to me like the handler.close() will be invoked when the receiver.receive() flux completes; it is passed into the usingWhen as the asyncCleanUp argument.
What am I missing? Isn't it the application's responsibility to complete the flux?
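For context, the usingWhen pattern mentioned above looks roughly like this (the resource type and its stream()/closeAsync() methods are placeholders, not the actual receiver internals):

Flux<String> records = Flux.usingWhen(
        acquireResource(),                    // Publisher that supplies the resource
        resource -> resource.stream(),        // the flux handed to the application
        resource -> resource.closeAsync());   // asyncCleanUp: runs when that flux completes, errors, or is cancelled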
I have a question: when exactly does the KafkaReceiver.receive() flux complete? I think it runs indefinitely unless someone calls dispose() on the Disposable returned by Flux.subscribe().
My proposal is,
expose a close() method in KafkaReceiver which will complete the KafkaReceiver.receive() flux and close the KafkaConsumer as well.
Also, KafkaSender has a close(), but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.
Your application bean could implement AutoCloseable and call dispose().
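Something along these lines, for example (RecordListener and handle() are just illustrative names, and a KafkaReceiver bean is assumed to be configured elsewhere):

import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

// Rough sketch: Spring can infer close() as the destroy method for AutoCloseable beans,
// so context shutdown cancels the receive() pipeline
@Component
public class RecordListener implements AutoCloseable {

    private final Disposable subscription;

    public RecordListener(KafkaReceiver<String, String> receiver) {
        this.subscription = receiver.receive()
                .subscribe(this::handle);
    }

    @Override
    public void close() {
        subscription.dispose();
    }

    private void handle(ReceiverRecord<String, String> record) {
        // application-specific processing
    }
}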
Also, KafkaSender has a close(), but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.
Agreed.
Your application bean could implement AutoCloseable and call dispose().
Yes, currently my component class implements DisposableBean and, in its destroy() method, I call dispose().
But the problem is that the subscribers of KafkaReceiver.receive() receive an error signal of CancellationException when dispose() is called (a possible workaround is sketched after the quoted proposal below). That's why I am proposing this:
My proposal is, expose a close() method in KafkaReceiver which will complete the KafkaReceiver.receive() flux and close the KafkaConsumer as well.
Also, KafkaSender has a close(), but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.
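As a possible interim workaround for that CancellationException, the receive() flux can be completed via a shutdown signal instead of being cancelled through dispose(); a rough sketch (the shutdown sink and handle() are illustrative, not existing reactor-kafka API):

// Sketch: complete the pipeline on shutdown instead of disposing it,
// so downstream subscribers see onComplete rather than a cancellation
Sinks.One<Boolean> shutdown = Sinks.one();

receiver.receive()
        .takeUntilOther(shutdown.asMono())   // completes the flux when the shutdown signal fires
        .subscribe(record -> handle(record));

// later, e.g. from DisposableBean#destroy() or a @PreDestroy method:
shutdown.tryEmitValue(Boolean.TRUE);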
Isn't there an inherent risk here, because the class is reactive and thus non-blocking?
A try-with-resources block would only make sense if you block inside, which is generally discouraged.
Worse, if you don't block but still use the KafkaSender inside a twr, it will get closed too early.
I would be wary of such a change.
(exposing a close() method where it is missing makes perfect sense though)
The return type of KafkaSender.close() is void, not Mono<Void>. So it must be a blocking operation, right?
Regarding try-with-resources, imagine the following piece of code:
try (KafkaSender sender = KafkaSender.create(senderOptions)) {
    sender.send(recordsPublisher)
          .subscribe();
}
When the close() call is made, does the KafkaSender finish sending the pending messages, or does it discard them and close the KafkaProducer?
In that particular case, it potentially wouldn't even get a chance to deal with the recordsPublisher at all. I'm not sure of the exact behavior, but I would expect the KafkaSender to terminate that subscription with an onError stating that the KafkaSender has been closed. Since .subscribe() is being used (without an error handler), the default behavior of Reactor would then consist of logging this onError.
If the close() happens fast enough, it could trigger before the send, and so from the perspective of the KafkaSender there would be no pending messages (since at this point the recordsPublisher wouldn't even be subscribed, or it could have been cancelled).
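So if pending records must not be lost, the send flux should be allowed to terminate before close() is called; a rough (deliberately blocking) sketch reusing senderOptions and recordsPublisher from the snippet above, with String key/value types assumed:

// Let the send flux terminate before closing, instead of closing inside try-with-resources
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
try {
    sender.send(recordsPublisher)
          .then()     // Mono<Void> completing once every record has been sent (or an error occurs)
          .block();   // blocking only for illustration; a reactive app would chain the close instead
} finally {
    sender.close();
}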
Hi! I am using reactor-kafka with Spring Boot and running into a similar issue. I am calling dispose() on shutdown and this eventually calls ConsumerEventLoop#stop(). However, this isn't closing the underlying KafkaConsumer. There is a call to wakeup().
Is this expected? I attached a debugger to see if close() is invoked on the consumer, but it doesn't look like it is getting called. Since this consumer is wrapped by KafkaReceiver, I don't see how Spring could automatically handle the lifecycle for it either...
@aritrachatterjee15 stop() schedules a CloseEvent which performs the close().
There is a test here:
https://github.com/reactor/reactor-kafka/blob/f74cd96460b3b069917b82ed96580edfb0090ebd/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java#L125-L145
Using reactor-kafka 1.3.18 in a Scala setting. Basically, I convert the consumer Flux into a plain Publisher and let the streaming happen through FS2, which is a streaming library.
Getting hold of the Disposable is not feasible here, since I am not subscribing to the upstream, just passing it along as a plain Publisher.
When a cancellation signal is received, the stream will use the Subscription instance to cancel, which I can verify since I see the "kafka consumer canceled" log message. Still, the KafkaReceiver doesn't stop. So, does KafkaReceiver honor the cancel signal at all?
Here is a snippet for reference:
val receiver =
  KafkaReceiver
    .create(receiverOptions(group, topics: _*))

val upstream =
  receiver
    .receiveAutoAck()
    .concatMap(identity(_))
    .retryWhen(Retry.backoff(retryMax, retryDelay).transientErrors(true))
    .repeat

val stream: Stream[IO, ConsumerRecord[String, Array[Byte]]] =
  fs2.interop.flow
    .fromPublisher[IO](FlowAdapters.toFlowPublisher(upstream), bufferSize)
    .evalFilter(safeAccept)

if (asyncSize == 1)
  stream.evalMap(safeOnRecord).compile.drain.cancelable {
    IO.blocking {
      logger.info("kafka consumer canceled")
    }
  }
else
  stream.parEvalMap(asyncSize)(safeOnRecord).compile.drain.cancelable {
    IO.blocking {
      logger.info("kafka consumer canceled")
    }
  }