chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

[Feature][ClickHouse] Clickhouse support update sql

Open FlechazoW opened this issue 3 years ago • 5 comments
trafficstars

Search before asking

  • [X] I had searched in the issues and found no similar feature requirement.

Description

当尝试在mysql数据表中更新修改某条或者删除某条数据记录的时候, 出现上述错误。

跟踪源码发现, 当实现增量实时数据同步作业时候,根据rowKind类型的值执行相关操作, 而clickhouse的 DELETE 与UPDATE_BEFORE 具体实现代码为空。 具体如下:

image

image

image

在此基础上,针对delete操作,添加了相关具体实现,但是对Flink相关语义以及数据原子性先关方面理解缺乏,还需要好好补充,希望大佬解惑, 指个方向, 具体如何更新操作。

Use case

No response

Related issues

No response

Are you willing to submit a PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

FlechazoW avatar Jul 09 '22 23:07 FlechazoW

@FlechazoW chunjun基于flink来执行作业, 比如mysql binlog source ==>> sink clickhouse中。 flink这边是通过什么方式来获取binlog数据的? 我尝试根据flink的rowkind类型扩展了基础的delete的实现,但是不清楚方向是否是正确的。我尝试扩展下update与delete操作的数据同步,接下来该如何操作?

berg-xu avatar Jul 11 '22 01:07 berg-xu

@FlechazoW chunjun基于flink来执行作业, 比如mysql binlog source ==>> sink clickhouse中。 flink这边是通过什么方式来获取binlog数据的? 我尝试根据flink的rowkind类型扩展了基础的delete的实现,但是不清楚方向是否是正确的。我尝试扩展下update与delete操作的数据同步,接下来该如何操作?

纯钧binlog插件是通过canal采集cdc数据的,但是对于下游来说,不需要关心上游获取数据的实现。例如,clickhouse 要实现delete / update 语法,只需要实现对应的方言ClickhouseDialect 即可。

FlechazoW avatar Jul 11 '22 01:07 FlechazoW

@FlechazoW 纯钧binlog插件是通过canal采集cdc数据的,但是对于下游来说,不需要关心上游获取数据的实现。例如,clickhouse 要实现delete / update 语法,只需要实现对应的方言ClickhouseDialect 即可。

我大致通读了一遍基于canal获取binlog数据sink到clickhouse的具体实现。还是很多不太理解的地方。 然后我抛弃了这些想法。我直接考虑了实现对应的方言ClickhouseDialect。 考虑到Flink中Rowkind 语义, 如下所示: image

然后在此基础上考虑了两类实现方案, 第一种方案是,针对不同的语义完成具体的语义实现,但是涉及到数据更新update操作时候,在语义上,存在更新前UPDATE_BEFORE与更新后UPDATE_AFTER的操作,个人实现上, 需要确保该更新表中存在唯一的主键标识,基于该标识确保数据更新变化成功。但是很遗憾,目前我没有办法确定fieldNames列表中哪个字段是唯一的主键,来确保幂等性的数据更新。 当然在非幂等性的情况下可以使用。

第二种方案, 较为简单粗暴。已实现对应的方言ClickhouseDialect,测试可以完成源数据Update/Delete更新操作后,目标端同步更新。 具体过程为在更新数据前先删除delete现有的数据记录(update_before),再添加insert更新后(update_after)的数据。 重点是update操作,因为已经记录了数据更新前后的数据变化值。 每次更新update操作后,直接删除delete目标端数据,然后在添加insert一条更新后update_after的数据记录。即delete与update_before绑定执行, insert与update_after绑定执行。

我不清楚第二种方案是否可取? 或者还有什么更好的方案实现? 求指导指导~~

berg-xu avatar Jul 12 '22 09:07 berg-xu

目前在其他数据源的实现是方案二,你可以看下PreparedStmtProxy这个类,通过split参数,将update拆分成update_before 和update_after,对于update_before则采用delete,update_after则采用insert,与你的第二种方案类似。

FlechazoW avatar Jul 12 '22 09:07 FlechazoW

@FlechazoW 好的,目前该问题没疑问了。

berg-xu avatar Jul 12 '22 09:07 berg-xu