flink-siddhi
KryoSerializer Exception
StreamOutputHandler.receive throws an exception when executing multiple queries. Is Output<StreamRecord<R>> output thread safe?
java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:70)
@zhuan77241 could you please share your use case? I am unable to reproduce the problem.
@haoch Sorry for the delayed reply. My use case is to execute multiple SQL queries on the same operator, so I changed the method ExecutionSiddhiStream.cql(String executionPlan) to ExecutionSiddhiStream.cqls(Map<String, String> executionMap). The exception occurs when I execute multiple queries with group by (sql1: select .. from xx group by srcAddress, sql2: select ... from xx group by xx, and many more). When I synchronize StreamOutputHandler.receive, the exception disappears.
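For readers following along, here is a minimal sketch of the kind of synchronization described above. It is not the actual flink-siddhi code: the class `SynchronizedReceiveSketch`, the nested `SynchronizedHandler`, and the helper `runDemo` are hypothetical names, and a plain `Consumer` stands in for Flink's `Output.collect(...)`. It assumes that several query callbacks may call `receive` from different threads and that the downstream collector is not safe for concurrent use. That would fit the stack trace, since Kryo instances are documented as not thread safe, but it has not been confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SynchronizedReceiveSketch {

    /** Hypothetical stand-in for a handler whose receive() may be called by several query callbacks. */
    static final class SynchronizedHandler<T> {
        private final Object lock = new Object();
        // Stand-in for the downstream output; assumed NOT safe for concurrent calls.
        private final Consumer<T> downstream;

        SynchronizedHandler(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        /** Forwards a batch of events; the lock ensures only one thread emits at a time. */
        void receive(T[] events) {
            synchronized (lock) {
                for (T event : events) {
                    downstream.accept(event);
                }
            }
        }
    }

    /** Starts several threads that emit through one shared handler and returns the number of collected events. */
    static int runDemo(int threads, int batchesPerThread) {
        List<String> sink = new ArrayList<>(); // deliberately not thread safe
        SynchronizedHandler<String> handler = new SynchronizedHandler<>(sink::add);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final String tag = "query-" + i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < batchesPerThread; j++) {
                    handler.receive(new String[] {tag, tag}); // two events per batch
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sink.size();
    }

    public static void main(String[] args) {
        // 4 threads x 1000 batches x 2 events per batch
        System.out.println(runDemo(4, 1000));
    }
}
```

Without the `synchronized` block, concurrent `ArrayList.add` calls can lose elements or throw, which is loosely analogous to the corrupted internal state seen in the Kryo trace. Whether a coarse lock like this is acceptable depends on throughput requirements; it serializes all output from the operator.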
Tag v0.1.3
@zhuan77241 thanks, I will have a look and update you once I have any findings.
I have encountered the same issue, and I modified the code as below to solve it:
tag:0.2.0