vertx-lang-kotlin
setPeriodic and EventBus handlers do not have suspending (await) bindings
Version
3.9.1 as well as 4 milestone-5
Context
Vert.x Kotlin lang provides suspending await functions for most APIs, such as the HTTP/TCP servers and the file system.
However, the handlers passed to setPeriodic or to event bus consumers are not suspending functions, so I can't call the await bindings inside them.
Do you have a reproducer?
fun main() {
    val vertx = Vertx.vertx()
    vertx.setPeriodic(5000) {
        // can't use writeFileAwait in here
        vertx.fileSystem().writeFile("./file.txt", "somedata".toBuffer()) { }
    }
}
The same thing happens with vertx.executeBlockingAwait<JsonObject> { /* can't call suspending functions in here */ }: I can't call suspending functions inside that handler either.
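For reference, one workaround for the timer case is to launch a coroutine from inside the plain handler. The following is only a minimal sketch, assuming the code runs in a CoroutineVerticle (which is itself a CoroutineScope bound to the verticle's context) and that the 3.9.x writeFileAwait extension is available. On 4.x the equivalent call would presumably be writeFile(...).await(). The class name PeriodicWriter and its parameters are hypothetical, and Buffer.buffer(...) stands in for the toBuffer() helper used above.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.kotlin.core.file.writeFileAwait
import io.vertx.kotlin.coroutines.CoroutineVerticle
import kotlinx.coroutines.launch

// Hypothetical verticle: writes a file on every timer tick.
class PeriodicWriter(
    private val path: String = "./file.txt",
    private val periodMs: Long = 5000
) : CoroutineVerticle() {

    override suspend fun start() {
        // The timer handler itself is still a plain (non-suspending) Handler<Long>.
        vertx.setPeriodic(periodMs) {
            // launch comes from the verticle's CoroutineScope, so the
            // coroutine runs on the verticle's context dispatcher.
            launch {
                vertx.fileSystem().writeFileAwait(path, Buffer.buffer("somedata"))
            }
        }
    }
}

fun main() {
    Vertx.vertx().deployVerticle(PeriodicWriter())
}
```

Each tick starts a new coroutine, so a slow write does not delay the next tick. The trade-off is that ticks may overlap, and failures inside launch are not reported back to the timer.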
I know the event bus consumer can be converted to a channel, but it would still be great if I could do something like https://github.com/vert-x3/vertx-examples/blob/master/web-examples/src/main/java/io/vertx/example/web/angular_realtime/Server.java#L45
and have a suspend function as the handler.
I think this might be because the Handler interface is not a suspending function type, so supporting it may require a lot of work?
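To illustrate the channel route mentioned above, here is a rough sketch that assumes the toChannel(vertx) extension from vertx-lang-kotlin-coroutines and a CoroutineVerticle as the enclosing scope. The verticle name, the "eventname" address and the handle function are placeholders for illustration only.

```kotlin
import io.vertx.core.json.JsonObject
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.toChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical verticle: consumes event bus messages through a channel.
class ChannelConsumerVerticle : CoroutineVerticle() {

    override suspend fun start() {
        // MessageConsumer is a ReadStream, so it can be adapted to a ReceiveChannel.
        val channel = vertx.eventBus()
            .consumer<JsonObject>("eventname")
            .toChannel(vertx)

        launch {
            // Messages are processed one at a time; suspending calls are allowed here.
            for (message in channel) {
                message.reply(handle(message.body()))
            }
        }
    }

    // Placeholder for real suspending work (the delay only simulates it).
    private suspend fun handle(body: JsonObject): JsonObject {
        delay(10)
        return JsonObject().put("ok", true).put("received", body)
    }
}
```

This works, but the loop handles messages sequentially and needs more ceremony than passing a suspend lambda directly as the handler, which is what this issue asks for.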
@vietj @jponge
That's exactly what I was looking for! Maybe an extension function would help.
E.g. as an extension function of MessageConsumer:
fun <T> MessageConsumer<T>.coroutineHandler(context: Context, coroutineHandler: suspend (Message<T>) -> Unit) {
    handler {
        CoroutineScope(context.dispatcher()).launch {
            coroutineHandler(it)
        }
    }
}
where the caller side would look like this:
vertx.eventBus().consumer<JsonObject>("eventname").coroutineHandler(context) {
    it.doSomethingAndReply()
}
or even shorter as an extension function of EventBus:
fun <T> EventBus.consumer(context: Context, address: String, coroutineHandler: suspend (Message<T>) -> Unit): MessageConsumer<T> {
    return consumer(address) {
        CoroutineScope(context.dispatcher()).launch {
            coroutineHandler(it)
        }
    }
}
where the caller side would look like this:
vertx.eventBus().consumer<JsonObject>(context, "eventname") {
    it.doSomethingAndReply()
}
In both examples, fun Message<JsonObject>.doSomethingAndReply() can be a suspending function.
Does this make sense? It works on my machine, but I am an absolute novice to Vert.x.
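A possible variation on the first extension, offered only as a sketch: it takes an existing CoroutineScope (for example a CoroutineVerticle) instead of creating a new scope for every message, and it fails the message if the suspending handler throws. The scope parameter, the block name and the 500 failure code are choices made for this illustration and are not part of any Vert.x API.

```kotlin
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.MessageConsumer
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

// Hypothetical helper: registers a suspending handler on a MessageConsumer.
fun <T> MessageConsumer<T>.coroutineHandler(
    scope: CoroutineScope,
    block: suspend (Message<T>) -> Unit
): MessageConsumer<T> = handler { message ->
    // One coroutine per message, all tied to the caller's scope.
    scope.launch {
        try {
            block(message)
        } catch (e: CancellationException) {
            throw e // let cancellation propagate normally
        } catch (e: Exception) {
            // Report the failure to the sender (arbitrary code 500).
            message.fail(500, e.message ?: "handler failed")
        }
    }
}
```

Inside a CoroutineVerticle the call site would stay close to the one above: vertx.eventBus().consumer<JsonObject>("eventname").coroutineHandler(this) { it.doSomethingAndReply() }. Reusing the verticle's scope means the coroutines share its context dispatcher, though whether they are cancelled on undeploy depends on how that scope is set up in the Vert.x version in use.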
I'll take a look at this, probably implementing @sebhard's suggestion.
thanks @AlexeySoshin