smallrye-reactive-messaging
MQTT connector should support a configurable overflow strategy to avoid "overflow buffer is full" failures
The only overflow strategy currently used by the MQTT connector is buffer:
https://github.com/smallrye/smallrye-reactive-messaging/blob/9120745c0fee96b0e67682f08aa36a8b017f7b79/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L52-L59
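For context, here is a rough standalone Mutiny sketch (not the connector's code) of the difference between the two strategies: with onOverflow().buffer(size) the stream fails with a BackPressureFailure as soon as the buffer is full, while onOverflow().drop() silently discards the excess items.

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;

public class OverflowDemo {
    public static void main(String[] args) {
        // A fast emitter that ignores downstream demand, similar to the
        // BroadcastProcessor fed by the Vert.x MQTT client in the connector.
        Multi<Integer> fast = Multi.createFrom().emitter(e -> {
            for (int i = 0; i < 1_000; i++) {
                e.emit(i);
            }
            e.complete();
        }, BackPressureStrategy.IGNORE);

        fast.onOverflow().buffer(16)          // current behaviour: bounded buffer, fails when full
            // .onOverflow().drop()           // alternative: silently discard the excess items
            .subscribe().with(
                sub -> sub.request(1),        // a slow downstream requesting a single item
                item -> System.out.println("got " + item),
                failure -> System.err.println("failed: " + failure), // BackPressureFailure ends up here
                () -> System.out.println("done"));
    }
}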
I ran into this problem while deploying an instance that syncs MQTT data to TimescaleDB: when the downstream is blocked, the buffer fills up quickly and the following exception is thrown:
2022-10-12 16:02:07,055 WARN [com.pes.pip.mqt.dao.HypertableInsertService] (vert.x-eventloop-thread-0) sql: insert into "metric_CF5_V8_Open_FB" ("timestamp", "topic", "value") values ($1, $2, $3), values: [2022-10-12T16:00:49+08:00,0aac127e-2f15-4536-b5db-f5c84d3095f2,false]: io.vertx.pgclient.PgException: FATAL: the database system is in recovery mode (57P03)
at io.vertx.pgclient.impl.codec.ErrorResponse.toException(ErrorResponse.java:31)
at io.vertx.pgclient.impl.codec.InitCommandCodec.handleErrorResponse(InitCommandCodec.java:100)
at io.vertx.pgclient.impl.codec.PgDecoder.decodeError(PgDecoder.java:246)
at io.vertx.pgclient.impl.codec.PgDecoder.decodeMessage(PgDecoder.java:132)
at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:112)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1236)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:833)
at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)
2022-10-12 16:07:16,150 ERROR [io.sma.rea.mes.mqtt] (vert.x-eventloop-thread-0) SRMSG17105: Unable to establish a connection with the MQTT broker: io.smallrye.mutiny.subscription.BackPressureFailure: The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough
at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:75)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.drain(MultiPublishOp.java:487)
at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.onItem(MultiPublishOp.java:194)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:58)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onNext(BroadcastProcessor.java:209)
at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onNext(BroadcastProcessor.java:138)
at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:58)
at io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl.serverPublished(MqttClientSessionImpl.java:570)
at io.vertx.mqtt.impl.MqttClientImpl.handlePublish(MqttClientImpl.java:1186)
at io.vertx.mqtt.impl.MqttClientImpl.handleMessage(MqttClientImpl.java:951)
at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$0(MqttClientImpl.java:278)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:833)
at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)
The consequence of this exception is that the entire service becomes unavailable. Since MQTT message processing is real-time, I think we should let users weigh the trade-offs and choose the overflow strategy themselves, for example:
// Proposed change: pick the Mutiny overflow strategy from a channel
// attribute (e.g. "overflow-strategy", name hypothetical) instead of always buffering.
switch (overflowStrategy) {
    case "buffer":
        mqtt = mqtt.onOverflow().buffer(config.getBufferSize());
        break;
    case "drop":
        mqtt = mqtt.onOverflow().drop();
        break;
    default:
        throw new IllegalArgumentException("Unknown overflow strategy: " + overflowStrategy);
}
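If such an attribute were added, a channel could opt into dropping from its configuration, for example (the overflow-strategy attribute name is hypothetical, the channel name is just an example):

mp.messaging.incoming.my-mqtt-channel.connector=smallrye-mqtt
mp.messaging.incoming.my-mqtt-channel.overflow-strategy=drop
mp.messaging.incoming.my-mqtt-channel.buffer-size=128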
Hi @cdmikechen, I think the default strategy should depend on QoS: with QoS 0, message loss is allowed, so the default should be drop; in the other cases, buffer is the way to go.
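A rough sketch of what that QoS-dependent default could look like in MqttSource (assuming getQos() and getBufferSize() are available on the channel configuration):

if (config.getQos() == 0) {
    // QoS 0: message loss is allowed, so prefer dropping over failing the stream
    mqtt = mqtt.onOverflow().drop();
} else {
    // QoS 1/2: keep buffering so that no message is silently discarded
    mqtt = mqtt.onOverflow().buffer(config.getBufferSize());
}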
@dometec Thank you for your advice~
I think this design makes more sense, but it could be confusing for beginners if some data is lost when using drop. I'm not very proficient with Mutiny yet; I think the ideal behaviour might be to drop messages once the buffer is full, rather than throwing an exception when the buffer is full. Do you know if there is a way to do this?
The documentation about MQTT and QoS is very clear: with QoS 0 you can lose messages. I think a WARN log when the buffer is full and messages are being dropped is enough. For the implementation, let me do some tests and then I'll tell you (I am seeing another possible problem with QoS 1).
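For the "WARN log when dropping" part, Mutiny's drop variant that takes a callback should be enough; a small sketch (not the connector's code):

import io.smallrye.mutiny.Multi;
import org.jboss.logging.Logger;

public class DropWithWarning {

    private static final Logger log = Logger.getLogger(DropWithWarning.class);

    // onOverflow().drop(callback) discards the item and invokes the callback,
    // so the loss is visible in the logs instead of failing the whole stream.
    static <T> Multi<T> dropAndWarn(Multi<T> upstream) {
        return upstream.onOverflow()
                .drop(item -> log.warnf("Dropping MQTT message on overflow: %s", item));
    }
}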
@dometec @cdmikechen AFAIU, QoS > 0 support (PUBACK/PUBREC) needs changes in the Vert.x MQTT client. I am thinking we don't need to roll back #1948 before the 3.22 release, WDYT?
Hi @ozangunalp, I'm preparing a PR for the Vert.x MQTT client to manage QoS 1 and 2 ACKs, so it will be possible to manage the message flow.
As for this PR, sorry, I think it is unnecessary (I also think the direction = INCOMING_AND_OUTGOING is wrong; it should be INCOMING only).
For QoS = 0 --> the default is to drop the message if the buffer is full (a WARN log might be useful). It could be worth letting users choose between drop and drop-previous, but I don't think it's that important.
For QoS > 0 --> we can (will...) manage the flow control; the buffer must be large enough to hold the in-flight messages (in MQTT 3.1 this is a server configuration, so we should know the value), so it will be impossible to fill the buffer.
Perfect! Thanks for the summary, I'll prepare a PR for the revert.
@ozangunalp @dometec I still frequently hit the buffer-full problem in test and production environments. QoS is a producer property, and my usage scenario has always been consuming, so the QoS setting does not help me:
2023-01-10 16:17:47,304 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message: io.smallrye.mutiny.subscription.BackPressureFailure: The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough
at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:75)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.drain(MultiPublishOp.java:487)
at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.onItem(MultiPublishOp.java:194)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:58)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onNext(BroadcastProcessor.java:209)
at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onNext(BroadcastProcessor.java:138)
at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:58)
at io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl.serverPublished(MqttClientSessionImpl.java:570)
at io.vertx.mqtt.impl.MqttClientImpl.handlePublish(MqttClientImpl.java:1186)
at io.vertx.mqtt.impl.MqttClientImpl.handleMessage(MqttClientImpl.java:951)
at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$0(MqttClientImpl.java:278)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:833)
at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)
Also, I found out that the current MQTT implementation receives all messages and then filters them. I'm using the filtering feature to process only some of the topics, so I'm wondering whether the buffer exception is caused by receiving a large number of messages at the same time, rather than by my processing code?
Hi @cdmikechen, QoS won't help until https://github.com/vert-x3/vertx-mqtt/pull/235 is merged and this library is updated accordingly. At the moment, the client can be overwhelmed by the broker's messages. You need to process messages quickly on the client: for example, you can queue them (see the sketch after this comment), or you can use a shared subscription and run more client instances.
As for the filter, can you send me an example? If the client is not subscribed to a specific topic, it should not receive its messages...
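As a stop-gap until the flow-control changes land, one way to keep the consumer fast is to hand messages off to an application-level queue and acknowledge immediately. A rough sketch, assuming a SmallRye Reactive Messaging application with a channel named metrics (all names here are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class MetricIngestor {

    // Bounded application-level queue: the @Incoming method returns immediately,
    // and a separate worker (not shown) drains the queue into TimescaleDB in batches.
    private final BlockingQueue<byte[]> pending = new ArrayBlockingQueue<>(10_000);

    @Incoming("metrics") // channel name is an assumption
    public CompletionStage<Void> consume(Message<byte[]> msg) {
        if (!pending.offer(msg.getPayload())) {
            // queue is full: decide here whether to drop, log a warning, or nack
        }
        return msg.ack(); // ack right away so the event loop is never blocked
    }
}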
@dometec Here is the filter code:
https://github.com/smallrye/smallrye-reactive-messaging/blob/d94086362eb1ac28d986c13fb1fd404de4a160d9/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L37-L44
https://github.com/smallrye/smallrye-reactive-messaging/blob/d94086362eb1ac28d986c13fb1fd404de4a160d9/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L53-L56
Perhaps my original wording was wrong: I meant that the code within reactive-messaging receives everything and then applies a pattern-based filter to select messages.
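To make the point concrete, the selection is roughly equivalent to the following simplified sketch (not the connector's exact code): every message delivered by the broker reaches the client first, and the topic is only matched afterwards, so non-matching messages still pass through the overflow buffer before being discarded.

import java.util.regex.Pattern;
import io.smallrye.mutiny.Multi;

public class ClientSideTopicFilter {

    // A simplified stand-in for the connector's behaviour: translate the MQTT
    // wildcards (+, #) into a regex and keep only the matching topics.
    static Multi<String> filterByTopic(Multi<String> topics, String subscription) {
        Pattern pattern = Pattern.compile(
                subscription.replace("+", "[^/]+").replace("#", ".+"));
        return topics.select().where(t -> pattern.matcher(t).matches());
    }
}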