[PipelineOutputEmitter] kryo.serialize java.lang.OutOfMemoryError
**Describe the bug**

Local mode, consuming a Kafka stream, exits with `java.lang.OutOfMemoryError`:

```
2025-04-10 14:16:44,143 [shuffle-writer-14-Message] ERROR ComponentUncaughtExceptionHandler:30 - FATAL exception in thread: shuffle-writer-14-Message
com.antgroup.geaflow.common.exception.GeaflowRuntimeException: java.lang.OutOfMemoryError
    at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
    at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:259)
    at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:177)
    at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:169)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:172)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:171)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:334)
    at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:330)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.antgroup.geaflow.shuffle.serialize.RecordSerializer.doSerialize(RecordSerializer.java:39)
    at com.antgroup.geaflow.shuffle.serialize.AbstractRecordSerializer.serialize(AbstractRecordSerializer.java:23)
    at com.antgroup.geaflow.shuffle.api.writer.ShardWriter.emit(ShardWriter.java:135)
    at com.antgroup.geaflow.shuffle.api.writer.PipelineShardWriter.emit(PipelineShardWriter.java:74)
    at com.antgroup.geaflow.shuffle.api.writer.PipelineWriter.emit(PipelineWriter.java:49)
    at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.execute(PipelineOutputEmitter.java:217)
    at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:196)
    ... 3 more
```
@huhao0926, which version or branch of the code are you using?
I'm not sure whether this issue happens in a batch or a stream job. It occurs when the output bucket buffer exceeds 2GB, the maximum size of a `ByteArrayOutputStream` backing array, which is why the stack trace ends in `hugeCapacity` throwing `OutOfMemoryError`.
@qingwen220
stream job
The related code is here:

```java
public void emit(long windowId, List<T> data, int channel) throws IOException {
    BufferBuilder outBuffer = this.buffers[channel];
    for (T datum : data) {
        this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory here
    }
    if (outBuffer.getBufferSize() >= this.maxBufferSize) {
        this.sendBuffer(channel, outBuffer, windowId);
    }
}
```
Normally, once outBuffer exceeds 128MB, the underlying buffer is cleared and a new outBuffer is rebuilt. For it to grow to 2GB here, one possibility is that the incoming List of data is too large. Check your source function: is a single batch writing too much data?
> Normally, once outBuffer exceeds 128MB, the underlying buffer is cleared and a new outBuffer is rebuilt.

Where is that logic guaranteed? As for the incoming List of data being too large: I write the job as SQL, and the source data is newly consumed from Kafka, so I'm not very familiar with the source function. My current local change is below. I'm not sure whether it is reasonable; when you have time, I'd appreciate your guidance. Thanks!

```java
for (T datum : data) {
    this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory
    if (outBuffer.getBufferSize() >= this.maxBufferSize) { // moved the size check inside the loop
        this.sendBuffer(channel, outBuffer, windowId);
    }
}
```

@qingwen220
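To illustrate why moving the size check inside the loop bounds memory, here is a minimal, self-contained sketch. The `FakeBuffer` class and both `peak...` methods are hypothetical stand-ins, not GeaFlow's actual `BufferBuilder` or `ShardWriter`: they simply count bytes and track the peak buffer size under each flush strategy.

```java
import java.util.List;

// Hypothetical stand-in for a buffer builder: only tracks a byte count.
class FakeBuffer {
    private int size = 0;
    int getBufferSize() { return size; }
    void write(int bytes) { size += bytes; }
    void clear() { size = 0; }      // models sendBuffer() releasing the buffer
}

public class FlushDemo {
    // Size check AFTER the loop (as in the original emit): peak grows with the whole batch.
    static int peakCheckAfterLoop(List<Integer> recordSizes, int maxBufferSize) {
        FakeBuffer buf = new FakeBuffer();
        int peak = 0;
        for (int r : recordSizes) {
            buf.write(r);
            peak = Math.max(peak, buf.getBufferSize());
        }
        if (buf.getBufferSize() >= maxBufferSize) {
            buf.clear();
        }
        return peak;
    }

    // Size check INSIDE the loop (the proposed change): peak is bounded by
    // the threshold plus at most one record.
    static int peakCheckPerRecord(List<Integer> recordSizes, int maxBufferSize) {
        FakeBuffer buf = new FakeBuffer();
        int peak = 0;
        for (int r : recordSizes) {
            buf.write(r);
            peak = Math.max(peak, buf.getBufferSize());
            if (buf.getBufferSize() >= maxBufferSize) {
                buf.clear();
            }
        }
        return peak;
    }
}
```

With 1000 records of 10 "bytes" and a threshold of 100, the after-loop variant peaks at 10000 while the per-record variant peaks at 100. Note the per-record check only caps the buffer; if a single record serializes to more than 2GB, it would still overflow.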
I don't quite follow what you mean by a local change. The code you pasted is from `ShardWriter`; the check `outBuffer.getBufferSize() >= this.maxBufferSize` is exactly the over-threshold check that clears the buffer, and `maxBufferSize` defaults to 128MB. Is your Kafka source the one bundled with the DSL? If so, two parameters in the Kafka source configuration control the amount of data per batch: `geaflow.dsl.time.window.size` (time-based window, in seconds; the default is -1, meaning all time) and `geaflow.dsl.window.size` (count-based window; the default is 1). If `geaflow.dsl.start.time` is configured, the job runs in time-window mode, and `geaflow.dsl.time.window.size` must be set as well.
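For reference, a sketch of how the two window parameters above might be set in a DSL table definition. This is an illustrative fragment only: the table name, columns, and connector properties other than the two window parameters are assumptions, not taken from an actual GeaFlow job.

```sql
CREATE TABLE kafka_source (   -- hypothetical table; reuse your existing definition
  id BIGINT,
  payload VARCHAR
) WITH (
  type = 'kafka',
  -- ... your existing Kafka connection properties ...
  -- Count-based window: cap each batch at 10000 records (default is 1).
  `geaflow.dsl.window.size` = '10000',
  -- When geaflow.dsl.start.time is set, time-window mode is used and a
  -- time window size (in seconds) must also be configured:
  `geaflow.dsl.time.window.size` = '60'
);
```

Lowering the per-batch window size keeps the `List` handed to `ShardWriter.emit` small, so the output buffer stays well under the 2GB limit.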
I mean I temporarily modified that piece of code in my local checkout and redeployed. The Kafka source is the one bundled with the DSL; I'll try adjusting these configurations.
Hello. Are there any other questions?
Hi @huhao0926 ,
Since we haven't received any further feedback from you, we are temporarily closing this issue. If you believe the problem is still unresolved or if you have any other related questions, feel free to reopen this issue or submit a new one.
Thank you for your support of the project! 😊
Best regards ~