huhao0926

Results 3 comments of huhao0926

@qingwen220 stream job the related code is here: public void emit(long windowId, List data, int channel) throws IOException { BufferBuilder outBuffer = this.buffers[channel]; for (T datum : data) { this.recordSerializer.serialize(datum,...

正常情况下,outbuffer 超过128MB 就会清空底层 buffer并重构一个新的outBuffer。这段逻辑是在哪里保证的; 传过来的 ListData 太大了。我是写的sql的方式,源数据是从kafka消费的新数据,我不是特别了解source function。 目前的本地改动的方式如下,不知道是否合理,有空您也可以帮忙指导下,感谢 for (T datum : data) { this.recordSerializer.serialize(datum, false, outBuffer); // OutOfMemory if (outBuffer.getBufferSize() >= this.maxBufferSize) { // 将判断逻辑写在这里 this.sendBuffer(channel, outBuffer, windowId);...

就是我在本地代码临时修改了下这块代码,然后部署, Kafka Source是 DSL 自带的。我修改下这个配置看下