smallrye-reactive-messaging
RabbitMQ - BackPressureFailure: Buffer full
I am regularly getting BackPressureFailures from my RabbitMQ consumer in my Quarkus (2.12.2-Final, Java 17) application.
io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.handleMessage(RabbitMQConsumerImpl.java:150)
at io.vertx.rabbitmq.impl.QueueConsumerHandler.lambda$handleDelivery$0(QueueConsumerHandler.java:39)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:833)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:596)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
The weird thing is that I have configured max-outstanding-messages, so in my understanding it should be impossible to get this error.
mp:
  messaging:
    incoming:
      new-message:
        automatic-recovery-enabled: true
        max-outstanding-messages: 2
        exchange:
          name: x
          type: topic
        queue:
          name: x
        routing-keys:
          - x.#
Here's the code that handles messages. consume contains a catch-all statement.
@Incoming("new-event")
@Blocking
public CompletionStage<Void> newMessage(Message<?> message) {
    var meta = message.getMetadata(IncomingRabbitMQMetadata.class);
    meta.ifPresent(amqp -> {
        var key = amqp.getRoutingKey();
        LOG.info("Message {} received", key);
        consume(key, amqp.getHeaders());
    });
    return message.ack();
}
Any ideas how to approach this?
Was there any progress with this? I'm also seeing the same error... only restarting the app helps.
I'm getting this exception too! And @samodadela is right. It only starts working again after a restart.
I have the exact problem.
same problem over here, any news on this one?
Repro:
- Fill source queue with messages
- Start consumer
- Terminate incoming connection using rmq management UI
The consumer tries to recover: it manages to restore the connection, channel and basicConsumer, but it fails to resume actual consuming:
72
2023-02-14 14:55:50,835 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:51,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:51,797 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
73
2023-02-14 14:55:51,840 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:52,295 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:52,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
74
2023-02-14 14:55:52,845 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:53,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:53,797 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
75
2023-02-14 14:55:53,851 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:54,252 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-11-thread-9) consumer has been shutdown unexpectedly: amq.ctag-Cj8_-cjNhLcv9Y1UV2N0hA
2023-02-14 14:55:54,254 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-9) Start to reconnect...
2023-02-14 14:55:54,255 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-9) Stopping rabbitmq client
2023-02-14 14:55:54,255 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnecting from rabbitmq...
2023-02-14 14:55:54,258 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Already disconnected from rabbitmq !
2023-02-14 14:55:54,270 WARN [com.rab.cli.imp.ForgivingExceptionHandler] (AMQP Connection 127.0.0.1:51806) An unexpected connection driver error occured (Exception message: Socket closed)
2023-02-14 14:55:54,271 INFO [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:51806) RabbitMQ connection shutdown! The client will attempt to reconnect automatically: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:985)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:975)
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:913)
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-02-14 14:55:54,271 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:51806) Other consumers or producers are reconnecting. Continue to wait for reconnection
2023-02-14 14:55:54,271 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Starting rabbitmq client
2023-02-14 14:55:54,271 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Connecting to rabbitmq...
2023-02-14 14:55:54,284 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Connected to rabbitmq !
2023-02-14 14:55:54,284 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17023: Established exchange `generator`
2023-02-14 14:55:54,285 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17025: Established queue `source`
2023-02-14 14:55:54,286 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 14:55:54,286 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Successed to restart client.
2023-02-14 14:55:54,288 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Reconsume queue: source success
2023-02-14 14:55:54,288 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-3) Consumer tag is now amq.ctag-3Y2J11Tsn5iddvjqU1n3OQ
2023-02-14 14:55:54,290 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message in method org.acme.Main#process: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:226)
at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:279)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-02-14 14:55:54,290 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message in method org.acme.Main#process: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:226)
at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:279)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-02-14 14:55:54,295 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:54,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
76
2023-02-14 14:55:54,857 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-1) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 14:55:55,298 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:55,799 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 14:55:56,299 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Blocking;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
class Main {

    @Outgoing("generator")
    Multi<Integer> generator() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))
                .map(l -> count.incrementAndGet())
                .onOverflow().drop();
    }

    @Incoming("source")
    @Blocking
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    CompletionStage<Void> process(Message<String> in) throws InterruptedException {
        final String payload = in.getPayload();
        Thread.sleep(1000);
        System.out.println(payload);
        return in.ack();
    }
}
mp.messaging.outgoing.generator.connector=smallrye-rabbitmq
mp.messaging.outgoing.generator.exchange.name=generator
mp.messaging.incoming.source.connector=smallrye-rabbitmq
mp.messaging.incoming.source.exchange.name=generator
mp.messaging.incoming.source.queue.name=source
mp.messaging.incoming.source.max-outstanding-messages=50
mp.messaging.incoming.source.automatic-recovery-on-initial-connection=false
mp.messaging.incoming.source.rabbitmq-reconnect-attempts=5000
mp.messaging.incoming.source.rabbitmq-reconnect-interval=2
However, if the queue is empty when the connection is terminated, the consumer fully recovers and is able to process new messages.
Not sure if this is related, but there is also an issue with qos (prefetch count) not being reapplied after a connection failure: the consumer buffers all messages.
initial state:
<-- connection closed -->
after that:
@mjedwabn the qos not being re-applied on re-connect is exactly what we are facing!
There is only one place in Microprofile Messaging where qos gets set:
https://github.com/smallrye/smallrye-reactive-messaging/blob/3d0c49d6ed62824cfda22fc74c3b6816f58ba786/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java#L288
Looking at the underlying rabbitmq-vertx client, there are scenarios where a new channel is spawned without applying qos; in fact, the library never sets qos by itself. So I made some ugly changes locally in the rabbitmq-vertx client, built it and imported it into the demo project: qos is now effective, even after the connection is interrupted, but it didn't help much with the Buffer full, cannot emit item failure itself.
I think some message gets jammed during this failure/recovery process and never gets acked/pushed out of the buffer. Debugging indicates that q is an instance of SingletonQueue, a queue holding a single element.
https://github.com/smallrye/smallrye-mutiny/blob/1.9.0/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java#L242-L243
With max-outstanding-messages=1 it works, "kind of": consuming is resumed, but the channel gets recreated every time and redeliveries can be observed:
payload: 93
2023-02-14 16:50:02,271 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:02,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:03,087 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 94
2023-02-14 16:50:03,289 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:03,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:04,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 95
2023-02-14 16:50:04,304 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:04,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:05,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 96
2023-02-14 16:50:05,313 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
connection closed somewhere here
2023-02-14 16:50:05,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:06,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 97
2023-02-14 16:50:06,331 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:06,590 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:06,669 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-10-thread-12) consumer has been shutdown unexpectedly: amq.ctag-jCKAa3H0h7nbKS7OTYKBgw
2023-02-14 16:50:06,673 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-10-thread-12) Start to reconnect...
2023-02-14 16:50:06,674 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-10-thread-12) Stopping rabbitmq client
2023-02-14 16:50:06,675 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:06,688 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Already disconnected from rabbitmq !
2023-02-14 16:50:06,692 WARN [com.rab.cli.imp.ForgivingExceptionHandler] (AMQP Connection 127.0.0.1:57763) An unexpected connection driver error occurred (Exception message: Socket closed)
2023-02-14 16:50:06,692 INFO [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:57763) RabbitMQ connection shutdown! The client will attempt to reconnect automatically: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:985)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:975)
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:913)
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-02-14 16:50:06,692 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Starting rabbitmq client
2023-02-14 16:50:06,692 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:06,692 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (AMQP Connection 127.0.0.1:57763) Other consumers or producers are reconnecting. Continue to wait for reconnection
2023-02-14 16:50:06,699 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:06,706 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:06,707 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:06,708 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17025: Established queue `source`
2023-02-14 16:50:06,709 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:06,710 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-2) Successed to restart client.
2023-02-14 16:50:06,711 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:06,711 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-3) Consumer tag is now amq.ctag-13_qKfB34BUX_thhJvML6w
2023-02-14 16:50:07,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 98
2023-02-14 16:50:07,344 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:07,351 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-14-thread-4) consumer has been shutdown unexpectedly: amq.ctag-13_qKfB34BUX_thhJvML6w
2023-02-14 16:50:07,351 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-14-thread-4) Start to reconnect...
2023-02-14 16:50:07,351 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-14-thread-4) Stopping rabbitmq client
2023-02-14 16:50:07,353 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:07,358 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:07,362 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-4) Starting rabbitmq client
2023-02-14 16:50:07,362 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:07,370 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:07,396 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:07,397 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:07,398 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17025: Established queue `source`
2023-02-14 16:50:07,399 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-4) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:07,399 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-4) Successed to restart client.
2023-02-14 16:50:07,400 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:07,400 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-16-thread-3) Consumer tag is now amq.ctag-wY2dSKXeUxu-RAWm1O8EIQ
2023-02-14 16:50:07,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:08,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 98
2023-02-14 16:50:08,345 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:08,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:09,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 98
2023-02-14 16:50:09,351 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-16-thread-5) consumer has been shutdown unexpectedly: amq.ctag-wY2dSKXeUxu-RAWm1O8EIQ
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-16-thread-5) Start to reconnect...
2023-02-14 16:50:09,357 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-16-thread-5) Stopping rabbitmq client
2023-02-14 16:50:09,357 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:09,371 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:09,371 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-5) Starting rabbitmq client
2023-02-14 16:50:09,371 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:09,378 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:09,386 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:09,386 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:09,388 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17025: Established queue `source`
2023-02-14 16:50:09,389 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-5) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:09,390 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-5) Successed to restart client.
2023-02-14 16:50:09,391 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:09,391 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-18-thread-3) Consumer tag is now amq.ctag-tMTiqriOqntlVPBZxT_RUA
2023-02-14 16:50:09,589 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:10,088 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 99
2023-02-14 16:50:10,358 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:10,364 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-18-thread-4) consumer has been shutdown unexpectedly: amq.ctag-tMTiqriOqntlVPBZxT_RUA
2023-02-14 16:50:10,364 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-18-thread-4) Start to reconnect...
2023-02-14 16:50:10,364 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-18-thread-4) Stopping rabbitmq client
2023-02-14 16:50:10,365 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:10,379 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:10,379 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-6) Starting rabbitmq client
2023-02-14 16:50:10,379 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:10,387 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:10,396 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:10,397 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:10,399 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17025: Established queue `source`
2023-02-14 16:50:10,402 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-6) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:10,403 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-6) Successed to restart client.
2023-02-14 16:50:10,405 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:10,405 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-20-thread-3) Consumer tag is now amq.ctag-1gtZmBKUhqlMPbqE4aAZnQ
2023-02-14 16:50:10,588 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:11,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 99
2023-02-14 16:50:11,361 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:11,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:12,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 99
2023-02-14 16:50:12,368 INFO [io.sma.rea.mes.rabbitmq] (vert.x-worker-thread-0) SRMSG17033: A message sent to channel `source` has been ack'd
2023-02-14 16:50:12,376 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-20-thread-5) consumer has been shutdown unexpectedly: amq.ctag-1gtZmBKUhqlMPbqE4aAZnQ
2023-02-14 16:50:12,376 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-20-thread-5) Start to reconnect...
2023-02-14 16:50:12,376 INFO [io.ver.rab.imp.RabbitMQClientImpl] (pool-20-thread-5) Stopping rabbitmq client
2023-02-14 16:50:12,378 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnecting from rabbitmq...
2023-02-14 16:50:12,401 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Disconnected from rabbitmq !
2023-02-14 16:50:12,402 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-7) Starting rabbitmq client
2023-02-14 16:50:12,403 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connecting to rabbitmq...
2023-02-14 16:50:12,410 DEBUG [com.rab.cli.imp.ConsumerWorkService] (vert.x-worker-thread-1) Creating executor service with 10 thread(s) for consumer work service
2023-02-14 16:50:12,418 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Connected to rabbitmq !
2023-02-14 16:50:12,419 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17023: Established exchange `generator`
2023-02-14 16:50:12,420 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17025: Established queue `source`
2023-02-14 16:50:12,421 DEBUG [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-7) SRMSG17027: Established binding of queue `source` to exchange 'generator' using routing key '#'
2023-02-14 16:50:12,421 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-7) Successed to restart client.
2023-02-14 16:50:12,422 INFO [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-1) Reconsume queue: source success
2023-02-14 16:50:12,423 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-22-thread-3) Consumer tag is now amq.ctag-RSPBQVr2f3tAAuaQ2zXTzA
2023-02-14 16:50:12,591 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
2023-02-14 16:50:13,091 DEBUG [io.sma.rea.mes.rabbitmq] (executor-thread-0) SRMSG17022: Sending a message to exchange `generator` with routing key
payload: 100
FYI @vietj
By looking at both the vertx-rabbitmq-client and smallrye-reactive-messaging-rabbitmq libraries, I found that the main issue is the RabbitMQConsumer.resume() call during the consumer restart: https://github.com/vert-x3/vertx-rabbitmq-client/blob/master/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java#L221
From what I understand, the RabbitMQConsumer is a ReadStream that works in flowing mode rather than fetch mode (based on the documentation of the ReadStream), because when the consumer is initialized the resume method is called, setting the InboundBuffer's demand to Long.MAX_VALUE. When the RabbitMQConnector sets up the Multi, the demand is reset to 1 and messages are fetched using backpressure, but when the connection is restarted the demand is changed back to Long.MAX_VALUE, filling up the internal queue of the flat map operation and causing the backpressure failure.
The demand is not reset to 1 again after the restart, since that operation is only done during the onSubscribe of the flow.
The main issue I see here is that ReadStream supports both flowing and fetch mode, but the RabbitMQConsumer does not handle fetch mode appropriately.
I have checked that removing the resume during the restart solves the issue, but this change is in the vertx-rabbitmq-client library and I am not sure it does not affect the flowing mode, since I don't have any examples using that mode.
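To make the mechanism above easier to follow, here is a minimal sketch in plain Java. It is a simplified model only, not the Vert.x InboundBuffer or the Mutiny flat map code: ModelBuffer, SingleSlot and simulate are hypothetical names made up for illustration, and the single-slot downstream stands in for the single-element queue mentioned earlier in this thread. It assumes that a reconnect either keeps fetch-style demand or calls resume(), and that the broker then redelivers a backlog of messages.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Simplified model of a demand-driven read stream. Not the Vert.x implementation. */
public class DemandModel {

    /** Buffers written items and passes them to the handler only while demand is positive. */
    static final class ModelBuffer {
        private final Queue<Integer> pending = new ArrayDeque<>();
        private final Consumer<Integer> handler;
        private long demand = 0;

        ModelBuffer(Consumer<Integer> handler) {
            this.handler = handler;
        }

        /** Called when the (modelled) broker delivers an item. */
        void write(int item) {
            pending.add(item);
            drain();
        }

        /** Fetch mode: request n more items, saturating at Long.MAX_VALUE. */
        void fetch(long n) {
            demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
            drain();
        }

        /** Flowing mode: unbounded demand. */
        void resume() {
            demand = Long.MAX_VALUE;
            drain();
        }

        private void drain() {
            while (demand > 0 && !pending.isEmpty()) {
                if (demand != Long.MAX_VALUE) {
                    demand--;
                }
                handler.accept(pending.poll());
            }
        }
    }

    /** Downstream with room for exactly one unprocessed item. */
    static final class SingleSlot {
        private Integer slot;
        boolean overflowed;

        void offer(int item) {
            if (slot != null) {
                overflowed = true; // corresponds to "Buffer full, cannot emit item"
                return;
            }
            slot = item;
        }

        Integer take() {
            Integer value = slot;
            slot = null;
            return value;
        }
    }

    /** Runs one reconnect scenario and reports whether the downstream overflowed. */
    public static boolean simulate(boolean resumeOnReconnect) {
        SingleSlot downstream = new SingleSlot();
        ModelBuffer buffer = new ModelBuffer(downstream::offer);

        buffer.fetch(1);   // the subscriber asks for one item
        buffer.write(1);   // the item is delivered downstream
        downstream.take(); // and processed

        // Modelled reconnect: either keep fetch-style demand or switch to flowing mode.
        if (resumeOnReconnect) {
            buffer.resume();
        } else {
            buffer.fetch(1);
        }

        // The broker redelivers a backlog while the handler is still busy.
        buffer.write(2);
        buffer.write(3);
        buffer.write(4);
        return downstream.overflowed;
    }

    public static void main(String[] args) {
        System.out.println("fetch(1) after reconnect overflowed: " + simulate(false));
        System.out.println("resume() after reconnect overflowed: " + simulate(true));
    }
}
```

In this model, keeping fetch-style demand after the reconnect leaves the backlog in the buffer, while resume() pushes every redelivered item into the single slot and overflows it. That matches the observation that the failure shows up when the queue has messages in it at reconnect time, but not when it is empty.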
Hi everyone, any news about this topic? We have the same problem, introduced when we migrated our application from Spring Boot to Quarkus; to simulate the prefetch count we used the "max-outstanding-messages" setting with a value of 250.
Is there a workaround? For example, increasing this number... Regards
@engineering-dotcms The prefetch count resetting to 0 after restarting the connection is a different issue, as someone also pointed out in this ticket. I created a separate ticket here: https://github.com/smallrye/smallrye-reactive-messaging/issues/2084
Same problem, joining the waiting list.
Is there any update on this?
Same issue here, any updates?
Same error here, setting max-outstanding-messages to 1 seems to do the trick, but I would much prefer a solution working also with any other value.
Same issue here
Same issues here