Add multi request to `KafkaRequestReply`

Open Malandril opened this issue 1 year ago • 2 comments

Purpose

This PR adds a new request type to KafkaRequestReply so that a single request can receive multiple replies. This is useful when multiple consumer groups can answer a request, or when a service needs to answer with multiple messages. Neither case was supported by the Uni<Rep> request.

Implementation

This is a first attempt at the feature: KafkaRequestReplyImpl is modified to use a Multi as the base for all operations instead of a Uni. If you think this should be a separate class, or if you have any other ideas, do not hesitate to say so.

This adds two new requestMulti methods that return a Multi<Rep>. I had no strong ideas for the method names, so we can discuss better ones. The reply.timeout is applied between each reply, and the ReplyFailureHandler is called for each reply, so a failure on one reply fails the whole operation. I haven't written the docs yet.
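To make the "timeout between each reply" behavior concrete, here is a minimal, dependency-free sketch of the timing rule only. It is not the PR's code: the class and method names are hypothetical, and it assumes that the first interval is measured from the moment the request is sent and that a gap exactly equal to the timeout still counts as on time.

```java
public class ReplyTimeoutSketch {

    /**
     * Counts how many replies arrive before the first gap that exceeds the timeout.
     * arrivalMillis holds reply arrival times in ms, measured from the request (t = 0).
     * Hypothetical helper for illustration; the real implementation works on a Multi.
     */
    static int acceptedReplies(long[] arrivalMillis, long timeoutMillis) {
        long previous = 0; // assumption: the timer starts when the request is sent
        int accepted = 0;
        for (long arrival : arrivalMillis) {
            // The timer restarts after every reply, so only the gap matters,
            // not the total time elapsed since the request.
            if (arrival - previous > timeoutMillis) {
                break; // the stream would fail with a timeout at this point
            }
            accepted++;
            previous = arrival;
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] arrivals = {400, 900, 1300, 3000};
        System.out.println(acceptedReplies(arrivals, 1000));
    }
}
```

With arrivals at 400, 900, 1300 and 3000 ms and a 1000 ms timeout, the gaps are 400, 500, 400 and 1700 ms, so three replies are accepted even though the third arrives more than 1000 ms after the request.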

If you have any input, feel free to share it.

Malandril avatar Sep 23 '24 18:09 Malandril

That's a very interesting idea. I'll look later today!

ozangunalp avatar Sep 24 '24 08:09 ozangunalp

@Malandril Thanks for the tests as well! I've pushed small changes, and exposed a way to complete a pending request.

The upstream consumer request handling is slightly different, but I am unsure of the impact. It was designed to be able to pause the consumer when there aren't any pending requests.

ozangunalp avatar Sep 24 '24 17:09 ozangunalp

Hello @ozangunalp, is there any reason this wasn't merged? If there are any issues, I can try to fix them.

Malandril avatar Jan 22 '25 18:01 Malandril

Sorry, I need to get back to this. I don't remember if I pushed the refactoring I did.

I remember there was an issue with the strategy for deciding when to complete the returned Multi. I am thinking of a handler that identifies the completion message.

wdyt ?

ozangunalp avatar Jan 23 '25 11:01 ozangunalp

I don't really understand. Do you mean something like a predicate passed to the request method, so that the user can configure a completion condition?

public Multi<Message<Rep>> requestMulti(Message<Req> request, Predicate<Multi<Message<Rep>>> completionCondition);

And when the predicate returns true, the Multi is completed?

Malandril avatar Jan 23 '25 21:01 Malandril

What I was thinking can easily be done using the Mutiny API: requestMulti(reqMessage).select().first(reply -> isNotCompletion(reply))
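For readers less familiar with this operator: select().first(predicate) forwards items while the predicate holds and completes the stream at the first item that fails it. The sketch below mimics that behavior with plain java.util.stream takeWhile instead of Mutiny, so that it runs without dependencies. The Reply class, its last flag, and the helper names are hypothetical stand-ins for however a reply would mark completion.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CompletionSketch {

    /** Hypothetical reply type: 'last' marks the completion message. */
    static final class Reply {
        final String payload;
        final boolean last;

        Reply(String payload, boolean last) {
            this.payload = payload;
            this.last = last;
        }
    }

    /** Mirrors the isNotCompletion predicate from the comment above. */
    static boolean isNotCompletion(Reply reply) {
        return !reply.last;
    }

    /**
     * Stand-in for requestMulti(req).select().first(r -> isNotCompletion(r)):
     * keeps replies until the completion marker, which is itself dropped,
     * along with anything that arrives after it.
     */
    static List<String> collectUntilCompletion(List<Reply> replies) {
        return replies.stream()
                .takeWhile(CompletionSketch::isNotCompletion)
                .map(reply -> reply.payload)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Reply> replies = List.of(
                new Reply("a", false),
                new Reply("b", false),
                new Reply("done", true),   // completion marker
                new Reply("late", false)); // never delivered
        System.out.println(collectUntilCompletion(replies));
    }
}
```

Because the predicate lives in the caller's pipeline, the request method itself needs no extra completion parameter.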

ozangunalp avatar Jan 24 '25 15:01 ozangunalp

I added a custom timeout exception and rebased/squashed some commits.

If all tests are green I think it is good to go.

Thank you @Malandril for your contribution and patience!

ozangunalp avatar Jan 24 '25 15:01 ozangunalp

Thanks for the feedback and fixes!

Malandril avatar Jan 25 '25 11:01 Malandril