storm-jms

Getting java.util.NoSuchElementException sporadically!

Open • theGreatHeisenberg opened this issue 8 years ago • 1 comment

I am getting the following error when calling ack() on the OutputCollector.

Can anybody help me find the cause of this exception?

```
2016-10-07 08:46:46.856 [Thread-19-JMS_QUEUE_SPOUT] WARN  Message failed: org.apache.storm.jms.spout.JmsMessageID@77eb
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR  Async loop died!
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:619) [na:1.6.0_21]
Caused by: java.util.NoSuchElementException: null
        at java.util.TreeMap.key(TreeMap.java:1206) ~[na:1.6.0_21]
        at java.util.TreeMap.firstKey(TreeMap.java:267) ~[na:1.6.0_21]
        at java.util.TreeSet.first(TreeSet.java:377) ~[na:1.6.0_21]
        at org.apache.storm.jms.spout.JmsSpout.ack(JmsSpout.java:243) ~[stormjar.jar:3.7]
        at backtype.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:384) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$tuple_action_fn__4660.invoke(executor.clj:446) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
        ... 6 common frames omitted
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
```

I set up my topology with a JmsSpout as shown in the example and connect to ActiveMQ through this spout. Everything works fine, but sporadically the stack trace above appears in my logs and one of the spout instances stops dequeuing messages from ActiveMQ. When I resubmit the topology, everything goes back to normal.

```java
JmsProvider jmsQueueProvider = new SpringJmsProvider(
    "file:" + options.activemqConfFile,
    Constants.JMS_CONNECTION_FACTORY,
    options.queueName
);
JmsTupleProducer producer = new JsonTupleProducer();

// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true); // allow multiple instances

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(Constants.JMS_QUEUE_SPOUT, queueSpout, 5); // parallelism hint: 5 spout executors
```

What could be the problem? How do I debug this?

P.S.: I am not setting a recoveryPeriod, so it defaults to -1 (in case that is relevant).

theGreatHeisenberg • Oct 07 '16 09:10

I could not find anything wrong with the spout usage above. On further debugging, I looked at the fail() and ack() methods in JmsSpout:

```java
public void fail(Object msgId) {
    LOG.warn("Message failed: " + msgId);
    // Drops ALL pending bookkeeping, not only the entry for msgId
    this.pendingMessages.clear();
    this.toCommit.clear();
    synchronized (this.recoveryMutex) {
        this.hasFailures = true;
    }
}

public void ack(Object msgId) {
    Message msg = this.pendingMessages.remove(msgId);
    // TreeSet.first() throws NoSuchElementException when toCommit is empty,
    // e.g. after fail() has cleared it
    JmsMessageID oldest = this.toCommit.first();
    if (msgId.equals(oldest)) {
        if (msg != null) {
            try {
                LOG.debug("Committing...");
                msg.acknowledge();
                LOG.debug("JMS Message acked: " + msgId);
                this.toCommit.remove(msgId);
            } catch (JMSException e) {
                LOG.warn("Error acknowldging JMS message: " + msgId, e);
            }
        } else {
            LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
        }
    } else {
        this.toCommit.remove(msgId);
    }
}
```

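In fail(), even though it is called for one particular message ID, the entire TreeSet toCommit is cleared, along with the pendingMessages map. If another tuple emitted by the same spout task is acked afterwards, this.toCommit.first() is called on an empty set and throws java.util.NoSuchElementException. This matches the log above, where the "Message failed" warning appears just before the exception. From then on that spout instance stops consuming, and the topology only recovers after we kill and resubmit it.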
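To make that sequence concrete, here is a minimal, JMS-free model of the bookkeeping quoted above. It is a sketch under stated assumptions, not the library's code: the class AckTracker and all of its methods are hypothetical, Long IDs stand in for JmsMessageID, and String payloads stand in for javax.jms.Message. ackUnguarded() mirrors the quoted ack() and throws once fail() has emptied toCommit; ackGuarded() shows one possible guard that ignores an ack arriving after the state was cleared. Clearing everything in fail() may be intentional: in CLIENT_ACKNOWLEDGE mode, Message.acknowledge() acknowledges every message the session has consumed so far, which is presumably why the spout only acknowledges the oldest outstanding ID and, on failure, drops its state and sets hasFailures instead of removing a single entry. If that reading is right, guarding against an empty set is a smaller change than altering fail().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeSet;

// Hypothetical, JMS-free model of JmsSpout's ack()/fail() bookkeeping.
// Long IDs stand in for JmsMessageID; String payloads stand in for javax.jms.Message.
class AckTracker {
    private final Map<Long, String> pendingMessages = new HashMap<>();
    private final TreeSet<Long> toCommit = new TreeSet<>();

    // Models the spout recording a message it has emitted as a tuple.
    void emit(long id, String msg) {
        pendingMessages.put(id, msg);
        toCommit.add(id);
    }

    // Mirrors fail(): all bookkeeping is dropped, not just the failed ID.
    void fail(long id) {
        pendingMessages.clear();
        toCommit.clear();
    }

    // Mirrors the quoted ack(): first() throws NoSuchElementException on an empty set.
    boolean ackUnguarded(long id) {
        String msg = pendingMessages.remove(id);
        Long oldest = toCommit.first();
        return commitIfOldest(id, msg, oldest);
    }

    // One possible guard: an ack that arrives after fail() cleared the state is ignored.
    boolean ackGuarded(long id) {
        String msg = pendingMessages.remove(id);
        if (toCommit.isEmpty()) {
            return false;
        }
        return commitIfOldest(id, msg, toCommit.first());
    }

    // Returns true where JmsSpout would call msg.acknowledge().
    private boolean commitIfOldest(long id, String msg, Long oldest) {
        if (oldest.longValue() == id) {
            if (msg != null) {
                toCommit.remove(id);
                return true;
            }
            return false; // JmsSpout only logs "Couldn't acknowledge unknown JMS message ID"
        }
        toCommit.remove(id); // not the oldest: forget it without acknowledging
        return false;
    }

    public static void main(String[] args) {
        AckTracker unguarded = new AckTracker();
        unguarded.emit(1L, "m1");
        unguarded.emit(2L, "m2");
        unguarded.fail(1L); // wipes both structures
        try {
            unguarded.ackUnguarded(2L);
            System.out.println("unguarded ack succeeded");
        } catch (NoSuchElementException e) {
            System.out.println("unguarded ack threw NoSuchElementException");
        }

        AckTracker guarded = new AckTracker();
        guarded.emit(1L, "m1");
        guarded.emit(2L, "m2");
        guarded.fail(1L);
        System.out.println("guarded ack returned " + guarded.ackGuarded(2L));
    }
}
```

Running main() prints "unguarded ack threw NoSuchElementException" followed by "guarded ack returned false", reproducing the fail-then-ack ordering seen in the log.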

Any help on this is really appreciated.

theGreatHeisenberg • May 12 '17 06:05