dtle
dtle copied to clipboard
MySQL-kafka / MySQL-MySQL full stage: stop the src / dest DTLE, all data is resent
Description
MySQL-kafka / MySQL-MySQL full stage: stop the src / dest DTLE, all data is resent
Steps to reproduce the issue
- insert data on src MySQL
shell> sysbench /usr/share/sysbench/oltp_common.lua --mysql-host=172.100.9.2 --mysql-port=3306 --mysql-user=test --mysql-password=test --create_secondary=off --report-interval=10 --time=0 --mysql-db=action_db --tables=1 --table_size=100 prepare
- create dtle job
{
"job_id": "kafka_kill_src_dtle_full",
"is_password_encrypted": false,
"task_step_name": "all",
"failover": true,
"retry": 2,
"src_task": {
"task_name": "src",
"node_id": "7ac30686-37da-2b62-1685-851bc4715157",
"mysql_src_task_config": {
"gtid": "",
"binlog_relay": false
},
"drop_table_if_exists": true,
"skip_create_db_table": false,
"repl_chan_buffer_size": 120,
"chunk_size": 1,
"group_max_size": 1,
"group_timeout": 100,
"connection_config": {
"database_type": "MySQL",
"host": "172.100.9.2",
"port": 3306,
"user": "test_src",
"password": "test_src"
},
"replicate_do_db": [
{
"table_schema": "action_db",
"tables": [
{
"table_name": "sbtest1"
}
]
}
]
},
"dest_task": {
"task_name": "dest",
"node_id": "cd626ca4-95de-0940-0e04-2ad6537a05a6",
"parallel_workers": 1,
"kafka_topic": "dtle",
"kafka_broker_addrs": [
"172.100.9.21:9092"
]
}
}
- kill the src dtle process during the full stage
shell> pgrep dtle | xargs -r -n 1 kill -9
- wait until the src allocation is running again
- check the old src DTLE log: the last sent id is 25
sc LIMIT 1" timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.373+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n_row=1 timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['24'] timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=23 timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '24'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription n_row=1 @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['25'] timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=24 timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '25'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n_row=1 timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.375+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle job=kafka_kill_src_dtle_full-subscription val=['26'] @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.498+0800 [DEBUG] client: updated allocations: index=19 total=1 pulled=0 filtered=1
2022-04-21T14:17:00.498+0800 [DEBUG] client: allocation updates: added=0 removed=0 updated=0 ignored=1
2022-04-21T14:17:00.498+0800 [DEBUG] client: allocation updates applied: added=0 removed=0 updated=0 ignored=1 errors=0
- check the new src DTLE log: sending restarts from id = 1
2022-04-21T14:17:42.009+0800 [INFO] client.driver_mgr.dtle: Step: scanning contents of x tables: driver=dtle @module=dtle.extractor job=kafka_kill_src_dtle_full-subscription n=3 x=1 timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [INFO] client.driver_mgr.dtle: Step n: - scanning table (i of N tables): driver=dtle N=1 schema=action_db @module=dtle.extractor table=sbtest1 i=1 job=kafka_kill_src_dtle_full-subscription n=3 timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (true) and (true) order by `id` asc LIMIT 1" timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription n_row=1 @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['1'] timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=0 timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '1'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle n_row=1 @module=dumper job=kafka_kill_src_dtle_full-subscription timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle job=kafka_kill_src_dtle_full-subscription val=['2'] @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=1 timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '2'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: SendMessages: driver=dtle job=kafka_kill_src_dtle_full-subscription n=1 @module=dtle.kafka timestamp=2022-04-21T14:17:42.010+0800
- compare the Kafka messages with dbz (attachment: kafka_kill_src_dtle_full.html.zip)
Output of `./dtle version`:
9.9.9.9-master-64b421c