[STORM-4000] Processing late tuples from BaseWindowedBolt results in serialization exception

Open jira-importer opened this issue 2 years ago • 5 comments

I am developing an Apache Storm (v2.5.0) topology that reads events from a spout (BaseRichSpout), counts the number of events in tumbling windows (BaseWindowedBolt), and prints the count (BaseRichBolt). The topology works fine, but there are some out-of-order events in my dataset. BaseWindowedBolt provides a withLateTupleStream method to route late events to a separate stream. However, when I try to process the late events, I get a serialization exception:

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more

I have defined my topology as below:
 
 

public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(WorkerTopologyContext.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();

        try (LocalCluster.LocalTopology topology = cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        builder.setBolt("windowBolt", new WindowBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                .withTimestampField("time")
                .withLateTupleStream("lateEvents"))
            .shuffleGrouping("eventSpout");
        builder.setBolt("latePrintBolt", new LatePrintBolt())
            .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt())
            .shuffleGrouping("windowBolt");
        return builder;
    }
}

Where `LateEventSpout` is

public class LateEventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        // eventTimes = [epoch+1, epoch+2, .., epoch+61]
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        }
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        if (currentTime == eventTimes.size()) {
            currentTime = 0;
        }
        collector.emit(new Values(eventId, eventTime));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
}

And `WindowBolt` is:

public class WindowBolt extends BaseWindowedBolt {
    OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
}

 
`PrintBolt`, shown next, just prints the `windowBolt` output (`LatePrintBolt` is similar).
If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct results listed after the code.
 

public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(String.format("Start: %d, End: %d, Sum:%d",
            input.getLongByField("start"), input.getLongByField("end"), input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

Start: 0, End: 10000, Sum:10
Start: 10000, End: 20000, Sum:10
Start: 20000, End: 30000, Sum:10
Start: 30000, End: 40000, Sum:10
Start: 40000, End: 50000, Sum:10
Start: 50000, End: 60000, Sum:10 
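
The counts listed here, and the point at which the first late event shows up, can be reproduced with a small standalone model. This is only a sketch under stated assumptions: the class and method names are made up for illustration, windows are treated as (start, end] intervals of 10 seconds, and an event counts as late as soon as its timestamp is lower than the highest timestamp seen so far. Storm's WindowedBoltExecutor works with periodically emitted watermarks and a configurable lag, so its real behaviour may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    static final long WINDOW_MS = 10_000L;

    // End (inclusive) of the (start, end] window that contains a positive timestamp.
    static long windowEnd(long ts) {
        return ((ts + WINDOW_MS - 1) / WINDOW_MS) * WINDOW_MS;
    }

    // Counts on-time events per window end; events older than the highest
    // timestamp seen so far are appended to 'late' instead of being counted.
    static Map<Long, Integer> countOnTime(List<Long> timestamps, List<Long> late) {
        Map<Long, Integer> counts = new TreeMap<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts < maxSeen) {
                late.add(ts);
                continue;
            }
            maxSeen = ts;
            counts.merge(windowEnd(ts), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same data as the spout: epoch+1s .. epoch+61s, then the list starts over.
        List<Long> timestamps = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            timestamps.add(i * 1000L);
        }
        for (int i = 1; i <= 5; i++) {
            timestamps.add(i * 1000L);
        }

        List<Long> late = new ArrayList<>();
        Map<Long, Integer> counts = countOnTime(timestamps, late);
        counts.forEach((end, sum) ->
            System.out.println("End: " + end + ", Sum: " + sum));
        System.out.println("Late events: " + late);
    }
}
```

In this model the windows ending at 10000 through 60000 each hold ten events, the window ending at 70000 holds only the event at epoch+61s, and every replayed timestamp after the wrap-around is classified as late. That matches the observation that the exception appears only once the spout starts over.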

 
However, when I try to print the lateEvents stream, I get the same output, but on the first late event I get the exception shown above.
 
I have debugged the issue. When WindowedBoltExecutor receives a late tuple, it emits the late tuple, but BoltOutputCollectorImpl rewraps it in a new Tuple. This new tuple contains a WorkerTopologyContext, which is not serializable, hence the error.
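
The failure mode can be illustrated without Storm. The sketch below is an analogy only: it uses plain Java serialization as a stand-in for Kryo, and FakeContext and FakeTuple are hypothetical stand-ins for WorkerTopologyContext and TupleImpl. It shows that a payload which contains the tuple object itself drags the context along, while a payload made of the tuple's plain values does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NestedTupleDemo {
    // Hypothetical stand-in for WorkerTopologyContext: deliberately not Serializable.
    static class FakeContext { }

    // Hypothetical stand-in for TupleImpl: values plus a reference to the context.
    static class FakeTuple implements Serializable {
        final ArrayList<Object> values;
        final FakeContext context = new FakeContext();

        FakeTuple(Object... values) {
            this.values = new ArrayList<>(Arrays.asList(values));
        }
    }

    // Returns true if the payload survives Java serialization, false if some
    // object reachable from it is not serializable.
    static boolean canSerialize(List<Object> payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new ArrayList<>(payload));
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeTuple late = new FakeTuple(7, 1000L);
        // Payload that wraps the tuple itself: the context comes along.
        System.out.println("wrapped tuple: " + canSerialize(List.of(late)));
        // Payload made of the tuple's plain values: no context involved.
        System.out.println("plain values:  " + canSerialize(late.values));
    }
}
```

The first call reports a failure and the second succeeds, which mirrors the serialization trace in the report: the context is reached through the tuple that sits inside the emitted values.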

 


Originally reported by jawadtahir, imported from: Processing late tuples from BaseWindowedBolt results in serialization exception
  • status: Open
  • priority: Major
  • resolution: Unresolved
  • imported: 2025-01-24

jira-importer, Nov 10 '23 21:11

rzo1:

Does it also happen with 2.6.0? We updated Kryo.

jira-importer, Nov 23 '23 11:11

JIRAUSER302864:

Hi rzo1,

Thank you for your response, and sorry for the late reply; I have been away from my machine.

I just checked it, and the new version does not solve the problem.

 

As far as I understand, the problem is not with serialization but with the implementation of late tuple handling. The input in WindowedBoltExecutor is already a Tuple. The tuple contains a WorkerTopologyContext, which is not serializable (some volatile attributes), hence the error. In my opinion, we should change the line to

windowedOutputCollector.emit(lateTupleStream, input, input.getValues()); 

instead of 

windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); 

 

What are your thoughts on it?

 

jira-importer, Dec 11 '23 18:12

rzo1:

Sounds valid. Do you want to submit a PR with an accompanying test for it?

jira-importer, Dec 13 '23 07:12

JIRAUSER302864:

Hi rzo1,

 

I have been trying to fix this bug for the past two days. Upon closer inspection, I found that it is a much bigger problem than just changing the parameters of the emit function.

We cannot change it to input.getValues() because the late tuple stream declares only one output field. By design, that field is expected to hold a tuple. However, a tuple can never be serialized due to some volatile attributes. Hence, lateTupleStream will only work when there is no serialization.
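
The mismatch described here can be pictured with a toy check. This is a sketch, not Storm code: fitsDeclaration is a hypothetical helper, and the field name "late_tuple" is used only for illustration of a stream that declares a single output field.

```java
import java.util.List;

public class LateStreamArityDemo {
    // Hypothetical check: an emit is well-formed only if it supplies exactly
    // one value per declared output field.
    static boolean fitsDeclaration(List<String> declaredFields, List<?> values) {
        return declaredFields.size() == values.size();
    }

    public static void main(String[] args) {
        // Assumed single-field declaration of the late tuple stream.
        List<String> lateStreamFields = List.of("late_tuple");
        // The spout's tuples carry two values: id and time.
        List<Object> inputValues = List.of(1, 1000L);

        // Shape of new Values(input): one value (the whole tuple), one field.
        System.out.println(fitsDeclaration(lateStreamFields, List.of(inputValues)));
        // Shape of input.getValues(): two values against one declared field.
        System.out.println(fitsDeclaration(lateStreamFields, inputValues));
    }
}
```

The first shape fits the declaration but carries the non-serializable tuple; the second would serialize but no longer matches the single declared field. That is the conflict between the two emit variants discussed in this thread.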

I think we need the input of original authors kosii and arunm on how to solve this bug.

jira-importer, Dec 15 '23 18:12

rzo1:

Given the recent state of community health and the discussion earlier this year about moving to the Attic, I doubt that there will be much traction from the original authors. If you can think of a good solution, feel free to provide a PR or send a mail to the dev@ list to discuss a proposal in more depth.

jira-importer, Dec 21 '23 07:12