Flux breaks down on a conversion error and stops processing messages
Describe the issue If I provide a functional binding with a generic type and receive a message that is not properly formatted, the application stops reading new messages. Based on this Stack Overflow question: https://stackoverflow.com/q/66304153/2924784
To Reproduce Steps to reproduce the behavior: see the question for the relevant code
Expected behavior Should discard the improperly formatted message and process the following ones properly.
Would it be possible to send a message to the DLQ on error? @olegz
As a workaround for now I use something like:
@Bean
fun consumer() = Consumer<Message<String>> { msg ->
    val header = msg.headers[AmqpHeaders.RECEIVED_ROUTING_KEY]
    val payload = msg.payload
    logger.info { "Received rabbit message $payload" }
    // block() rethrows the error from Mono.error, so the failure
    // surfaces inside this imperative Consumer, per message.
    when (header) {
        "drop" -> Mono.error(IllegalArgumentException("should be dropped"))
        "empty" -> Mono.empty()
        else -> Mono.just("good")
    }.block()
}
I encountered the same problem. It would be nice if there were a way to do the following: NACK the message when an exception occurs, and if it still fails after several retries, put the message on the DLQ and ACK it.
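To make the requested behavior concrete, here is a minimal sketch in plain Kotlin. It is not Spring Cloud Stream or RabbitMQ API: `Outcome`, `handleWithRetry`, `deadLetter` and `handler` are hypothetical names used only to illustrate the retry-then-DLQ flow.

```kotlin
// Hypothetical result type, for illustration only.
sealed interface Outcome {
    data class Processed(val attempts: Int) : Outcome
    data class DeadLettered(val attempts: Int, val cause: Throwable) : Outcome
}

// Calls the handler up to maxAttempts times. A failed attempt corresponds
// to a NACK followed by a redelivery. Once the attempts are exhausted, the
// message is handed to deadLetter and treated as acknowledged.
fun <T> handleWithRetry(
    message: T,
    maxAttempts: Int,
    deadLetter: (T, Throwable) -> Unit,
    handler: (T) -> Unit,
): Outcome {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastError: Throwable? = null
    repeat(maxAttempts) { attempt ->
        try {
            handler(message)
            return Outcome.Processed(attempt + 1)
        } catch (e: Exception) {
            lastError = e // remember the failure and try again
        }
    }
    deadLetter(message, lastError!!)
    return Outcome.DeadLettered(maxAttempts, lastError!!)
}
```

A handler that always throws ends up in `deadLetter` after `maxAttempts` calls, while one that succeeds on a later attempt returns `Processed` with the attempt count.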
This is a known and documented issue. There is nothing we can do, since we have no control over the reactive stream. I know it can be confusing, but in the reactive case s-c-stream only acts as a facilitator that connects the stream from the binder to the stream defined by the user. That happens during startup. Once that is done, s-c-stream plays no role in, and has no control over, the stream processing. That is a far cry from the imperative model, where functions are message handlers invoked by s-c-stream on each arriving message.
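Because the stream belongs to the user, one option is to handle errors inside the stream itself, per element, so that a single bad payload never becomes a terminal error signal. The sketch below uses plain Reactor and assumes the function receives the raw `String` payload and does its own conversion; it does not cover a conversion failure that happens before the message reaches the user's `Flux`. `Order`, `parseOrder` and `parseAll` are hypothetical names.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical payload type and parser, used only for illustration.
data class Order(val id: Int)

fun parseOrder(raw: String): Order =
    Order(raw.toIntOrNull() ?: throw IllegalArgumentException("not a number: $raw"))

// Each element is converted in its own inner Mono. A failure is logged and
// replaced by an empty Mono, so the outer Flux keeps emitting.
fun parseAll(input: Flux<String>): Flux<Order> =
    input.concatMap { raw ->
        Mono.fromCallable { parseOrder(raw) }
            .onErrorResume { e ->
                println("Dropping malformed message '$raw': ${e.message}")
                Mono.empty<Order>()
            }
    }

fun main() {
    val result = parseAll(Flux.just("1", "oops", "3")).collectList().block()
    println(result) // [Order(id=1), Order(id=3)]
}
```

In an s-c-stream application the body of `parseAll` would sit inside the reactive function bean. Instead of returning an empty `Mono`, the error branch could also publish the raw payload to a dead-letter destination, but that wiring is up to the application.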