spring-cloud-stream

Flux breaks down on a conversion error and stops processing messages

Open rabiori opened this issue 4 years ago • 3 comments

Describe the issue: If I provide a functional binding with a generic type and receive a message that is not properly formatted, the application stops reading new messages. Based on this Stack Overflow question: https://stackoverflow.com/q/66304153/2924784

To Reproduce: See the Stack Overflow question for the relevant code.

Expected behavior: The improperly formatted message should be discarded and the following ones processed properly.

rabiori • Mar 01 '21 11:03

Would it be possible to drop a message to dlq on error? @olegz

s4got10dev • May 05 '21 13:05

As a workaround for now, I use something like this:

    @Bean
    fun consumer() = Consumer<Message<String>> { msg ->
        val header = msg.headers[AmqpHeaders.RECEIVED_ROUTING_KEY]
        val payload = msg.payload
        logger.info { "Received rabbit message $payload" }
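        // This is an imperative Consumer: block() rethrows a Mono error to the
        // caller, so a failure surfaces per message rather than ending a Flux.
        when (header) {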
            "drop" -> Mono.error(IllegalArgumentException("should be dropped"))
            "empty" -> Mono.empty()
            else -> Mono.just("good")
        }.block()
    }

s4got10dev • May 05 '21 13:05

I encountered the same problem. It would be nice if there were a way to do the following: NACK the message on an exception; if it still fails after several retries, put the message on the DLQ and ACK it.

ArayChou • Aug 25 '22 07:08

This is a known and documented issue. There is nothing we can do, since we have no control over the reactive stream. I know it can be confusing, but in the reactive case s-c-stream only acts as a facilitator that connects the stream from the binder to the stream defined by the user. That happens during startup. Once that is done, s-c-stream plays no role in, and has no control over, the stream processing. That is a far cry from the imperative model, where functions are message handlers invoked by s-c-stream on each arriving message.

olegz • Jan 04 '23 18:01
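
Since the user's function owns the reactive stream, one consequence of the last comment is that per-message failures have to be contained inside that stream. The following is only a minimal sketch of that idea with plain Reactor, not an official s-c-stream recipe: it assumes the binding delivers raw `String` payloads and that conversion is done by hand. `Order`, `parseOrder`, `parseSafely`, and the bean name in the comment are hypothetical names made up for illustration.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical payload type, used only for illustration.
data class Order(val id: Int)

// Hypothetical converter: throws NumberFormatException on malformed input.
fun parseOrder(raw: String): Order = Order(raw.trim().toInt())

// Each element is converted inside its own inner Mono, so an error is
// confined to that element and never reaches the outer Flux.
fun parseSafely(input: Flux<String>): Flux<Order> =
    input.flatMap { raw ->
        Mono.fromCallable { parseOrder(raw) }
            .onErrorResume { e ->
                // Assumption: malformed messages are logged and dropped here.
                println("Dropping malformed message '$raw': ${e.message}")
                Mono.empty<Order>()
            }
    }

// Possible shape of a reactive function bean using it (hypothetical name):
// @Bean
// fun orders(): java.util.function.Function<Flux<String>, Flux<Order>> =
//     java.util.function.Function { parseSafely(it) }

fun main() {
    parseSafely(Flux.just("1", "oops", "3"))
        .subscribe { println("Processed $it") }
}
```

With this shape the malformed element is skipped and the elements after it are still processed. If ordering matters with asynchronous inner publishers, `concatMap` can be used in place of `flatMap`. Note that this only drops bad messages; anything like retries or forwarding to a DLQ would have to be implemented by the user inside the same stream.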