storm-esper
Synchronization on OutputCollector
Hello,
Synchronization on the OutputCollector is required, e.g.:
synchronized (collector) {
    collector.ack(tuple);
}
and
synchronized (collector) {
    collector.emit(finalEventType.getStreamId(), toTuple(newEvent, finalEventType.getFields()));
}
Otherwise, exceptions of this kind occur:
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99)
at backtype.storm.util$fast_list_map.invoke(util.clj:771)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99)
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205)
at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
... 6 more
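The pattern above can be simulated outside Storm. The following is only a minimal sketch: FakeCollector is a hypothetical stand-in for Storm's OutputCollector (which, per the Storm troubleshooting docs, must not be called from multiple threads without external locking), and the numbers are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedEmitDemo {

    // Hypothetical stand-in for OutputCollector: backed by a plain
    // ArrayList, so it is deliberately not thread-safe.
    static class FakeCollector {
        private final List<String> emitted = new ArrayList<>();
        void emit(String tuple) { emitted.add(tuple); }
        int size() { return emitted.size(); }
    }

    // Two threads emit 10,000 tuples each; with the synchronized block
    // from the report, no updates are lost or corrupted.
    public static int emitConcurrently(final boolean synchronize) throws InterruptedException {
        final FakeCollector collector = new FakeCollector();
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                if (synchronize) {
                    // The pattern from the report: serialize all collector calls.
                    synchronized (collector) { collector.emit("t" + i); }
                } else {
                    collector.emit("t" + i); // unsafe: concurrent ArrayList.add
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start(); b.start();
        a.join(); b.join();
        return collector.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emitConcurrently(true)); // always 20000 when synchronized
    }
}
```

Without the lock, the unsafe path can lose elements or throw, which is analogous to the serializer NPE seen inside Storm's transfer thread.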
Could you post the code that led to this error?
Sure, but one thing first: I'm using the project with Storm 0.8.2 and Esper 4.10.0, which might cause different behavior; I'm not sure.
I'm including the code that constructs the failing bolt. Only this bolt experienced the exception.
final EsperBolt.StatementsBuilder statBu = inBu
.outputs()
.onStream("alphaGroupBetaGroupGammaStream")
.fromEventType("AlphaGroupBetaGroupGamma")
.emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
"deltaAmount", "calculatedValue", "sideValue")
.outputs()
.onStream("alphaGroupBetaGroupGammaRedisStream")
.fromEventType("AlphaGroupBetaGroupGammaRedis")
.emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
"deltaAmount", "calculatedValue", "sideValue", "measurementTimestamp", "receivedTimestamp")
.statements()
.add("insert into AlphaOccurenceCasted select cast(identifier,String) as identifier, cast(alphaGroup,String) as alphaGroup, cast(betaGroup,String) as betaGroup, cast(beta,String) as beta, "
+ "cast(server,String) as server, cast(gamma, int) as gamma, cast(vol, double) as vol, cast(minusModifier, double) as minusModifier, "
+ "cast(plusModifier, double) as plusModifier, cast(sigmaAmount, double) as sigmaAmount, cast(deltaAmount, double) as deltaAmount, cast(measurementTimestamp, long) as measurementTimestamp, cast(receivedTimestamp, long) as receivedTimestamp from AlphaOccurence")
.add("insert into BetaAspectCasted select cast(betaGroup,String) as betaGroup, "
+ "cast(plusMultiplier, double) as plusMultiplier, "
+ "cast(minusMultiplier, double) as minusMultiplier, cast(theta_tickvalue,double) as theta, cast(kappa_tickvalue,double) as kappa from BetaAspect group by betaGroup output last every 1 sec")
.add("insert into DeleteStream select * from AlphaOccurenceCasted where identifier='-9999'")
.add("create window AlphaOccurenceWindow.std:unique(identifier, beta, server, gamma) as select * from AlphaOccurenceCasted")
.add("insert into AlphaOccurenceWindow select * from AlphaOccurenceCasted where identifier!='-9999'")
.add("on DeleteStream as del update AlphaOccurenceWindow as win set vol = 0, minusModifier=0, plusModifier=0 where win.server=del.server and win.gamma=del.gamma")
.add("on DeleteStream as del insert into aux"
+ " select win.identifier as identifier, "
+ " win.beta as beta, "
+ " win.betaGroup as betaGroup, "
+ " win.alphaGroup as alphaGroup, "
+ " win.server as server, "
+ " win.gamma as gamma, "
+ " win.vol as vol, "
+ " win.minusModifier as minusModifier, "
+ " win.plusModifier as plusModifier,"
+ " win.sigmaAmount as sigmaAmount,"
+ " win.deltaAmount as deltaAmount,"
+ " win.measurementTimestamp as measurementTimestamp,"
+ " win.receivedTimestamp as receivedTimestamp "
+ " from AlphaOccurenceWindow as win"
+ " where win.server=del.server and win.gamma=del.gamma")
.add("insert into aux select * from AlphaOccurenceWindow")
.add("insert into AlphaGroupBetaGroupGamma "
+ " select occurence.betaGroup as betaGroup, "
+ " occurence.gamma as gamma, "
+ " occurence.alphaGroup as alphaGroup, "
+ " sum(occurence.vol) as vol,"
+ " sum(occurence.sigmaAmount) as sigmaAmount,"
+ " sum(occurence.deltaAmount) as deltaAmount,"
+ " (-1)*( (case when (sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) > 0 then ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.plusMultiplier) else ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.minusMultiplier) end) + sum(occurence.plusModifier) ) as calculatedValue, "
+ " (-1) * sum(occurence.plusModifier) as sideValue, "
+ " max(occurence.measurementTimestamp) as measurementTimestamp, "
+ " max(occurence.receivedTimestamp) as receivedTimestamp "
+ " from aux.std:unique(identifier, alphaGroup, beta, server, gamma) as occurence,"
+ " BetaAspectCasted.std:unique(betaGroup) as aspect"
// last entry per beta
+ " where occurence.betaGroup=aspect.betaGroup"
+ " group by occurence.alphaGroup, occurence.betaGroup, occurence.gamma ")
.add("insert into AlphaGroupBetaGroupGammaRedis select * from AlphaGroupBetaGroupGamma where alphaGroup <> '-7777'");
It is basically a simple join of two streams: AlphaOccurence and BetaAspect.
The only difference from the other bolts is the creation of the "aux" window and the updating of its entries on DeleteStream. The idea is that when an event with identifier "-9999" is encountered, all existing entries should be updated with zeros and then published with those zeroed values.
This bolt also experiences the highest load.
Best,
Jacek
The reason I was asking for the code is that the error indicates concurrent access to the output collector, which shouldn't happen unless it's coming from Esper. With the code, I can maybe create a test case.
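If the concurrent calls do come from Esper's listener threads, one way to avoid sprinkling synchronized blocks at every call site is a delegating wrapper that serializes all collector access on a single lock. This is only a sketch: Collector is a hypothetical interface standing in for the OutputCollector methods used here, not the real Storm API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SynchronizedCollectorDemo {

    // Hypothetical stand-in for the subset of the OutputCollector API used here.
    interface Collector {
        void emit(String streamId, Object tuple);
        void ack(Object tuple);
    }

    // Delegate that serializes every collector call on one lock (the wrapper
    // instance itself), so call sites need no synchronized blocks of their own.
    static final class SynchronizedCollector implements Collector {
        private final Collector delegate;
        SynchronizedCollector(Collector delegate) { this.delegate = delegate; }
        @Override public synchronized void emit(String streamId, Object tuple) {
            delegate.emit(streamId, tuple);
        }
        @Override public synchronized void ack(Object tuple) {
            delegate.ack(tuple);
        }
    }

    // Records the calls that pass through the wrapper.
    static List<String> record() {
        final List<String> log = new ArrayList<>();
        Collector raw = new Collector() {
            @Override public void emit(String s, Object t) { log.add("emit:" + s); }
            @Override public void ack(Object t) { log.add("ack"); }
        };
        Collector safe = new SynchronizedCollector(raw);
        safe.emit("someStream", "tuple");
        safe.ack("tuple");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record()); // prints [emit:someStream, ack]
    }
}
```

The design choice is to push the locking behind one interface: every thread that Esper or the bolt uses then goes through the same monitor, which is the invariant the manual synchronized blocks above are trying to establish by hand.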