JavaNSQClient
JavaNSQClient copied to clipboard
fail to run message.requeue();
nsq version:1.0 os:centos 7.4
codes: public static void main( String[] args ) { NSQLookup lookup = new DefaultNSQLookup(); lookup.addLookupAddress("192.168.1.228", 4161); NSQConsumer consumer = new NSQConsumer(lookup, "TestTopic", "dusti", (message) -> { System.out.println("received: " + new String(message.getMessage())); message.finished(); message.requeue(); });
consumer.start();
}
below is run time messages:
17:00:37.080 INFO Created connection: knowledgebase:4150 - Connection.
I came across the same problem when receiving messages from nsq.
The java code below
` protected MsgConsumer(MsgConfig config, String topic) {
NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress(config.getHost(), config.getPort());
logger.info("start to init nsq consumer, host={},port={},topic={}", config.getHost(), config.getPort(), topic);
NSQConsumer consumer = new NSQConsumer(lookup, topic, channel,new NSQMessageCallback (){
@Override
public void message(NSQMessage message) {
try {
message.finished();
String msg = new String(message.getMessage(), "UTF-8");
System.out.println("接收 rev:"+msg);
if (Strings.isNullOrEmpty(msg)) {
logger.info("consume msg is empty.");
return;
}
onMessage(msg);
} catch (Exception e) {
message.requeue();
logger.error("failed to consume message({}), cause: {}",
message, Throwables.getStackTraceAsString(e));
}
}
});
consumer.start();
}
`
The exception below {"date":"2018-02-06T16:44:43.231","traceId":"null","sequenceId":"null","level":"WARN","appName":"box-service","class":"io.netty.util.concurrent.SingleThreadEventExecutor","method":"warn","line":"151","message":"A task raised an exception. java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.github.brainlag.nsq.Connection.incoming(Connection.java:129) at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)
how can I fix it?