JavaNSQClient
JavaNSQClient copied to clipboard
java.lang.IllegalStateException: Queue full
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.github.brainlag.nsq.Connection.incoming(Connection.java:115)
at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
Went through the code and found the following line in Connection.java :
private final LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue<>(1);
Wanted to understand why this is initialized with capacity 1 and what could potentially be causing this error. Could it be related to high rate of polling NSQ? We deal with a lot of data.
Same here.
I find this problem is caused by the 'no-waiting' command and ErrorFrame processing logic
For example:
If I call FIN command multiple times, nsqd will response "E_FIN_FAILED FIN 0a8c981c6da8c000 failed ID not in flight"
The problem is:
- the FIN command is sent but not waiting for response
- Read Channel received ErrorFrame, this frame is added to
responses
queue, but no consumer is waiting on this queue - if 1 and 2 happens again, the
responses
queue is overflow
connection.command(NSQCommand.finish(this.id))
public ChannelFuture command(final NSQCommand command) {
return channel.writeAndFlush(command);
}
if (frame instanceof ErrorFrame) {
if (errorCallback != null) {
errorCallback.error(NSQException.of((ErrorFrame) frame));
}
responses.add(frame);
return;
}
@rohithbv1991 I was facing the same issue again, I tracked it and read the the whole source code then came here to raise the issue and found this. :joy:
Faced the same problem. It crashes after the second timed out message.
hello i'm here too now
Go to RabbitMQ, NSQ is dead.
Connection
I met too,because message.finished() Execute many times
This forked from nsqio/TrendrrNSQClient , and in that project,
responses.add(frame);
has been replaced by
responses.offer(frame,1,...);
Hope help anyone.