
#210 ReactiveMessagePipeline allowing negative ack

Open vroyer opened this issue 8 months ago • 4 comments

Here is a proposal to provide a ReactiveMessagePipeline implementation that allows negatively acknowledging messages and providing a RedeliveryBackoff to the ReactiveMessageConsumer. This way, you can fully benefit from Pulsar's redelivery support.
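For reference, a minimal sketch of the underlying Pulsar client API this proposal builds on (topic, subscription, and backoff values are illustrative):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        // Exponential backoff between redeliveries of nacked messages:
        // 1 s, 2 s, 4 s, ... capped at 60 s.
        .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                .minDelayMs(1_000)
                .maxDelayMs(60_000)
                .multiplier(2.0)
                .build())
        .subscribe();
```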

vroyer · May 05 '25 15:05

Hi Lari,

With the current implementation, the only way to nack a message for a business reason is to throw a RuntimeException, and the retry is then managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for some business logic and rely on Pulsar redelivery to reprocess the message. To avoid a breaking change with the current auto-acknowledging messageHandler, wouldn't the best solution be to introduce another handler variant (a "messageHandler2") that returns a MessageResult, so you can ack or nack messages depending on your use case?
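For illustration, a hypothetical sketch of such a handler (the name messageHandler2, the process method, and BusinessException are made up for the example):

```java
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

// Hypothetical handler variant: returns a MessageResult instead of Void,
// so the business logic decides between ack and nack explicitly.
Function<Message<String>, Publisher<MessageResult<Void>>> messageHandler2 =
        message -> process(message) // process(...) is hypothetical business logic returning Mono<Void>
                .thenReturn(MessageResult.acknowledge(message.getMessageId()))
                // Business failure: nack and let Pulsar redelivery (with the
                // configured RedeliveryBackoff) reprocess the message later.
                .onErrorResume(BusinessException.class, // hypothetical exception type
                        ex -> Mono.just(MessageResult.negativeAcknowledge(message.getMessageId())));
```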

Regards, Vincent.

vroyer · May 05 '25 22:05

> With the current implementation, the only way to nack a message for a business reason is to throw a RuntimeException, and the retry is then managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

That's a bug in the current implementation.

> The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for some business logic and rely on Pulsar redelivery to reprocess the message.

Yes, that's a bug, and it can be fixed without interface changes. The intention has been that when the Publisher signals an error, the message is nacked. Do you see any problem with that in your use case?

One of the bugs in the existing framework code for handling the message handler is the lack of Mono.defer. As a result, errors bubble up to the "pipeline" level, and that's why the pipeline retry kicks in. At least, that's my assumption about the bug; I haven't validated it. We don't currently have a test case to ensure the correct behavior, and documentation of the expected behavior is also missing.
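To illustrate the assumption (not the actual pipeline code; messageHandler and message stand for the handler and message in context):

```java
import reactor.core.publisher.Mono;

// Without Mono.defer: if messageHandler.apply(message) throws synchronously,
// the exception is raised at assembly time, outside the per-message Mono,
// and bubbles up to the pipeline, triggering the pipeline retry.
Mono<Void> direct = Mono.from(messageHandler.apply(message));

// With Mono.defer: the same exception becomes an error signal on the
// per-message Mono, where it can be translated into a negative ack.
Mono<Void> deferred = Mono.defer(() -> Mono.from(messageHandler.apply(message)));
```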

lhotari · May 06 '25 18:05

> With the current implementation, the only way to nack a message for a business reason is to throw a RuntimeException, and the retry is then managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

> That's a bug in the current implementation.

> The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for some business logic and rely on Pulsar redelivery to reprocess the message.

> Yes, that's a bug, and it can be fixed without interface changes. The intention has been that when the Publisher signals an error, the message is nacked. Do you see any problem with that in your use case?

I just wonder why you don't take the same approach for both the consumer and the pipeline: the consumer returns a MessageResult to ack/nack, while the pipeline signals an error. In the latter case, you need to carefully distinguish Publisher errors from PulsarClientExceptions (due to connection issues), so that a message is not nacked just because its acknowledgement failed.
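A sketch of the kind of error discrimination this implies (illustrative, reusing the deferred handler wrapping from above):

```java
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.reactive.client.api.MessageResult;
import reactor.core.publisher.Mono;

Mono<MessageResult<Void>> result =
        Mono.defer(() -> Mono.from(messageHandler.apply(message)))
                .thenReturn(MessageResult.acknowledge(message.getMessageId()))
                .onErrorResume(ex -> ex instanceof PulsarClientException
                        // Connection/ack failure: propagate so the pipeline
                        // retry handles reconnection instead of nacking.
                        ? Mono.<MessageResult<Void>>error(ex)
                        // Business failure: nack and rely on Pulsar redelivery.
                        : Mono.just(MessageResult.negativeAcknowledge(message.getMessageId())));
```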

> One of the bugs in the existing framework code for handling the message handler is the lack of Mono.defer. As a result, errors bubble up to the "pipeline" level, and that's why the pipeline retry kicks in. At least, that's my assumption about the bug; I haven't validated it. We don't currently have a test case to ensure the correct behavior, and documentation of the expected behavior is also missing.

I can't comment on this assumption. According to my tests with the proposed PR, the pipeline retry properly handles Pulsar reconnection.

vroyer · May 07 '25 10:05

> I just wonder why you don't take the same approach for both the consumer and the pipeline: the consumer returns a MessageResult to ack/nack, while the pipeline signals an error. In the latter case, you need to carefully distinguish Publisher errors from PulsarClientExceptions (due to connection issues), so that a message is not nacked just because its acknowledgement failed.

The original purpose of the messageHandler is to acknowledge the message when the subscription to the Publisher completes successfully and to nack when it errors. We should fix that bug before adding another interface. The interface is designed this way so that the user can't misuse the API and return acknowledgements for messages that were received earlier. For pipelining, it's necessary to rely on the framework-provided options or to use the streamingMessageHandler. I don't currently see a need to complicate the interface, since a Publisher can properly signal the acknowledgement and negative acknowledgement of a message. Obviously, the bugs in the current implementation should be fixed so that this works properly.
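For example, with the streamingMessageHandler, explicit ack/nack is already possible (a sketch; the consumer variable and process method are illustrative):

```java
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import reactor.core.publisher.Mono;

ReactiveMessagePipeline pipeline = consumer // a ReactiveMessageConsumer<String>
        .messagePipeline()
        .streamingMessageHandler(messages -> messages
                .concatMap(message -> process(message) // hypothetical business logic returning Mono<Void>
                        .thenReturn(MessageResult.acknowledge(message.getMessageId()))
                        .onErrorResume(ex -> Mono.just(
                                MessageResult.negativeAcknowledge(message.getMessageId())))))
        .build();
pipeline.start();
```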

> One of the bugs in the existing framework code for handling the message handler is the lack of Mono.defer. As a result, errors bubble up to the "pipeline" level, and that's why the pipeline retry kicks in. At least, that's my assumption about the bug; I haven't validated it. We don't currently have a test case to ensure the correct behavior, and documentation of the expected behavior is also missing.

> I can't comment on this assumption. According to my tests with the proposed PR, the pipeline retry properly handles Pulsar reconnection.

We'll find out with test cases.

lhotari · May 07 '25 11:05

OK, you can just pick up the first commit for the RedeliveryBackoff and fix the pipeline to nack without retrying unless there is a PulsarClientException. That will also be fine.
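Something along these lines for the retry spec, illustratively (backoff values are made up):

```java
import java.time.Duration;
import org.apache.pulsar.client.api.PulsarClientException;
import reactor.util.retry.Retry;

// Retry the pipeline only for client/connection errors; business failures
// go through the nack/RedeliveryBackoff path instead. This would be passed
// as the pipelineRetrySpec.
Retry pipelineRetrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofMinutes(1))
        .filter(ex -> ex instanceof PulsarClientException);
```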

vroyer · May 07 '25 16:05

> OK, you can just pick up the first commit for the RedeliveryBackoff and fix the pipeline to nack without retrying unless there is a PulsarClientException. That will also be fine.

@vroyer There's a fix for the negative acknowledgement bug in #213. I'll close this PR. Please open a separate PR for the RedeliveryBackoff changes.

lhotari · May 19 '25 07:05