spring-cloud-stream
spring-cloud-stream copied to clipboard
Better error handling support for reactive functions
I'm using the new functional approach with RabbitMQ and strive for fully reactive message handling. When using a reactive consumer, any error will break the connection. What I'd like instead is, if any exception/Flux.error() happens, the RabbitMQ should retry/publish message to dlq.
Considering the following example:
@Bean fun addAccount(dispatcher: ...): Consumer<Flux<Message<String>>> = Consumer { it.flatMap { mono { dispatcher.handle(it) } } .onErrorContinue { ex, _ -> StaticMessageHeaderAccessor.getAcknowledgmentCallback(...)?.noAutoAck() }.subscribe() }
I understand, that I have to handle errors manually, maybe sending a manual ack/nack or requeue requests. When using non reactive logic this is provided out of the box. It would be nice if I could achieve the same behavior with an error handler in the reactive chain. The above example doesn't work as there is no ACKNOWLEDGMENT_CALLBACK in the message (that's added in non reactive case).
Any solution that achieves "retry on error" behaviour would be nice.
So, here is the issue and you kind of eluded to it with your comment When using non reactive logic this is provided out of the box. . .. This is really the core of the problem.
The reason why we can do it in non-reactive way is because we own the stream, since every step in the stream is part of some implementation of either spring-cloud-stream/function or spring-integration frameworks. And the unit of work in imperative approach is a single Message, so your function is invoked every time there is a message. (In reactive terms think of function invocation as your implementation of flux.map(x -> y))
The reason why we can NOT do the same in reactive way is because we DP NOT own the stream. On top of that the unit of work is actually the entire stream (not a single message), so your function is only invoked once during the initialization to connect your stream definition with the source/target. Once connected we're pretty much wipe our hands clean as there is really nothing we can do to influence or interfere.
We tried before at the expense of enforcing Message as unit of work, but that resulted in may issues with users not being able to use any of the reactive stateful patterns such as merge, aggregate, buffer etc.
Looking at your code I see you're not using any of the stateful reactive patterns, so perhaps you can simply switch to non-reactive approach and by doing so make things simpler.
We will look into a possibility of adding AcknowledgmentCallback, but I'll admit it is not of the highest priority
I've also transfered it to s-c-stream
Hi @olegz thanks for the reply. What I want to achieve is using suspended methods in the message handler and send ack/nack (or use the automatic error handling) based on the outcome of the handler. The java equivalent would be "let me use a mono from the message handler" one per message and let the framework handle the possible error/acknowledge once the mono ran.
Maybe this is already provided in some way, it's just not obvious to me how. I don't necessarily want to handle the whole stream, just make sure that the message gets acked/nacked asynchronously at the end of the handler, without blocking. Is there already an approach for this?
@olegz what I still don't understand is why there coudln't be something like this:
Function<Message<String>, Mono<String>>
The point is: You run the provided function EACH time you receive a message, run some non blocking logic, that uses the Message<String> as the input and provides a Mono<String> as an output (one per message).
- If the output mono is empty, do not send any message.
- If the output mono is not empty, send the message once the non blocking handler logic ran
- If the output mono is Mono.error() use the existing error handling mechanisms built into the framework (automatic retries, etc), like it would be in the blocking situation.
We already do all that and more for non-reactive functions. So all you need is to change your function to look like this Function<Message<String>, String>
Also, yes it is possible to do all that for your case, but only for that case since it is still an imperative function and all you are asking is to check if function's output is reactive then do one of those things you describe. While I am open to that it is certainly not on any priority lists, especially since you can change the function to the above. So if anyone volunteers with the PR we can review.
But this issue is more generic and is about general approach for error handling for reactive functions and for that there is nothing we can do.
Hi @olegz
If I change the signature to Function<Message<String>, String> I'll have to block my handling logic at the last step to check if the handler logic was successful and the framework can ACK, or not successful and should retry.
What I need that we send this value in a non blocking way. When the handler returns a value (let's say from a reactive db layer) then do something, not block and wait until the reactive db layer returns something.
Maybe I'm missing something here but I don't see how I can solve this with a simple Function<Message<String>, String>
Might be worth adding it to the doc for the time being if the expected behaviour is that the error handling described in the docs does not apply for reactive cases. Currently, to me, the docs read as if there is no significant difference when using a reactive or imperative function, but in terms of error handling it seems to be quite different. It took me quite a bit to find this issue. 😅
If it was possible to declare a function as Function<T1, Mono<T2>> that has the same error handling semantics as imperative functions, that'd probably be a good solution for most developers.
Hi @olegz,
Any update on this issue ? It seems that at the moment there is still only 2 choices, either use the reactive approach to deal with the whole stream and loose and lot of message processing features, on the imperative one and loose all the benefits of the non blocking approach of your application code. It would be great to have access to a reactive message handler like the posters before me defined Function<T1, Mono<T2>>. Is this one the roadmap ?
Cheers
As I explained before, unlike with imperative approach, we do not have any view into the reactive stream. The whole stream is the unit if work from our standpoint so you must rely on the reactive error handling API.
With regard to Function<T1, Mono<T2>> we can certainly consider it. Not sure how mu it will buy you since IMHO it will be basically a reactive way of writing imperative function
Hi @olegz, I completely understand the fact that the framework has no view of the reactive stream in this mode. I think it will buy us a lot as it will allow for a reactive processing message by message. Lost of spring application now are reactive all the way so if you have to call a REST API or a DB during your message processing, all those interactions will be reactive and return reactive types. Having to block at the end of the processor, you loose all the benefits of it. I believe this a pretty common use case where you don't need to run any reactive commands on the whole stream (merge, window ...) but still want to process each message individually in a reactive and non blocking manner.
Does it make sense ?
Not completely related but it would be great for the docs to provide an example of how to do error handling with reactive function even if it's up to the application to handle it. The docs specify correctly that the current error handling section only applies to imperative handlers but provide no further information on the best way to do it in the reactive world :)
Any updates on this? I currently see it as a big limitation of SCS. I, again and again, encounter cases where I do not want to lose all the good stuff of imperatives handlers and need a non-blocking way to process each message.
@olegz, regarding your comment:
Not sure how mu it will buy you since IMHO it will be basically a reactive way of writing imperative function
Consider the super-simple example where you need to save each message into MongoDB. You want to use a non-blocking stack all the way up to the persistence layer, hence you use a reactive MongoDB client. And here you face a dilemma:
- You either use an imperative message handler and call
ReactiveCrudRepository.save().block()inside. Which neglects the point of using reactive APIs in the first place. - You declare the
Function<Flux<T>, Mono<Void>>handler. But you now have to add logic for retries, DLQ, etc. - and you definitely don't want that, since you simply want to store messages in the DB. Also,StreamBridgedoes not provide a reactive API and you cannot send a message to DLQ in a non-blocking way. That seems odd too.
If SCS would allow handlers like Function<T, Mono<T>> or Function<T, Mono<Void>> and treat them as a "reactive handler that is invoked for each message" - that would solve the problem.
I am not familiar with the inner workings of the SCS but it doesn't seem to be difficult to implement, doesn't it? At least on the conceptual level.
I believe that for imperative handlers there is a place inside SCS that does something like this (hypothetical pseudocode below):
...
.doOnNext(userProvidedImperativeHandler.apply(message))
.retry(someSCSRetryPolicy)
.onErrorResume(someSCSLogicForSendingToDLQ)
...
Which with the introduction of the Function<T, Mono<Void> handler will change the doOnNext operator to flatMap.
Is my reasoning flawed here? @olegz I would appreciate your view on this. Thanks!
This has been discussed many times. The framework (s-c-stream) has no control over reactive stream processing. We only act as facilitators to connect binder stream with user provided stream. This happens during the startup after which s-c-stream plays no roles or has any control. Further more we are dealing with non-reactive binders so there is really no benefit (other than the API itself) to even write reactive functions. I am going to mark this as ideal-for-contribution and if someone is interested in contributing I would be eager to review the PR.