kotlin-kafka
kotlin-kafka copied to clipboard
[DOCS] Using KafkaReceiver with Ktor Server (cancellation, and terminal events)
Hi,
I'm trying to user KafkaReceiver inside Ktor 2. I'm not quite sure what is the best way to start KafkaReciever with Ktor. What I did was to create an application plugin like the following and install it in Ktor engine.
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.ktor.server.application.*
import io.ktor.server.application.hooks.*
import io.ktor.server.config.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import java.util.*
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@JvmInline
value class Key(val key: String)
@JvmInline
value class Message(val content: String)
val EntityUpdateEventPlugin = createApplicationPlugin(name = "EntityUpdateEventPlugin") {
on(MonitoringEvent(ApplicationStarted)) { application ->
val topicName = "odm-entity-update-event"
val environment = application.environment
val bootstrapServers = environment.config.property("kafka.bootstrapServers").getString()
val kafkaPropertiesConfig = environment.config.config("kafka.properties")
val receiverProperties = Properties().apply {
kafkaPropertiesConfig.propertyOrNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)?.let {
this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = it.getString()
}
kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_JAAS_CONFIG)?.let {
this[SaslConfigs.SASL_JAAS_CONFIG] = it.getString()
}
kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_MECHANISM)?.let {
this[SaslConfigs.SASL_MECHANISM] = it.getString()
}
}
runBlocking(Dispatchers.Default) {
coroutineScope {
launch(Dispatchers.IO) {
val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
bootstrapServers,
StringDeserializer().map(::Key),
ByteArrayDeserializer().map {
Message(String(it))
},
groupId = "my-event-group",
autoOffsetReset = AutoOffsetReset.Earliest,
properties = receiverProperties
)
KafkaReceiver(settings)
.receive(topicName)
.map { "${it.key()} -> ${it.value()}" }
.collect {
application.log.info(it)
}
}
}
}
}
}
I seems the receiver starts and collects the messages. The question is how can we stop the receiver when Ktor shutdowns. When I stop the Ktor application it seems stuck and I have to kill the process.
I found a way to make it work. It seems runBlocking blocks the event loop of Ktor engine. The receiver and Ktor route handlers works fine if it is started in the following way:
on(MonitoringEvent(ApplicationStarted)) { application ->
// .. code to initialise the receiver
CoroutineScope(Dispatchers.IO).launch {
receiver
.receive("my-topic")
.collect {
application.log.info("${it.key()} -> ${it.value()}")
it.offset.acknowledge()
}
}
}
I don't know which is the best way to start a receiver in Ktor.
@nomisRev Can you recommend or give us an example how to use it with Ktor?
Regards
Hey @ageorgousakis,
Thanks for your interest in the library, and opening a ticket! 🙏
As you've show there already, the returning Flow is coupled to it's surrounding CoroutineScope.
The simplest way to running a Flow for the same length as the server is to use launchIn with Application.
So to provide a small snippet:
val flow: Flow<Unit> = receiver
.receive("my-topic")
.map { // <-- changed collect to map, so the result is still a Flow
application.log.info("${it.key()} -> ${it.value()}")
it.offset.acknowledge()
}
val application: Application = TODO("This is the Application from Ktor")
flow.launchIn(application)
The Ktor Application implements CoroutineScope, and it cancels the CoroutineScope when the server is cancelled.
So it will also cancel your Flow.
NOTE: I've run into some issues where Ktor doesn't properly cancel on SIGINT from K8S for example, and I've therefor build suspendapp.
SuspendApp With Ktor and K8S
SuspendApp with Kafka.
Alternatively, you could also use ApplicationEngine#addShutdownHook and use it to manually control a CoroutineScope.
The take-away is the the Flow cancels with the CoroutineScope it's called from.
Hope that helps @ageorgousakis ! Let's leave this issue open, so it can serve as reminder to include this information it in the documentation 👍