Add multi request to `KafkaRequestReply`
Purpose
This PR adds a new request type to the KafkaRequestReply so that we can receive multiple replies to a request.
This can be useful when you want to send a request that multiple consumer groups can answer, or when a service needs to answer with multiple messages.
That was not supported with the Uni<Rep> request.
Implementation
This is a first idea to add the feature, in which the KafkaRequestReplyImpl is modified to use a Multi as a base for all operations instead of the Uni.
If you think this should be a separate class, or if you have any other idea, do not hesitate to say so.
This adds two new requestMulti methods that return a Multi<Rep>.
I had no better idea for the method names, so we can discuss alternatives.
The reply.timeout is applied between each reply, and the ReplyFailureHandler is called for each reply, so one failure on a reply will fail the whole operation.
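To make the timeout and failure semantics described above concrete, here is a minimal sketch using only the JDK. It is not the KafkaRequestReplyImpl code: the queue stands in for the reply topic, `collectReplies`, `ReplyTimeoutException`, the `expected` count, and the `Function<String, Throwable>` failure handler are all hypothetical names chosen for illustration. It only shows that the timeout budget restarts after each reply and that a single rejected reply fails the whole operation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ReplyTimeoutSketch {

    // Hypothetical unchecked exception, standing in for a custom timeout exception.
    static class ReplyTimeoutException extends RuntimeException {
        ReplyTimeoutException(String message) {
            super(message);
        }
    }

    // Collects `expected` replies. Each poll waits up to replyTimeout, so the
    // timeout applies between consecutive replies, not to the whole request.
    // The failure handler returns null for a valid reply, or a Throwable to reject it.
    static List<String> collectReplies(BlockingQueue<String> replies, int expected,
            Duration replyTimeout, Function<String, Throwable> failureHandler) {
        List<String> received = new ArrayList<>();
        while (received.size() < expected) {
            String reply;
            try {
                reply = replies.poll(replyTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a reply", e);
            }
            if (reply == null) {
                // No reply arrived within the per-reply timeout.
                throw new ReplyTimeoutException("No reply within " + replyTimeout);
            }
            Throwable failure = failureHandler.apply(reply);
            if (failure != null) {
                // One rejected reply fails the whole operation.
                throw new IllegalStateException("Reply rejected: " + reply, failure);
            }
            received.add(reply);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> replies = new LinkedBlockingQueue<>(List.of("r1", "r2"));
        System.out.println(collectReplies(replies, 2, Duration.ofMillis(50), reply -> null));
        try {
            // The queue is now empty, so the next wait times out.
            collectReplies(replies, 1, Duration.ofMillis(10), reply -> null);
        } catch (ReplyTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The fixed `expected` count is only there to end the loop in this sketch; when the real Multi should complete is a separate design question.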
I didn't write the doc yet.
If you have any input, feel free to share it.
That's a very interesting idea. I'll look later today!
@Malandril Thanks for the tests as well! I've pushed small changes, and exposed a way to complete a pending request.
The upstream consumer request handling is slightly different, but I am unsure of the impact. It was designed to be able to pause the consumer when there aren't any pending requests.
Hello @ozangunalp, is there any reason this wasn't merged? If there are any issues, I can try to fix them.
Sorry, I need to get back to this. I don't remember if I pushed the refactoring I did.
I remember there was an issue with the strategy for when to complete the returned Multi. I am thinking of a handler to identify the completion message.
WDYT?
I don't really understand. Do you mean something like a predicate passed to the method, so that the user can configure a completion condition? Would that be passed to the request?
public Multi<Message<Rep>> requestMulti(Message<Req> request, Predicate<Multi<Message<Rep>>> completionCondition);
And when the predicate returns true, the Multi is completed?
What I was thinking can easily be done using the Mutiny API:
requestMulti(reqMessage).select().first(reply -> isNotCompletion(reply))
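For readers unfamiliar with that operator: `select().first(predicate)` keeps items while the predicate holds and completes the stream at the first item that fails it. The sketch below mimics that behaviour with the JDK's `Stream.takeWhile` as a stand-in, so it runs without Mutiny or Kafka. The `END` marker and the body of `isNotCompletion` are assumptions made for illustration; the thread does not say how a completion message would be recognised.

```java
import java.util.List;
import java.util.stream.Collectors;

class CompletionSketch {

    // Hypothetical marker a replier would send as its last message.
    static final String END_MARKER = "END";

    // True for regular replies, false for the completion message.
    static boolean isNotCompletion(String reply) {
        return !END_MARKER.equals(reply);
    }

    // Keeps replies until the completion message is seen, then stops.
    // Stream.takeWhile is used here as a JDK stand-in for
    // Mutiny's select().first(predicate).
    static List<String> repliesUntilCompletion(List<String> replies) {
        return replies.stream()
                .takeWhile(CompletionSketch::isNotCompletion)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "late" arrives after the completion message and is ignored.
        System.out.println(repliesUntilCompletion(List.of("a", "b", "END", "late")));
    }
}
```

Note that the completion message itself is not emitted downstream with this approach, so it should carry no payload the caller needs.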
I added a custom timeout exception and rebased/squashed some commits.
If all tests are green I think it is good to go.
Thank you @Malandril for your contribution and patience!
Thanks for the feedback and fixes!