
MySQL binlog data sync: starting from a specified binlog position does not take effect

huangfuProject opened this issue 2 years ago · 4 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When I try to consume data starting from a binlog file name + offset, the configuration does not take effect. Is there a problem with my configuration? The official documentation is currently incomplete.

My configuration is as follows:

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "schema": "test_data",
                        "ddlSkip": true,
                        "start": {
                            "journal-name":"mysql-bin.000013",
                            "position":61507
                        },
                        "column": [
                            {
                                "name": "address",
                                "type": "varchar"
                            },
                            {
                                "name": "age",
                                "type": "int"
                            },
                            {
                                "name": "email",
                                "type": "varchar"
                            },
                            {
                                "name": "id",
                                "type": "varchar"
                            },
                            {
                                "name": "id_card",
                                "type": "varchar"
                            },
                            {
                                "name": "name",
                                "type": "varchar"
                            },
                            {
                                "name": "sex",
                                "type": "varchar"
                            },
                            {
                                "name": "student",
                                "type": "varchar"
                            },
                            {
                                "name": "type",
                                "type": "char"
                            }
                        ],
                        "pavingData": false,
                        "password": "123456",
                        "database": "test_data",
                        "splitUpdate": false,
                        "port": "3306",
                        "cat": "insert,update,delete",
                        "host": "10.0.103.252",
                        "jdbcUrl": "jdbc:mysql://10.0.103.252:3306/test_data?characterEncoding=utf8&useUnicode=true&useSSL=false",
                        "table": [
                            "test_data_min_100"
                        ],
                        "username": "root"
                    },
                    "name": "binlogreader"
                },
                "writer": {
                    "parameter": {
                        "producerSettings": {
                            "retries": "3",
                            "request.timeout.ms": "60000",
                            "batch.size": "16384",
                            "acks": "0",
                            "bootstrap.servers": "hadoop1:9092,hadoop2:9092,hadoop3:9092",
                            "buffer.memory": "33554432"
                        },
                        "topic": "duan_100"
                    },
                    "name": "kafkawriter"
                }
            }
        ],
        "setting": {
            "speed": {
                "bytes": 1024
            }
        }
    }
}

I can confirm that this binlog file and offset really exist. Here is a sample record:

{
    "message": {
        "schema": "test_data",
        "database": null,
        "opTime": "1687140107000",
        "before": {
            
        },
        "lsn": "mysql-bin.000013/000000000000061507",
        "after": {
            "id": "Nh9EunM7kt",
            "name": "Rosa Morgan",
            "age": "503",
            "sex": "F",
            "address": "582 Grape Street",
            "email": "[email protected]",
            "id_card": "KsGQDpNgxF",
            "student": "qa99aoQt98",
            "type": "T"
        },
        "type": "INSERT",
        "table": "test_data_min_100",
        "ts": 7076390319717224464
    }
}

"lsn": "mysql-bin.000013/000000000000061507", 这个信息应该记录的是该数据所在binlog文件的具体位置,现在想实现的功能是,从当前这个数据作为起始点,向下消费!目前配置不生效

What you expected to happen

I expect to be able to consume data based on:

journal-name: the binlog file name; consumption starts from the beginning of the specified file;
position: an offset within the file; consumption starts from the specified position in the specified file.

How to reproduce

The JSON file is as follows:

(Same configuration as shown under "What happened" above.)


Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

huangfuProject commented on Jun 19 '23 02:06

Bumping this — could a developer please reply as soon as possible? Thanks!

huangfuProject commented on Jun 19 '23 03:06

One more thing: there is this error message, not sure whether it is useful:

2023-06-19 16:46:04,410 - 62971 ERROR [destination = example , address = /10.0.103.252:3306 , EventParser] com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3:dump address /10.0.103.252:3306 has an error, retrying. caused by 
com.alibaba.otter.canal.parse.exception.CanalParseException: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
Caused by: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
2023-06-19 16:46:04,417 - 62978 ERROR [destination = example , address = /10.0.103.252:3306 , EventParser] com.dtstack.chunjun.connector.binlog.listener.BinlogAlarmHandler:destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException: not found tableId:144
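(Editor's note on the log above, offered as a hedged guess rather than a confirmed diagnosis: Canal's TableIdNotFoundException typically appears when parsing starts at an offset inside an event group, after the Table_map event that defines the table id has already passed. One way to avoid that is to start from the nearest preceding event-group boundary. The sketch below is not ChunJun code; it assumes you have already fetched (position, event_type) rows, e.g. from MySQL's SHOW BINLOG EVENTS statement, sorted by position.)

```python
# Event types that begin a new event group in the binlog, so parsing can
# safely start there without missing an earlier Table_map event.
SAFE_EVENTS = {"Gtid", "Query", "Rotate", "Format_desc"}

def safe_start_position(events, desired_pos):
    """Return the latest group-starting position at or before desired_pos.

    events: iterable of (pos, event_type) tuples sorted by pos.
    """
    best = None
    for pos, event_type in events:
        if pos > desired_pos:
            break
        if event_type in SAFE_EVENTS:
            best = pos
    return best

# Illustrative positions only (not taken from the issue's binlog):
events = [
    (4, "Format_desc"),
    (126, "Gtid"),
    (201, "Query"),      # BEGIN of a transaction
    (290, "Table_map"),
    (350, "Write_rows"),
    (420, "Xid"),        # COMMIT
    (451, "Gtid"),
]
print(safe_start_position(events, 350))  # 201
```

If the configured position 61507 points at a row event rather than a boundary like this, that could explain why Canal retries with "not found tableId"; checking the event type at that offset with SHOW BINLOG EVENTS would confirm or rule this out.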

huangfuProject commented on Jun 19 '23 09:06

Checking in again — could a developer please reply as soon as possible when you see this? Thanks! It seems I cannot solve this on my own...

huangfuProject commented on Jun 20 '23 02:06

I saw this in the source code: [image]

I don't know whether this is the cause. I started a new transaction and then used that transaction's record as the starting point for consumption, and it still did not take effect.

huangfuProject commented on Jun 20 '23 02:06