MySQL-kafka / MySQL-MySQL full stage: stop the src / dest DTLE, all data is resent

Open asiroliu opened this issue 2 years ago • 0 comments

Description

In the full stage of a MySQL-Kafka or MySQL-MySQL job, if the src or dest DTLE is stopped, all data is resent from the beginning once the task is running again.
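
The dumper queries logged in the steps below (`where id > 'N' ... order by id asc LIMIT 1`) look like a keyset-paginated scan. The sketch below is not dtle's code. It is a minimal illustration of that query pattern, assuming an integer `id` key, with `sqlite3` standing in for MySQL. The names `scan_chunks`, `resume_after`, and `demo` are hypothetical. It shows why a scan that starts without the last key value reads every row again, while one that is given the last key value continues from there.

```python
import sqlite3
from typing import Iterator, List, Optional, Tuple


def scan_chunks(conn: sqlite3.Connection, chunk_size: int,
                resume_after: Optional[int] = None) -> Iterator[List[int]]:
    """Yield chunks of ids in ascending order, starting after resume_after."""
    last = resume_after
    while True:
        if last is None:
            # No known position: equivalent to the "where (true)" first query.
            rows = conn.execute(
                "SELECT id FROM sbtest1 ORDER BY id ASC LIMIT ?",
                (chunk_size,),
            ).fetchall()
        else:
            # Known position: continue strictly after the last key seen.
            rows = conn.execute(
                "SELECT id FROM sbtest1 WHERE id > ? ORDER BY id ASC LIMIT ?",
                (last, chunk_size),
            ).fetchall()
        if not rows:
            return
        ids = [row[0] for row in rows]
        last = ids[-1]
        yield ids


def demo() -> Tuple[List[int], List[int]]:
    """Interrupt a scan after 4 chunks, then rescan with and without the key."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sbtest1 (id INTEGER PRIMARY KEY)")
    conn.executemany("INSERT INTO sbtest1 (id) VALUES (?)",
                     [(i,) for i in range(1, 11)])

    checkpoint = None
    for n, chunk in enumerate(scan_chunks(conn, 1), start=1):
        checkpoint = chunk[-1]
        if n == 4:  # simulated kill during the full stage
            break

    restarted = [i for c in scan_chunks(conn, 1) for i in c]
    resumed = [i for c in scan_chunks(conn, 1, resume_after=checkpoint) for i in c]
    return restarted, resumed
```

Calling `demo()` returns the ids read by a scan that restarts with no position and the ids read by a scan that is handed the last key value. Whether dtle records such a position during the full stage is not stated in this issue.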

Steps to reproduce the issue

  1. insert data on src MySQL
shell> sysbench /usr/share/sysbench/oltp_common.lua --mysql-host=172.100.9.2 --mysql-port=3306 --mysql-user=test --mysql-password=test --create_secondary=off --report-interval=10 --time=0 --mysql-db=action_db --tables=1 --table_size=100 prepare
  1. create dtle job
{
  "job_id": "kafka_kill_src_dtle_full",
  "is_password_encrypted": false,
  "task_step_name": "all",
  "failover": true,
  "retry": 2,
  "src_task": {
    "task_name": "src",
    "node_id": "7ac30686-37da-2b62-1685-851bc4715157",
    "mysql_src_task_config": {
      "gtid": "",
      "binlog_relay": false
    },
    "drop_table_if_exists": true,
    "skip_create_db_table": false,
    "repl_chan_buffer_size": 120,
    "chunk_size": 1,
    "group_max_size": 1,
    "group_timeout": 100,
    "connection_config": {
      "database_type": "MySQL",
      "host": "172.100.9.2",
      "port": 3306,
      "user": "test_src",
      "password": "test_src"
    },
    "replicate_do_db": [
      {
        "table_schema": "action_db",
        "tables": [
          {
            "table_name": "sbtest1"
          }
        ]
      }
    ]
  },
  "dest_task": {
    "task_name": "dest",
    "node_id": "cd626ca4-95de-0940-0e04-2ad6537a05a6",
    "parallel_workers": 1,
    "kafka_topic": "dtle",
    "kafka_broker_addrs": [
      "172.100.9.21:9092"
    ]
  }
}
  1. kill the src dtle during the full stage
shell> pgrep dtle | xargs -r -n 1 kill -9
  1. wait until the src allocation is running again
  2. check the old src DTLE log; the last id sent is 25
sc LIMIT 1" timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.373+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n_row=1 timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['24'] timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=23 timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '24'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:00.373+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription n_row=1 @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['25'] timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=24 timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '25'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.374+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n_row=1 timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.375+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle job=kafka_kill_src_dtle_full-subscription val=['26'] @module=dumper timestamp=2022-04-21T14:17:00.374+0800
2022-04-21T14:17:00.498+0800 [DEBUG] client: updated allocations: index=19 total=1 pulled=0 filtered=1
2022-04-21T14:17:00.498+0800 [DEBUG] client: allocation updates: added=0 removed=0 updated=0 ignored=1
2022-04-21T14:17:00.498+0800 [DEBUG] client: allocation updates applied: added=0 removed=0 updated=0 ignored=1 errors=0
  1. check the new src DTLE log; sending restarts from id = 1
2022-04-21T14:17:42.009+0800 [INFO]  client.driver_mgr.dtle: Step: scanning contents of x tables: driver=dtle @module=dtle.extractor job=kafka_kill_src_dtle_full-subscription n=3 x=1 timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [INFO]  client.driver_mgr.dtle: Step n: - scanning table (i of N tables): driver=dtle N=1 schema=action_db @module=dtle.extractor table=sbtest1 i=1 job=kafka_kill_src_dtle_full-subscription n=3 timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (true) and (true) order by `id` asc LIMIT 1" timestamp=2022-04-21T14:17:42.009+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription n_row=1 @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription val=['1'] timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=0 timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '1'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle n_row=1 @module=dumper job=kafka_kill_src_dtle_full-subscription timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: GetLastMaxVal: driver=dtle job=kafka_kill_src_dtle_full-subscription val=['2'] @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: resultsChannel: driver=dtle @module=dumper job=kafka_kill_src_dtle_full-subscription n=1 timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: getChunkData.: driver=dtle job=kafka_kill_src_dtle_full-subscription query="SELECT * FROM `action_db`.`sbtest1` where (((`id` > '2'))) and (true) order by `id` asc LIMIT 1" @module=dumper timestamp=2022-04-21T14:17:42.010+0800
2022-04-21T14:17:42.010+0800 [DEBUG] client.driver_mgr.dtle: SendMessages: driver=dtle job=kafka_kill_src_dtle_full-subscription n=1 @module=dtle.kafka timestamp=2022-04-21T14:17:42.010+0800
  1. compare the Kafka messages with dbz: kafka_kill_src_dtle_full.html.zip
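
To compare the old and new src logs without reading them by eye, the `GetLastMaxVal` debug lines can be extracted with a small script. This is a sketch under assumptions: it relies only on the `GetLastMaxVal: ... val=['N']` shape seen in the log excerpts above, it assumes the chunk key is an integer `id`, and the function names `last_max_vals` and `scan_went_backwards` are made up for illustration.

```python
import re
from typing import List

# Captures N from val=['N'] on GetLastMaxVal debug lines.
_LAST_MAX_VAL = re.compile(r"GetLastMaxVal:.*?\bval=\['([^']*)'\]")


def last_max_vals(log_text: str) -> List[str]:
    """Return every GetLastMaxVal value in the order it was logged."""
    vals = []
    for line in log_text.splitlines():
        match = _LAST_MAX_VAL.search(line)
        if match:
            vals.append(match.group(1))
    return vals


def scan_went_backwards(old_log: str, new_log: str) -> bool:
    """True if the new log's first value is lower than the old log's last one.

    Returns False when either log has no GetLastMaxVal line.
    """
    old_vals = last_max_vals(old_log)
    new_vals = last_max_vals(new_log)
    if not old_vals or not new_vals:
        return False
    return int(new_vals[0]) < int(old_vals[-1])
```

With the two excerpts above saved to files, `scan_went_backwards(open("old.log").read(), open("new.log").read())` would flag the restart; the file names are placeholders.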

Output of ./dtle version:

9.9.9.9-master-64b421c

asiroliu, Apr 21 '22 06:04