binlog异步处理机制会导致flink异常中断时数据丢失
版本:1.12-release
canal的EventParser在每个transaction的sink后会获取当前事物的position,然后persistLogPosition持久化点位信息。
flinkx实现了canal的logPositionManager,persistLogPosition会做2个动作:
- flink状态更新:format.setEntryPosition(logPosition.getPostion())
- 本地缓存更新:logPositionCache.put(destination, logPosition);
flinkx的binlogReader的实现中,canal的sink是通过queue异步处理的,由flink的DtInputFormatSourceFunction在执行nextRecord时从queue中poll出来处理row。
那么问题来了,
如果此时server断电flink程序异常中断,format的state已经往前走,但是异步处理比较慢,还没处理完被异常中断了,
重启时读取的checkpoint的点位是后面的position,会导致有些日志数据未被处理。
遇到相同问题,可稳定复现,请问已经找到解决方式了么?
还没解决 @deadwind4 了解的话可以帮解答下?
public BinlogEventSink(BinlogInputFormat format) {
this.format = format;
this.queue = new LinkedBlockingDeque<>(1);
this.rowConverter = format.getRowConverter();
}
方式:
BinlogInputFormat#queue队列容量修改为 1
原理
- 避免
snapshotState时queue残留未处理数据,导致状态为“超前数据”位点 - 进而避免
restore时,跳过问题数据 / 未处理数据,直接从状态中某条超前数据开始处理,导致数据丢失
不足:
- 性能损耗,可能非最优解
- 可能未覆盖所有场景( 本地多次验证通过)
或者参考 KafkaConsuemr 为每条 BinlogEventRow 和 RowData 记录新增 journalName & position 字段,并在 BinlogInputFormat#nextRecordInternal 时 setEntryPosition?
请问 @deadwind4 大佬方便解答并修复么,应该是个比较核心的问题?
@WTZ468071157
社区这个问题现在有更新吗?