chunjun
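ClickhouseOutputFormat - Writing records failed when syncing incremental MySQL binlog data to ClickHouse in real time
Describe the bug Background: as a test, I use the binlog-x plugin to read a table in the source MySQL database, capture the MySQL incremental changes in real time, and write them to the sink, ClickHouse.
The launch command is as follows: ${CHUNJUN_HOME}/bin/start-chunjun -mode local -jobType sync -job /xxx_job.json -chunjunDistDir ${CHUNJUN_HOME}/
The contents of xxx_job.json are as follows: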
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "jdbcUrl": "jdbc:mysql://localhost:3306/adc?useSSL=false",
            "schema": "adc",
            "username": "root",
            "password": "root",
            "host": "localhost",
            "port": 3306,
            "cat": "insert,delete,update",
            "table": ["white_list"]
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "column": [
              {"name": "id", "index": 0, "isPart": false, "type": "int64", "key": "id"},
              {"name": "item", "index": 1, "isPart": false, "type": "string", "key": "item"}
            ],
            "preSql": ["truncate table white_list"],
            "writeMode": "insert",
            "encoding": "utf-8",
            "fullColumnName": ["id", "item"],
            "fullColumnType": ["INT64", "STRING"],
            "connection": [
              {
                "jdbcUrl": "jdbc:clickhouse://localhost:1234/adc",
                "table": ["white_list"]
              }
            ],
            "schema": "adc",
            "table": ["white_list"],
            "password": "root",
            "username": "root"
          },
          "name": "clickhousewriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "errorLimit": {"record": 100},
      "speed": {"bytes": 0, "channel": 1}
    }
  }
}
The source MySQL table has only two columns: id: bigint; item: varchar.
The ClickHouse CREATE TABLE statement is as follows:
CREATE TABLE test.white_list (
id Int64,
item Nullable(String)
) ENGINE = MergeTree
ORDER BY
tuple() SETTINGS index_granularity = 8192
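One detail that may be worth ruling out: the writer's jdbcUrl in the job file points at the database adc, while the table above is created as test.white_list. The following is only a minimal sketch for spotting that kind of mismatch; the helper name jdbc_database is hypothetical and not part of ChunJun, and whether this mismatch is related to the reported error is not confirmed by the thread.

```python
from urllib.parse import urlparse


def jdbc_database(jdbc_url: str) -> str:
    """Return the database segment of a JDBC URL like jdbc:clickhouse://host:port/db.

    Hypothetical helper for illustration only; not a ChunJun API.
    """
    # Drop the leading "jdbc:" so urlparse sees an ordinary scheme://host/path URL.
    plain = jdbc_url[len("jdbc:"):] if jdbc_url.startswith("jdbc:") else jdbc_url
    parsed = urlparse(plain)
    # The path looks like "/adc"; query parameters are kept separately by urlparse.
    return parsed.path.lstrip("/").split("/")[0]


# Values copied from the job file and the CREATE TABLE statement in this issue.
writer_url = "jdbc:clickhouse://localhost:1234/adc"
ddl_table = "test.white_list"

url_database = jdbc_database(writer_url)
ddl_database = ddl_table.split(".")[0]

if url_database != ddl_database:
    print(f"writer targets database {url_database!r}, "
          f"but the table was created in {ddl_database!r}")
```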
==============================================================
Problem description:



ERROR c.d.chunjun.connector.clickhouse.sink.ClickhouseOutputFormat - Writing records failed. com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
Question: how should column be specified in the clickhouse writer module?

This is urgent. Could someone who knows please help answer this?
I use this task script and it works. But clickhouse doesn't support delete right now, only insert.
{
"job": {
"content": [
{
"reader": {
"parameter": {
"schema": "tiezhu",
"username": "root",
"password": "admin123",
"cat": "insert,delete,update,alter,truncate,create",
"jdbcUrl": "jdbc:mysql://k3:3306/tiezhu?useSSL=false",
"host": "k3",
"port": 3306,
"table": [
"tiezhu.one"
],
"split": true,
"pavingData": true
},
"name": "binlogreader"
},
"writer": {
"parameter": {
"writeMode": "overwrite",
"encoding": "utf-8",
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://172.16.21.76:8123/test",
"table": [
"*"
]
}
]
},
"name": "clickhousewriter"
},
"nameMapping": {
"schemaMapings": {
"tiezhu": "tiezhu"
},
"tableMappings": {
"tiezhu": {
"one": "one"
}
},
"fieldMappings": {
"tiezhu": {
"one": {
"id": "id",
"name": "name"
},
"two": {
"id": "id",
"name": "name"
}
}
}
}
}
],
"setting": {
"speed": {
"bytes": 0,
"channel": 1
}
}
}
}
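To make the nameMapping block above easier to read, here is a minimal sketch of how such a mapping could be looked up for one source column. It only illustrates the nesting of the keys as they appear in the posted script (schemaMapings, tableMappings, fieldMappings); the function resolve_target is hypothetical, and passing unmapped names through unchanged is an assumption, not ChunJun's documented behavior.

```python
from typing import Dict, Tuple

# Mapping copied from the job script above (key spelling kept as posted).
name_mapping: Dict[str, dict] = {
    "schemaMapings": {"tiezhu": "tiezhu"},
    "tableMappings": {"tiezhu": {"one": "one"}},
    "fieldMappings": {
        "tiezhu": {
            "one": {"id": "id", "name": "name"},
            "two": {"id": "id", "name": "name"},
        }
    },
}


def resolve_target(mapping: Dict[str, dict], schema: str, table: str,
                   field: str) -> Tuple[str, str, str]:
    """Map a source (schema, table, field) to target names.

    Hypothetical helper; names without a mapping entry are returned unchanged
    (an assumption made for this sketch).
    """
    target_schema = mapping.get("schemaMapings", {}).get(schema, schema)
    target_table = mapping.get("tableMappings", {}).get(schema, {}).get(table, table)
    target_field = (
        mapping.get("fieldMappings", {})
        .get(schema, {})
        .get(table, {})
        .get(field, field)
    )
    return target_schema, target_table, target_field


print(resolve_target(name_mapping, "tiezhu", "one", "name"))
```

Each level is keyed by the source name one level up, so a field mapping is found by walking schema, then table, then field.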
@FlechazoW
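Let me state my question: is this how it works?
Thanks for the guidance.
@berg-xu What the picture describes is correct. The binlog reader can read multiple tables, and when the DML is restored it can also be written to multiple tables.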
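As a rough illustration of the multi-table point, the sketch below groups change events from several source tables into per-table batches before writing. The event shape (schema, table, type, data) and the function group_by_table are hypothetical and chosen only for this example; they do not describe ChunJun's internal record format.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_table(events: List[dict]) -> Dict[Tuple[str, str], List[dict]]:
    """Group change events by (schema, table), keeping arrival order per table."""
    batches: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for event in events:
        batches[(event["schema"], event["table"])].append(event)
    return dict(batches)


# Hypothetical events from two tables read by one binlog reader.
events = [
    {"schema": "tiezhu", "table": "one", "type": "INSERT", "data": {"id": 1, "name": "a"}},
    {"schema": "tiezhu", "table": "two", "type": "INSERT", "data": {"id": 7, "name": "b"}},
    {"schema": "tiezhu", "table": "one", "type": "UPDATE", "data": {"id": 1, "name": "c"}},
]

batches = group_by_table(events)
for (schema, table), rows in batches.items():
    print(f"{schema}.{table}: {len(rows)} change(s)")
```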