【增量同步】 delta-plus性能优化记录
-
增加Bloom过滤器加速delta读写 Delta Lake 现阶段判定一条数据是不是已经存在是比较暴力的,直接做一次全表join(如果有分区会好点)。这样可以得到待更新记录所在的文件路径。在特定场景(表数据基数很大的情况),这个开销会非常大。一个直观的解决方案是:给每个文件加上布隆过滤器,对每份落地的delta文件生成一个BloomFilter对象记录每条数据的bit位,我们将这类索引信息存储在table path的_bf_index_目录下,当每批数据进来时,先和索引数据进行比较过滤出touch的文件。这样比较优秀的情况下只会touch到某几个文件会大大减少join的数据量。查看:#11
-
优化broadcast:减少广播的数据 一开始我们将索引数据(bloom对象)广播到了所有Executor端,然后每批次foreach进来的数据进行判断记录touch到了哪些文件。这样做不太合理,表数据比较大的时候,广播的数据就会很大,内存比较紧张。解决方案:广播当前批次的数据(每批次数据量总是不大的)。查看:3787cde
-
加入RepartitionByRange操作配合Bloom过滤器使用 这个比较适用一些固定的场景,比如你的业务数据是日志型的类似订单记录:基本上只更新最近的记录或者新增。加入partitionByRange会很好的调整数据分布,使用bloom过滤器时,理想状态只会touch到最近的几个文件,能加快delta的查询和写入。查看:d5ba007
-
控制新文件数与删除文件数一致 将批次数据和touch文件没受影响记录union后一起落地,控制新的文件数与删除的文件数一致。查看:c17d36b
-
解决delta并发commit导致批次失败【重试】 如果同步流程和清理流程(清理历史版本)同时commit操作,会冲突导致批次失败。加入重试机制。查看:4fa7c85
-
调整数据分区优化序列化速度 如果是binlog数据源拿到的DataFrame只有一个分区,delta在更新之前需要把binlog的json数据deserialize,当批次内有大数据量时,单线程进行反序列化速度肯定是不够的,导致同步流程产生瓶颈。解决方案:repartition数据,增加处理线程数。查看:b8c09e7