chunjun
chunjun copied to clipboard
Mysql Binlog数据同步基于 position 起始点不生效
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
当我想采用 binlog文件名 + 索引位 消费数据的数据,配置不生效,是我的配置有问题吗?目前官网文档不全
我的配置信息如下:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"schema": "test_data",
"ddlSkip": true,
"start": {
"journal-name":"mysql-bin.000013",
"position":61507
},
"column": [
{
"name": "address",
"type": "varchar"
},
{
"name": "age",
"type": "int"
},
{
"name": "email",
"type": "varchar"
},
{
"name": "id",
"type": "varchar"
},
{
"name": "id_card",
"type": "varchar"
},
{
"name": "name",
"type": "varchar"
},
{
"name": "sex",
"type": "varchar"
},
{
"name": "student",
"type": "varchar"
},
{
"name": "type",
"type": "char"
}
],
"pavingData": false,
"password": "123456",
"database": "test_data",
"splitUpdate": false,
"port": "3306",
"cat": "insert,update,delete",
"host": "10.0.103.252",
"jdbcUrl": "jdbc:mysql://10.0.103.252:3306/test_data?characterEncoding=utf8&useUnicode=true&useSSL=false",
"table": [
"test_data_min_100"
],
"username": "root"
},
"name": "binlogreader"
},
"writer": {
"parameter": {
"producerSettings": {
"retries": "3",
"request.timeout.ms": "60000",
"batch.size": "16384",
"acks": "0",
"bootstrap.servers": "hadoop1:9092,hadoop2:9092,hadoop3:9092",
"buffer.memory": "33554432"
},
"topic": "duan_100"
},
"name": "kafkawriter"
}
}
],
"setting": {
"speed": {
"bytes": 1024
}
}
}
}
可以确认的是,这个binlog文件和索引是真实存在的,我们看一条模拟数据:
{
"message": {
"schema": "test_data",
"database": null,
"opTime": "1687140107000",
"before": {
},
"lsn": "mysql-bin.000013/000000000000061507",
"after": {
"id": "Nh9EunM7kt",
"name": "Rosa Morgan",
"age": "503",
"sex": "F",
"address": "582 Grape Street",
"email": "[email protected]",
"id_card": "KsGQDpNgxF",
"student": "qa99aoQt98",
"type": "T"
},
"type": "INSERT",
"table": "test_data_min_100",
"ts": 7076390319717224464
}
}
"lsn": "mysql-bin.000013/000000000000061507", 这个信息应该记录的是该数据所在binlog文件的具体位置,现在想实现的功能是,从当前这个数据作为起始点,向下消费!目前配置不生效
What you expected to happen
期望可以根据
journal-name:文件名,采集起点从指定文件的起始处消费;
position:文件的指定位置,采集起点从指定文件的指定位置处消费
进行消费数据!
How to reproduce
json文件如下:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"schema": "test_data",
"ddlSkip": true,
"start": {
"journal-name":"mysql-bin.000013",
"position":61507
},
"column": [
{
"name": "address",
"type": "varchar"
},
{
"name": "age",
"type": "int"
},
{
"name": "email",
"type": "varchar"
},
{
"name": "id",
"type": "varchar"
},
{
"name": "id_card",
"type": "varchar"
},
{
"name": "name",
"type": "varchar"
},
{
"name": "sex",
"type": "varchar"
},
{
"name": "student",
"type": "varchar"
},
{
"name": "type",
"type": "char"
}
],
"pavingData": false,
"password": "123456",
"database": "test_data",
"splitUpdate": false,
"port": "3306",
"cat": "insert,update,delete",
"host": "10.0.103.252",
"jdbcUrl": "jdbc:mysql://10.0.103.252:3306/test_data?characterEncoding=utf8&useUnicode=true&useSSL=false",
"table": [
"test_data_min_100"
],
"username": "root"
},
"name": "binlogreader"
},
"writer": {
"parameter": {
"producerSettings": {
"retries": "3",
"request.timeout.ms": "60000",
"batch.size": "16384",
"acks": "0",
"bootstrap.servers": "hadoop1:9092,hadoop2:9092,hadoop3:9092",
"buffer.memory": "33554432"
},
"topic": "duan_100"
},
"name": "kafkawriter"
}
}
],
"setting": {
"speed": {
"bytes": 1024
}
}
}
}
Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
顶一下,麻烦开发者看到能够尽快回复下!感谢
补充下,他有这个错误信息不知道有用没:
2023-06-19 16:46:04,410 - 62971 ERROR [destination = example , address = /10.0.103.252:3306 , EventParser] com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3:dump address /10.0.103.252:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
Caused by: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
2023-06-19 16:46:04,417 - 62978 ERROR [destination = example , address = /10.0.103.252:3306 , EventParser] com.dtstack.chunjun.connector.binlog.listener.BinlogAlarmHandler:destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
我来了 麻烦开发者看到能够尽快回复下 感谢! 我自己好像解决不了了。。。。
我在源码中看到:
我不知道是不是这个原因,我新建一个事务,然后从新建事务那条数据作为起始点进行消费,也不生效