storm-jms
storm-jms copied to clipboard
Getting java.util.NoSuchElementException sporadically!
I am getting the following error while calling ack()
on OutputCollector
.
Can anybody help me to find a cause of this exception?
2016-10-07 08:46:46.856 [Thread-19-JMS_QUEUE_SPOUT] WARN Message failed: org.apache.storm.jms.spout.JmsMessageID@77eb
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR Async loop died!
java.lang.RuntimeException: java.util.NoSuchElementException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:619) [na:1.6.0_21]
Caused by: java.util.NoSuchElementException: null
at java.util.TreeMap.key(TreeMap.java:1206) ~[na:1.6.0_21]
at java.util.TreeMap.firstKey(TreeMap.java:267) ~[na:1.6.0_21]
at java.util.TreeSet.first(TreeSet.java:377) ~[na:1.6.0_21]
at org.apache.storm.jms.spout.JmsSpout.ack(JmsSpout.java:243) ~[stormjar.jar:3.7]
at backtype.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:384) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__4654$tuple_action_fn__4660.invoke(executor.clj:446) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR
java.lang.RuntimeException: java.util.NoSuchElementException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
I am setting up my topology with a JmsSpout as it is mentioned in the example. I am connecting to an activeMQ via this spout. Everything works fine but sporadically I find the above stack trace in my logs and one of the spout instances stops dequeuing the messages from activeMQ. When I re-submit my topology, everything goes back to normal again.
JmsProvider jmsQueueProvider = new SpringJmsProvider(
"file:" + options.activemqConfFile,
Constants.JMS_CONNECTION_FACTORY,
options.queueName
);
JmsTupleProducer producer = new JsonTupleProducer();
// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true); // allow multiple instances
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(Constants.JMS_QUEUE_SPOUT, queueSpout, 5);
What could be the problem? How do I debug this?
P.S.: I am not setting a recoveringPeriod
and hence it is defaulted to -1
(if this is of any relevance).
I am not able to figure out what is wrong with the above usage of Spout.
Upon further debugging, I noticed the fail
and ack
methods in JmsSpout
.
public void fail(Object msgId) {
LOG.warn("Message failed: " + msgId);
this.pendingMessages.clear();
this.toCommit.clear();
synchronized(this.recoveryMutex) {
this.hasFailures = true;
}
}
public void ack(Object msgId) {
Message msg = this.pendingMessages.remove(msgId);
JmsMessageID oldest = this.toCommit.first();
if (msgId.equals(oldest)) {
if (msg != null) {
try {
LOG.debug("Committing...");
msg.acknowledge();
LOG.debug("JMS Message acked: " + msgId);
this.toCommit.remove(msgId);
} catch (JMSException e) {
LOG.warn("Error acknowldging JMS message: " + msgId, e);
}
} else {
LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
}
} else {
this.toCommit.remove(msgId);
}
}
In a fail
method, even though we are calling a fail()
for a particular messageId, the entire TreeSet toCommit
is cleared. Eventually, if the next tuple is acked in the same bolt, this.toCommit.first()
results in java.util.NoSuchElementException
and the topology kicks in only after we kill and re-submit it.
Any help on this is really appreciated.