chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

binlog异步处理机制会导致flink异常中断时数据丢失

Open geosmart opened this issue 4 years ago • 6 comments

版本:1.12-release

canal的EventParser在每个transaction的sink后会获取当前事物的position,然后persistLogPosition持久化点位信息。 flinkx实现了canal的logPositionManager,persistLogPosition会做2个动作:

  1. flink状态更新:format.setEntryPosition(logPosition.getPostion())
  2. 本地缓存更新:logPositionCache.put(destination, logPosition);

flinkx的binlogReader的实现中,canal的sink是通过queue异步处理的,由flink的DtInputFormatSourceFunction在执行nextRecord时从queue中poll出来处理row。 那么问题来了, 如果此时server断电flink程序异常中断,format的state已经往前走,但是异步处理比较慢,还没处理完被异常中断了, 重启时读取的checkpoint的点位是后面的position,会导致有些日志数据未被处理。

geosmart avatar Nov 05 '21 02:11 geosmart

遇到相同问题,可稳定复现,请问已经找到解决方式了么?

ovictorain avatar Dec 27 '21 11:12 ovictorain

还没解决 @deadwind4 了解的话可以帮解答下?

geosmart avatar Dec 27 '21 12:12 geosmart

    public BinlogEventSink(BinlogInputFormat format) {
        this.format = format;
        this.queue = new LinkedBlockingDeque<>(1);
        this.rowConverter = format.getRowConverter();
    }

方式:

  • BinlogInputFormat#queue 队列容量修改为 1

原理

  • 避免 snapshotStatequeue 残留未处理数据,导致状态为“超前数据”位点
  • 进而避免 restore 时,跳过问题数据 / 未处理数据,直接从状态中某条超前数据开始处理,导致数据丢失

不足:

  • 性能损耗,可能非最优解
  • 可能未覆盖所有场景( 本地多次验证通过)

ovictorain avatar Dec 28 '21 01:12 ovictorain

或者参考 KafkaConsuemr 为每条 BinlogEventRowRowData 记录新增 journalName & position 字段,并在 BinlogInputFormat#nextRecordInternalsetEntryPosition

请问 @deadwind4 大佬方便解答并修复么,应该是个比较核心的问题?

ovictorain avatar Dec 29 '21 02:12 ovictorain

@WTZ468071157

a49a avatar Dec 29 '21 02:12 a49a

社区这个问题现在有更新吗?

geosmart avatar Jul 20 '22 04:07 geosmart