tugraph-analytics

[PipelineOutputEmitter] kryo.serialize java.lang.OutOfMemoryError

huhao0926 opened this issue 8 months ago • 7 comments

Describe the bug

In local mode, a job consuming a Kafka stream exits with an OutOfMemoryError:

    2025-04-10 14:16:44,143 [shuffle-writer-14-Message] ERROR ComponentUncaughtExceptionHandler:30 - FATAL exception in thread: shuffle-writer-14-Message
    com.antgroup.geaflow.common.exception.GeaflowRuntimeException: java.lang.OutOfMemoryError
        at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
        at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:259)
        at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:177)
        at com.antgroup.geaflow.dsl.runtime.traversal.data.FieldAlignEdge$FieldAlignEdgeSerializer.write(FieldAlignEdge.java:169)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:172)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:171)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.EdgeTreePath$EdgeTreePathSerializer.write(EdgeTreePath.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:163)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.VertexTreePath$VertexTreePathSerializer.write(VertexTreePath.java:158)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:334)
        at com.antgroup.geaflow.dsl.runtime.traversal.path.UnionTreePath$UnionTreePathSerializer.write(UnionTreePath.java:330)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at com.antgroup.geaflow.shuffle.serialize.RecordSerializer.doSerialize(RecordSerializer.java:39)
        at com.antgroup.geaflow.shuffle.serialize.AbstractRecordSerializer.serialize(AbstractRecordSerializer.java:23)
        at com.antgroup.geaflow.shuffle.api.writer.ShardWriter.emit(ShardWriter.java:135)
        at com.antgroup.geaflow.shuffle.api.writer.PipelineShardWriter.emit(PipelineShardWriter.java:74)
        at com.antgroup.geaflow.shuffle.api.writer.PipelineWriter.emit(PipelineWriter.java:49)
        at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.execute(PipelineOutputEmitter.java:217)
        at com.antgroup.geaflow.cluster.collector.PipelineOutputEmitter$EmitterTask.run(PipelineOutputEmitter.java:196)
        ... 3 more

huhao0926, Apr 11 '25 02:04

@huhao0926, which version or branch are you using?

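I'm not sure whether this happens in a batch job or a stream job. The error occurs when the output bucket buffer grows beyond 2 GB: the ByteArrayOutputStream at the bottom of the stack trace is backed by a single byte[] indexed by int, so it cannot hold more than about Integer.MAX_VALUE bytes.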

qingwen220, Apr 15 '25 03:04
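
The 2 GB ceiling comes from int arithmetic. The sketch below is a simplified model, not the JDK source: JDK 8's ByteArrayOutputStream.write(byte[], int, int) requests a capacity of count + len computed as an int, and once the buffered total passes Integer.MAX_VALUE that sum wraps negative, which hugeCapacity (the top frame in the stack trace above) rejects with an OutOfMemoryError. The class and method names are hypothetical.

```java
public class CapacityOverflowDemo {

    // Simplified model of the capacity request in JDK 8's
    // ByteArrayOutputStream.write(byte[], int, int): count + len in int arithmetic.
    static int requestedCapacity(int count, int len) {
        return count + len; // wraps around silently once the sum passes Integer.MAX_VALUE
    }

    // JDK 8's hugeCapacity(int) throws OutOfMemoryError when the request is negative,
    // i.e. when the sum above has overflowed.
    static boolean wouldThrowOutOfMemoryError(int count, int len) {
        return requestedCapacity(count, len) < 0;
    }

    public static void main(String[] args) {
        int almostFull = Integer.MAX_VALUE - 8; // about 2 GB already buffered
        System.out.println(requestedCapacity(almostFull, 16));          // -2147483641
        System.out.println(wouldThrowOutOfMemoryError(almostFull, 16)); // true
        System.out.println(wouldThrowOutOfMemoryError(1024, 16));       // false
    }
}
```

No amount of heap avoids this: the limit is on the size of a single array, not on total memory.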

@qingwen220

It's a stream job.

The related code is here:

    public void emit(long windowId, List<T> data, int channel) throws IOException {
        BufferBuilder outBuffer = this.buffers[channel];
        for (T datum : data) {
            this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory
        }
        if (outBuffer.getBufferSize() >= this.maxBufferSize) {
            this.sendBuffer(channel, outBuffer, windowId);
        }
    }

huhao0926, Apr 15 '25 10:04
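
To see why the placement of the threshold check matters, here is a hypothetical model of the emit method above (not GeaFlow code): each record is reduced to its serialized size in bytes, each inner list stands for one emit call on the same channel, and sendBuffer is modeled as resetting the buffered count to zero. Because the threshold is checked only after the whole data list has been serialized, the peak buffer size is bounded by the carry-over plus the size of the batch, not by maxBufferSize.

```java
import java.util.Arrays;
import java.util.List;

public class EmitAfterLoopModel {

    // Hypothetical model: returns the largest number of bytes held in one channel's
    // buffer when the threshold is checked once per emit call, after the loop.
    static long peakBufferedBytes(List<List<Integer>> batches, long maxBufferSize) {
        long buffered = 0; // bytes currently held in outBuffer
        long peak = 0;
        for (List<Integer> batch : batches) {        // one emit(...) call per batch
            for (int recordSize : batch) {
                buffered += recordSize;              // recordSerializer.serialize(...)
                peak = Math.max(peak, buffered);
            }
            if (buffered >= maxBufferSize) {         // checked only after the whole batch
                buffered = 0;                        // sendBuffer(...) hands the bytes off
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(50, 50),
                Arrays.asList(50, 50, 50, 50, 50, 50));
        // The threshold is 128 bytes, but the second batch pushes the buffer to 400.
        System.out.println(peakBufferedBytes(batches, 128)); // 400
    }
}
```

Scaled up, a threshold of 128 MB offers no protection if a single data list serializes to more than 2 GB.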

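Normally, once outBuffer exceeds 128 MB, the underlying buffer is flushed and a new outBuffer is built. If it reaches 2 GB here, one possible cause is that the incoming data list is too large. Check your source function to see whether a single batch writes too much data.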

qingwen220, Apr 16 '25 02:04

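You said that normally, once outBuffer exceeds 128 MB, the underlying buffer is flushed and a new outBuffer is built. Where is that logic enforced? You also said the incoming data list is too large. I'm writing the job in SQL, and the source data is new data consumed from Kafka, so I'm not very familiar with the source function. For now I've changed the code locally as follows. I'm not sure whether this is reasonable, so I'd appreciate your guidance when you have time. Thanks!

    public void emit(long windowId, List<T> data, int channel) throws IOException {
        BufferBuilder outBuffer = this.buffers[channel];
        for (T datum : data) {
            this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory
            // Local change: moved the threshold check inside the loop
            if (outBuffer.getBufferSize() >= this.maxBufferSize) {
                this.sendBuffer(channel, outBuffer, windowId);
            }
        }
    }

@qingwen220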

huhao0926, Apr 16 '25 09:04
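
A hypothetical model of the local change above (not GeaFlow code; record sizes in bytes, sendBuffer modeled as a reset) shows what checking the threshold after every record buys: the peak stays below maxBufferSize plus the largest single record. Two caveats the thread does not settle: the change cannot help if one record alone serializes to more than 2 GB, and if sendBuffer replaces this.buffers[channel] with a new BufferBuilder instead of resetting the existing one, the loop would need to re-read this.buffers[channel] after each send.

```java
import java.util.Arrays;
import java.util.List;

public class EmitPerRecordModel {

    // Hypothetical model: the threshold is checked after every record, so the
    // buffer is handed off as soon as it reaches maxBufferSize.
    static long peakBufferedBytes(List<List<Integer>> batches, long maxBufferSize) {
        long buffered = 0; // bytes currently held in outBuffer
        long peak = 0;
        for (List<Integer> batch : batches) {        // one emit(...) call per batch
            for (int recordSize : batch) {
                buffered += recordSize;              // recordSerializer.serialize(...)
                peak = Math.max(peak, buffered);
                if (buffered >= maxBufferSize) {     // checked after each record
                    buffered = 0;                    // sendBuffer(...) hands the bytes off
                }
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(50, 50),
                Arrays.asList(50, 50, 50, 50, 50, 50));
        // The peak stays below 128 + 50 (threshold plus the largest record).
        System.out.println(peakBufferedBytes(batches, 128)); // 150
    }
}
```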

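I don't understand what you mean by the local change. The code you pasted is from ShardWriter: the check outBuffer.getBufferSize() >= this.maxBufferSize is what flushes the buffer once it exceeds the threshold, and maxBufferSize defaults to 128 MB. Are you using the Kafka source that ships with the DSL? If so, two parameters in the Kafka source configuration control how much data goes into one batch:

- geaflow.dsl.time.window.size: time-based window, in seconds. The default is -1, meaning all time.
- geaflow.dsl.window.size: count-based window (number of records). The default is 1.

If geaflow.dsl.start.time is set, the source runs in time-window mode, and geaflow.dsl.time.window.size must also be configured.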

qingwen220, Apr 18 '25 07:04
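
As a rough illustration of how a count-based window such as geaflow.dsl.window.size limits the size of each data list, the sketch below groups consumed records into consecutive batches of at most windowSize records. It is a hypothetical model, not the GeaFlow Kafka source: the class and method names are invented, and it assumes (as the discussion suggests but does not confirm) that each window becomes one data list passed downstream to emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountWindowModel {

    // Hypothetical count-based window: split records into consecutive batches of
    // at most windowSize records each. Each batch stands in for one data list.
    static <T> List<List<T>> toBatches(List<T> records, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += windowSize) {
            // Last batch may be shorter than windowSize.
            int end = start + Math.min(windowSize, records.size() - start);
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        System.out.println(toBatches(records, 2)); // [[r1, r2], [r3, r4], [r5]]
    }
}
```

Under this model, a smaller window yields smaller data lists per emit call, which caps how far the buffer can grow before the post-loop threshold check runs.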

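I meant that I temporarily modified this code locally and then deployed it. Yes, the Kafka source is the one built into the DSL. I'll change this configuration and see.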

huhao0926, Apr 19 '25 07:04

Hello. Are there any other questions?

Loognqiang, Apr 21 '25 02:04

Hi @huhao0926 ,

Since we haven't received any further feedback from you, we are temporarily closing this issue. If you believe the problem is still unresolved or if you have any other related questions, feel free to reopen this issue or submit a new one.

Thank you for your support of the project! 😊

Best regards ~

Loognqiang, May 13 '25 03:05