vertx-lang-kotlin icon indicating copy to clipboard operation
vertx-lang-kotlin copied to clipboard

Suspending EventBus delegate

Open AlexeySoshin opened this issue 3 years ago • 4 comments

Motivation: Following #171 this is my suggestion to have a coroutine-aware EventBus

  • Having CoroutineEventBus that delegates to underlying EventBus for non-suspending cases, and implements suspending cases
  • Having CoroutineMessageConsumer that does the same for message consumption

AlexeySoshin avatar Oct 28 '20 14:10 AlexeySoshin

MessageConsumer is a ReadStream<Message>, isn't that sufficient ?

vietj avatar Oct 28 '20 17:10 vietj

It could work, but I would say it doesn't have the same level of ergonomics. Instead of

    bus.consumer<String>("some-address") {
      delay(10)
    }

It will be something like:

    bus.consumer<String>("some-address").toChannel(vertx).consumeEach {
      delay(10)
    }

And this will have to be done inside a suspending function, since consumeEach is suspending.

Not the end of the world, for sure.

AlexeySoshin avatar Oct 29 '20 16:10 AlexeySoshin

Also I think if we do the channel approach the code gets 'stuck' inside the consume

with the first approach i just want it to attach the suspending handler and then move onto the code below it

asad-awadia avatar Apr 06 '21 19:04 asad-awadia

Hi, did anything happen in meanwhile, since using code like this doesn't leverage kotlin coroutines:

  private suspend fun listenForSaveBankAccount() {

    vertx.eventBus().localConsumer<JsonObject>(Address.BANK_ACCOUNT_SAVE) { msg ->
      val json = msg.body()

      val username = json.getString("username")
      val bankAccount = json.getJsonObject("bankAccount")

      sqlClient.preparedQuery("select * from users where username = $1")
        .execute(Tuple.of(username))
        .compose { Future.succeededFuture(it.first().getInteger(1)) }
        .onFailure { msg.fail(FAILURE_CODE_GENERAL, "Could not resolve user by username $username") }
        .flatMap { userId ->
          sqlClient.preparedQuery("select count(*) from bank_accounts where user_id = $1")
            .execute(Tuple.of(userId))
            .map { it.first().getInteger(1) }
        }
        .flatMap { totalUserBankAccounts ->
          val maybeCode: String? = bankAccount.getString("code")
          if (!maybeCode.isNullOrBlank()) {

          } else {
            
          }
        }
    }
  }

dodalovic avatar May 22 '22 09:05 dodalovic