
Synchronization on OutputCollector

Open ponythewhite opened this issue 10 years ago • 3 comments

Hello,

Synchronization on OutputCollector is required, e.g.:

synchronized (collector){
    collector.ack(tuple);
}

and

synchronized (collector){
    collector.emit(finalEventType.getStreamId(), toTuple(newEvent, finalEventType.getFields()));
}

Otherwise there are exceptions of this kind:

java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99)
    at backtype.storm.util$fast_list_map.invoke(util.clj:771)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205)
    at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
    ... 6 more
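
Instead of repeating the synchronized blocks at every call site, the collector could be wrapped once so that each call takes the same lock. What follows is only a minimal sketch of that idea: `Collector`, `SynchronizedCollector`, `CountingCollector`, and `SynchronizedCollectorDemo` are hypothetical names made up for illustration, and `Collector` is a stand-in with just the two calls used above, not Storm's real OutputCollector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the two collector calls used above (not Storm's class).
interface Collector {
    void emit(String streamId, List<Object> values);
    void ack(Object tuple);
}

// Decorator that sends every call through one shared lock.
final class SynchronizedCollector implements Collector {
    private final Collector delegate;
    private final Object lock = new Object();

    SynchronizedCollector(Collector delegate) {
        this.delegate = delegate;
    }

    @Override
    public void emit(String streamId, List<Object> values) {
        synchronized (lock) {
            delegate.emit(streamId, values);
        }
    }

    @Override
    public void ack(Object tuple) {
        synchronized (lock) {
            delegate.ack(tuple);
        }
    }
}

// Deliberately non-thread-safe collector, used only to exercise the wrapper.
final class CountingCollector implements Collector {
    int emitted = 0;
    int acked = 0;

    @Override
    public void emit(String streamId, List<Object> values) {
        emitted++;
    }

    @Override
    public void ack(Object tuple) {
        acked++;
    }
}

final class SynchronizedCollectorDemo {
    // Calls emit and ack from several threads and returns {emitted, acked}.
    static int[] run(int threads, int perThread) {
        CountingCollector raw = new CountingCollector();
        Collector safe = new SynchronizedCollector(raw);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    safe.emit("demoStream", Collections.<Object>singletonList(i));
                    safe.ack(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {raw.emitted, raw.acked};
    }

    public static void main(String[] args) {
        int[] counts = run(4, 10000);
        System.out.println(counts[0] + " " + counts[1]); // prints "40000 40000"
    }
}
```

The wrapper only helps if every caller goes through it. Any code that still holds the raw collector bypasses the lock.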

ponythewhite, Dec 06 '13 15:12

Could you post the code that led to this error?

tomdz, Dec 06 '13 23:12

Sure, but one thing first: I'm using the project with Storm 0.8.2 and Esper 4.10.0, which might cause different behavior. I'm not sure.

I'm including the code for construction of the failing Bolt. Only this bolt experienced the exception.

        final EsperBolt.StatementsBuilder statBu = inBu
                .outputs()
                .onStream("alphaGroupBetaGroupGammaStream")
                .fromEventType("AlphaGroupBetaGroupGamma")
                .emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
                        "deltaAmount", "calculatedValue", "sideValue")
                .outputs()
                .onStream("alphaGroupBetaGroupGammaRedisStream")
                .fromEventType("AlphaGroupBetaGroupGammaRedis")
                .emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
                        "deltaAmount", "calculatedValue", "sideValue", "measurementTimestamp", "receivedTimestamp")
                .statements()
                .add("insert into AlphaOccurenceCasted select cast(identifier,String) as identifier, cast(alphaGroup,String) as alphaGroup, cast(betaGroup,String) as betaGroup, cast(beta,String) as beta, "
                        + "cast(server,String) as server, cast(gamma, int) as gamma, cast(vol, double) as vol, cast(minusModifier, double) as minusModifier, "
                        + "cast(plusModifier, double) as plusModifier,  cast(sigmaAmount, double) as sigmaAmount, cast(deltaAmount, double) as deltaAmount, cast(measurementTimestamp, long) as measurementTimestamp, cast(receivedTimestamp, long) as receivedTimestamp from AlphaOccurence")

                .add("insert into BetaAspectCasted select cast(betaGroup,String) as betaGroup, "
                        + "cast(plusMultiplier, double) as plusMultiplier, "
                        + "cast(minusMultiplier, double) as minusMultiplier, cast(theta_tickvalue,double) as theta, cast(kappa_tickvalue,double) as kappa from BetaAspect group by betaGroup output last every 1 sec")
                .add("insert into DeleteStream select * from AlphaOccurenceCasted where identifier='-9999'")
                .add("create window AlphaOccurenceWindow.std:unique(identifier, beta, server, gamma) as select * from AlphaOccurenceCasted")
                .add("insert into AlphaOccurenceWindow select * from AlphaOccurenceCasted where identifier!='-9999'")
                .add("on DeleteStream as del update AlphaOccurenceWindow as win set vol = 0, minusModifier=0, plusModifier=0 where win.server=del.server and win.gamma=del.gamma")
                .add("on DeleteStream as del insert into aux"
                        + " select win.identifier           as identifier, "
                        + "      win.beta                   as beta, "
                        + "      win.betaGroup              as betaGroup, "
                        + "      win.alphaGroup             as alphaGroup, "
                        + "      win.server                 as server, "
                        + "      win.gamma                  as gamma, "
                        + "      win.vol                    as vol, "
                        + "      win.minusModifier          as minusModifier, "
                        + "      win.plusModifier           as plusModifier,"
                        + "      win.sigmaAmount            as sigmaAmount,"
                        + "      win.deltaAmount            as deltaAmount,"
                        + "      win.measurementTimestamp   as measurementTimestamp,"
                        + "      win.receivedTimestamp      as receivedTimestamp "
                        + " from AlphaOccurenceWindow as win"
                        + " where win.server=del.server and win.gamma=del.gamma")
                .add("insert into aux select * from AlphaOccurenceWindow")
                .add("insert into AlphaGroupBetaGroupGamma "
                        + " select  occurence.betaGroup                     as betaGroup, "
                        + "         occurence.gamma                         as gamma, "
                        + "         occurence.alphaGroup                    as alphaGroup, "
                        + "         sum(occurence.vol)                      as vol,"
                        + "         sum(occurence.sigmaAmount)              as sigmaAmount,"
                        + "         sum(occurence.deltaAmount)              as deltaAmount,"
                        + " (-1)*( (case when (sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) > 0 then ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.plusMultiplier) else ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.minusMultiplier) end) + sum(occurence.plusModifier) ) as calculatedValue, "
                        + " (-1) * sum(occurence.plusModifier)              as sideValue, "
                        + "         max(occurence.measurementTimestamp)     as measurementTimestamp, "
                        + "         max(occurence.receivedTimestamp)        as receivedTimestamp "
                        + " from aux.std:unique(identifier, alphaGroup, beta, server, gamma) as occurence,"
                        + " BetaAspectCasted.std:unique(betaGroup) as aspect"
                        // last entry per beta
                        + " where occurence.betaGroup=aspect.betaGroup"
                        + " group by occurence.alphaGroup, occurence.betaGroup, occurence.gamma ")
                .add("insert into AlphaGroupBetaGroupGammaRedis select * from AlphaGroupBetaGroupGamma where alphaGroup <> '-7777'");

It is basically a simple join of two streams: AlphaOccurence and BetaAspect.

The only difference from the other bolts is the creation of the "aux" window and the update of entries on DeleteStream. The idea is that when an event with identifier "-9999" is encountered, all existing entries should be updated with zeros and then published with those zeroed values.

This bolt also experiences the highest load.

Best,

Jacek

ponythewhite, Dec 10 '13 15:12

The reason I was asking for the code is that the error indicates concurrent access to the output collector, which shouldn't happen unless it's coming from Esper. With the code, I can maybe create a test case.
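
One way such a test case might confirm the concurrent access is a small guard that remembers the first calling thread and reports any call from a different one. This is a rough sketch only: `SingleThreadGuard` and `SingleThreadGuardDemo` are hypothetical names, and nothing here is part of Storm, Esper, or this project. Where the second thread would come from is an open question; a time-driven clause such as "output last every 1 sec" firing on an Esper timer thread is one guess, not something confirmed here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical diagnostic helper: the first thread to call it becomes the owner.
final class SingleThreadGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Returns true when the caller is the owning (first) thread, false otherwise.
    boolean calledFromOwner() {
        Thread current = Thread.currentThread();
        owner.compareAndSet(null, current); // only the first caller wins
        return owner.get() == current;
    }
}

final class SingleThreadGuardDemo {
    // Invokes the guard from a freshly started thread and returns its answer.
    static boolean callFromNewThread(SingleThreadGuard guard) {
        boolean[] result = new boolean[1];
        Thread other = new Thread(() -> result[0] = guard.calledFromOwner());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        SingleThreadGuard guard = new SingleThreadGuard();
        System.out.println(guard.calledFromOwner());      // prints "true"
        System.out.println(callFromNewThread(guard));     // prints "false"
    }
}
```

Calling such a guard just before each emit or ack, and logging the thread name when it returns false, would show whether a thread other than the bolt's executor thread is reaching the collector.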

tomdz, Dec 12 '13 11:12