vertx-lang-kotlin icon indicating copy to clipboard operation
vertx-lang-kotlin copied to clipboard

SetPeriodic and Eventbus do not have await suspending bindings

Open asad-awadia opened this issue 3 years ago • 4 comments

Version

3.9.1 as well as 4 milestone-5

Context

Vert.x Kotlin lang provides suspending await functions for most APIs - such as http/tcp server - filesystem etc.

but for set periodic or eventbus handlers - they are not marked as suspending functions - so I can't use the await bindings inside

Do you have a reproducer?

fun main() {
 val vertx = Vertx.vertx()
  vertx.setPeriodic(5000) {
       // can't use writeFileAwait in here 
        vertx.fileSystem().writeFile("./file.txt", "somedata".toBuffer()) {  }
    }
}

Same thing with vertx.executeBlockingAwait<JsonObject> { // can't call suspending functions inside here } I can't call suspending functions inside that handler

I know the eventbus can be converted to a channel but it would still be great if I could do something like https://github.com/vert-x3/vertx-examples/blob/master/web-examples/src/main/java/io/vertx/example/web/angular_realtime/Server.java#L45 and have a suspend function as the handler

I think it might be because the handler interface is not a suspend function and may require a lot of work?

asad-awadia avatar Jul 09 '20 21:07 asad-awadia

@vietj @jponge

asad-awadia avatar Jul 13 '20 18:07 asad-awadia

That's exactly what I was looking for! Maybe an extension function would help.

E.g. as an extension function of MessageConsumer:

fun <T> MessageConsumer<T>.coroutineHandler(context: Context, coroutineHandler: suspend (Message<T>) -> Unit) {
    handler {
        CoroutineScope(context.dispatcher()).launch {
            coroutineHandler(it)
        }
    }
}

where the caller side would look like this:

vertx.eventBus().consumer<JsonObject>("eventname").coroutineHandler(context) {
   it.doSomethingAndReply()
}

or even shorter as an extension function of EventBus:

fun <T> EventBus.consumer(context: Context, address: String, coroutineHandler: suspend (Message<T>) -> Unit): MessageConsumer<T> {
    return consumer(address) {
        CoroutineScope(context.dispatcher()).launch {
            coroutineHandler(it)
        }
    }
}

where the caller side would look like this:

vertx.eventBus().consumer<JsonObject>(context, "eventname") {
   it.doSomethingAndReply()
}

In both examples fun Message<JsonObject>.doSomethingAndReply() can be a suspending function.

Does this make sense? It works on my machine, but I am an absolute novice to Vert.x.

sebastianharder avatar Jul 26 '20 09:07 sebastianharder

I'll take a look at this, probably implementing @sebhard suggestion.

AlexeySoshin avatar Oct 26 '20 14:10 AlexeySoshin

thanks @AlexeySoshin

vietj avatar Oct 27 '20 08:10 vietj