smallrye-reactive-messaging

MQTT connector should add overflow strategy options to avoid "overflow buffer is full" failures

Open cdmikechen opened this issue 2 years ago • 10 comments

The only overflow strategy currently used by the MQTT connector is buffer:

https://github.com/smallrye/smallrye-reactive-messaging/blob/9120745c0fee96b0e67682f08aa36a8b017f7b79/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L52-L59

I ran into a problem while deploying an instance that syncs MQTT data to TimescaleDB: when the downstream is blocked, the buffer fills up quickly and the following exception is thrown

2022-10-12 16:02:07,055 WARN  [com.pes.pip.mqt.dao.HypertableInsertService] (vert.x-eventloop-thread-0) sql: insert into "metric_CF5_V8_Open_FB" ("timestamp", "topic", "value") values ($1, $2, $3), values: [2022-10-12T16:00:49+08:00,0aac127e-2f15-4536-b5db-f5c84d3095f2,false]: io.vertx.pgclient.PgException: FATAL: the database system is in recovery mode (57P03)
	at io.vertx.pgclient.impl.codec.ErrorResponse.toException(ErrorResponse.java:31)
	at io.vertx.pgclient.impl.codec.InitCommandCodec.handleErrorResponse(InitCommandCodec.java:100)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeError(PgDecoder.java:246)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeMessage(PgDecoder.java:132)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:112)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1236)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:833)
	at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
	at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)

2022-10-12 16:07:16,150 ERROR [io.sma.rea.mes.mqtt] (vert.x-eventloop-thread-0) SRMSG17105: Unable to establish a connection with the MQTT broker: io.smallrye.mutiny.subscription.BackPressureFailure: The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough
	at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:75)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.drain(MultiPublishOp.java:487)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.onItem(MultiPublishOp.java:194)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
	at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:58)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onNext(BroadcastProcessor.java:209)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onNext(BroadcastProcessor.java:138)
	at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:58)
	at io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl.serverPublished(MqttClientSessionImpl.java:570)
	at io.vertx.mqtt.impl.MqttClientImpl.handlePublish(MqttClientImpl.java:1186)
	at io.vertx.mqtt.impl.MqttClientImpl.handleMessage(MqttClientImpl.java:951)
	at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$0(MqttClientImpl.java:278)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
	at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:833)
	at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
	at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)

The consequence of this exception is that the entire service becomes unavailable. Since MQTT message processing is real-time, I think we should let users weigh the trade-offs and choose the overflow strategy that fits their use case, for example:

switch (overflowStrategy) {
    case "buffer":
        mqtt = mqtt.onOverflow().buffer(config.getBufferSize());
        break;
    case "drop":
        mqtt = mqtt.onOverflow().drop();
        break;
}
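
A keep-latest case could be added to the same switch; this is only a sketch, assuming Mutiny's dropPreviousItems() strategy:

    case "drop-previous":
        // Keep only the most recent item when the downstream lags behind.
        mqtt = mqtt.onOverflow().dropPreviousItems();
        break;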

cdmikechen avatar Oct 12 '22 09:10 cdmikechen

Hi @cdmikechen, I think the default strategy should depend on the QoS. With QoS 0, losing messages is allowed, so the default should be drop; in the other cases, buffer is the way to go.
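
A minimal sketch of that idea (the config.getQos() accessor and the surrounding variables are assumptions, not the connector's actual API):

// Sketch: derive the default overflow strategy from the channel's QoS.
int qos = config.getQos();
if (qos == 0) {
    // QoS 0 tolerates message loss: drop on overflow instead of failing the stream.
    mqtt = mqtt.onOverflow().drop();
} else {
    // QoS 1/2: buffer so that no message is silently lost.
    mqtt = mqtt.onOverflow().buffer(config.getBufferSize());
}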

dometec avatar Oct 21 '22 06:10 dometec

@dometec Thank you for your advice~ I think that design makes more sense, although it could be confusing for beginners if some data is silently lost. I'm not very proficient with Mutiny yet; I think the ideal behaviour might be to start dropping only once the buffer is full, rather than throwing an exception at that point. Do you know if there is a way to do that?

cdmikechen avatar Oct 25 '22 00:10 cdmikechen

The documentation about MQTT and QoS is very clear: with QoS 0 you can lose messages. I think a WARN log when the buffer is full and messages are being dropped is enough. For the implementation, let me do some tests and I'll get back to you (I am seeing another possible problem with QoS 1).
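
For illustration, a minimal Mutiny sketch of that behaviour (it assumes the onOverflow().drop(callback) variant and a log instance; it is not the connector's actual code):

// Sketch: drop on overflow and emit a WARN for each dropped message,
// instead of failing the stream with a BackPressureFailure.
mqtt = mqtt.onOverflow()
        .drop(dropped -> log.warn("MQTT receive buffer overflow, dropping message: " + dropped));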

dometec avatar Oct 25 '22 12:10 dometec

@dometec @cdmikechen AFAIU QoS > 0 support for PUBACK/PUBREC needs changes in the Vert.x MQTT client. I am thinking we don't need to roll back #1948 before the 3.22 release, WDYT?

ozangunalp avatar Nov 02 '22 17:11 ozangunalp

Hi @ozangunalp, I'm preparing a PR for the Vert.x MQTT client to manage the QoS 1 and 2 ACKs, so it will be possible to manage the message flow.

As for this PR, sorry, I think it is useless (I also think the direction = INCOMING_AND_OUTGOING is wrong; it should be INCOMING only).

For QoS = 0 --> the default is to drop the message if the buffer is full (a WARN log might be useful). It might be worth letting the user decide between drop and drop-prev, but I don't think that's very important.

For QoS > 0 --> we can (will...) manage the flow control; the buffer must be large enough to hold the in-flight messages (in MQTT 3.1 this is a server configuration, so we should know the value), so it will be impossible to fill the buffer.

dometec avatar Nov 07 '22 11:11 dometec

Perfect! Thanks for the summary, I'll prepare a PR for the revert.

ozangunalp avatar Nov 07 '22 13:11 ozangunalp

@ozangunalp @dometec I still run into the buffer-full problem frequently in test and production environments. QoS is a producer property, and my usage scenario has always been on the consumer side, so the QoS setting does not help me.

2023-01-10 16:17:47,304 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-0) SRMSG00201: Error caught while processing a message: io.smallrye.mutiny.subscription.BackPressureFailure: The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough
	at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:75)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.drain(MultiPublishOp.java:487)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.onItem(MultiPublishOp.java:194)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
	at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:58)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onNext(BroadcastProcessor.java:209)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onNext(BroadcastProcessor.java:138)
	at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:58)
	at io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl.serverPublished(MqttClientSessionImpl.java:570)
	at io.vertx.mqtt.impl.MqttClientImpl.handlePublish(MqttClientImpl.java:1186)
	at io.vertx.mqtt.impl.MqttClientImpl.handleMessage(MqttClientImpl.java:951)
	at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$0(MqttClientImpl.java:278)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
	at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:833)
	at com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:705)
	at com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:202)

cdmikechen avatar Jan 11 '23 00:01 cdmikechen

Also, I found out that our current MQTT implementation actually receives all messages and then filters them. I'm using the filtering feature to process only some of the topics, so I'm wondering whether the buffer exception is caused by receiving a large number of messages at the same time, rather than by my processing code?

cdmikechen avatar Jan 11 '23 00:01 cdmikechen

Hi @cdmikechen, QoS is not useful until https://github.com/vert-x3/vertx-mqtt/pull/235 is merged and this library is updated to use it. At the moment, the client can be overwhelmed by the broker's messages. You need to process messages quickly on the client side; you could queue them, or use a shared subscription and run more client instances.
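
As an illustration of the "queue it" option, a minimal sketch (the channel name "telemetry", the queue size, and the worker that drains the queue are assumptions, not part of the connector): acknowledge quickly on the event loop and hand the payload off to a bounded queue.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

// Register this as a CDI bean (e.g. @ApplicationScoped) in a real application.
public class TelemetryConsumer {

    // Bounded hand-off queue; a separate worker thread drains it and performs the slow database inserts.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(10_000);

    @Incoming("telemetry")
    public CompletionStage<Void> consume(Message<byte[]> msg) {
        // offer() returns immediately, so the Vert.x event loop is never blocked;
        // if the hand-off queue is full, the payload is discarded here instead of
        // overflowing the connector's internal buffer.
        pending.offer(new String(msg.getPayload(), StandardCharsets.UTF_8));
        return msg.ack();
    }
}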

For the filter, can you send me an example? If the client is not subscribed to a specific topic, it should not receive messages...

dometec avatar Jan 11 '23 08:01 dometec

@dometec Here is the filter code:

https://github.com/smallrye/smallrye-reactive-messaging/blob/d94086362eb1ac28d986c13fb1fd404de4a160d9/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L37-L44

https://github.com/smallrye/smallrye-reactive-messaging/blob/d94086362eb1ac28d986c13fb1fd404de4a160d9/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java#L53-L56

Perhaps my original wording was misleading: I meant that the code inside reactive-messaging receives the messages and then selects them with a regex-based filter.
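
For readers without the permalinks handy, a simplified illustration of that filter-after-receive approach (not the actual MqttSource code; holder.stream() and the naive wildcard-to-regex conversion are placeholders):

// Types: io.smallrye.mutiny.Multi, io.vertx.mqtt.messages.MqttPublishMessage, java.util.regex.Pattern.
// The shared client broadcasts every message it receives; each channel then selects the ones
// whose topic matches the configured (wildcard) topic, converted to a regex.
Pattern pattern = Pattern.compile(
        topic.replace("+", "[^/]+").replace("#", ".+"));   // the real code escapes the topic properly

Multi<MqttPublishMessage> filtered = holder.stream()
        .select().where(msg -> pattern.matcher(msg.topicName()).matches());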

cdmikechen avatar Jan 11 '23 09:01 cdmikechen