
MySQL-kafka: wrong value for data type timestamp

Open asiroliu opened this issue 2 years ago • 3 comments

Description

MySQL-kafka: wrong value for data type timestamp

Steps to reproduce the issue

  1. check src MySQL time_zone
mysql> SELECT @@GLOBAL.time_zone, @@SESSION.time_zone;
+--------------------+---------------------+
| @@GLOBAL.time_zone | @@SESSION.time_zone |
+--------------------+---------------------+
| SYSTEM             | SYSTEM              |
+--------------------+---------------------+
1 row in set (0.00 sec)
shell> date
2022年 04月 20日 星期三 13:27:43 CST
  2. insert data on src MySQL
mysql> CREATE DATABASE IF NOT EXISTS `action_db`;
mysql> create table timestamp_columns( id int(11) not null primary key, c_timestamp timestamp DEFAULT CURRENT_TIMESTAMP)ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> insert into timestamp_columns values (0,'1970-01-01 08:00:01');
mysql> insert into timestamp_columns values (1,'2038-01-19 11:14:07');
mysql> insert into timestamp_columns values (2,'2015-08-12 14:57:05');
  3. create dtle job
{
  "job_id": "data_types_default_full_timestamp_columns",
  "is_password_encrypted": false,
  "task_step_name": "all",
  "failover": true,
  "retry": 2,
  "src_task": {
    "task_name": "src",
    "node_id": "1981dac8-c247-d804-f2b0-2e27d74c40ad",
    "mysql_src_task_config": {
      "gtid": "",
      "binlog_relay": false
    },
    "drop_table_if_exists": true,
    "skip_create_db_table": false,
    "repl_chan_buffer_size": 120,
    "chunk_size": 2000,
    "group_max_size": 1,
    "group_timeout": 100,
    "connection_config": {
      "database_type": "MySQL",
      "host": "172.100.9.1",
      "port": 3306,
      "user": "test_src",
      "password": "test_src"
    },
    "replicate_do_db": [
      {
        "table_schema": "action_db",
        "tables": [
          {
            "table_name": "timestamp_columns"
          }
        ]
      }
    ]
  },
  "dest_task": {
    "task_name": "dest",
    "node_id": "3ffe60b0-8a48-32c1-28d6-473c89792fcf",
    "parallel_workers": 1,
    "kafka_topic": "dtle",
    "kafka_broker_addrs": [
      "172.100.9.21:9092"
    ]
  }
}
  4. get kafka message, and compare with dbz [image]
  5. check src MySQL binlog [image]

Describe the results you expected

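  1. For TIMESTAMP, MySQL converts the value inserted by the client from the current time zone to UTC for storage. On retrieval, it converts the value back to the client's current time zone.
  2. When DTLE sends Kafka messages, it should send the UTC time directly and not perform any time zone conversion, because DTLE does not know the consumer's time zone.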
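
To make the expectation concrete, here is a small Python sketch (not dtle code) of the conversion MySQL performs when it stores a TIMESTAMP. It assumes the source session time zone is CST (UTC+8), as the `date` output in the reproduction steps suggests; `to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the source session runs in China Standard Time (UTC+8).
CST = timezone(timedelta(hours=8))
FMT = "%Y-%m-%d %H:%M:%S"

def to_utc(local_text, session_tz):
    """Interpret a 'YYYY-MM-DD HH:MM:SS' literal in session_tz and return the UTC instant."""
    naive = datetime.strptime(local_text, FMT)
    return naive.replace(tzinfo=session_tz).astimezone(timezone.utc)

# The three literals inserted in the reproduction steps.
for text in ("1970-01-01 08:00:01", "2038-01-19 11:14:07", "2015-08-12 14:57:05"):
    utc = to_utc(text, CST)
    print(text, "->", utc.strftime(FMT), "UTC, epoch", int(utc.timestamp()))
```

Under that assumption the first two rows are the smallest and largest values a TIMESTAMP column can hold (epoch 1 and 2147483647), and the UTC form is what the Kafka message would be expected to carry.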

Output of ./dtle version:

9.9.9.9-master-a269c29

asiroliu • Apr 20 '22 05:04

Test:

  1. Set each server to a different time zone:
    src dtle
timedatectl set-timezone UTC

dest dtle

timedatectl set-timezone Europe/London

src mysql

timedatectl set-timezone Europe/Monaco

dest mysql

timedatectl set-timezone America/Los_Angeles
# Enable the general log on the destination
set global general_log = ON;
set global general_log_file = '/tmp/general.log';
  2. Insert data on the source
create table timestamp_columns( id int(11) not null primary key, c_timestamp timestamp)ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into timestamp_columns values (0,'1970-01-01 08:00:01');
insert into timestamp_columns values (1,'2037-01-19 11:14:07');
insert into timestamp_columns values (2,'2015-08-12 14:57:05');
insert into timestamp_columns values (3,now());
insert into timestamp_columns values (4,CURRENT_TIMESTAMP);
insert into timestamp_columns values (5,NULL);
  3. Create the dtle job
  4. Check the destination MySQL general log
2022-04-25T05:54:08.799929Z        13 Query     START TRANSACTION
2022-04-25T05:54:08.800110Z        13 Query     set @@session.foreign_key_checks = 0
2022-04-25T05:54:08.800526Z        13 Query     replace into `test`.`timestamp_columns` values ('0','1970-01-01 08:00:01'),('1','2037-01-19 11:14:07'),('2','2015-08-12 14:57:05'),('3','2022-04-25 07:53:26'),('4','2022-04-25 07:53:26'),('5','2022-04-25 07:53:27')
2022-04-25T05:54:08.801155Z        13 Query     COMMIT
  5. Access the data with a third-party MySQL client [image]
  6. Insert incremental data
insert into timestamp_columns values (10,'1970-01-01 08:00:01');
insert into timestamp_columns values (11,'2037-01-19 11:14:07');
insert into timestamp_columns values (12,'2015-08-12 14:57:05');
insert into timestamp_columns values (13,now());
insert into timestamp_columns values (14,CURRENT_TIMESTAMP);
insert into timestamp_columns values (15,NULL);
  7. Check the destination MySQL general log
2022-04-25T06:22:58.883775Z        12 Query     START TRANSACTION
2022-04-25T06:22:58.884325Z        12 Prepare   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(?, ?)
2022-04-25T06:22:58.884467Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(10, '1970-01-01 07:00:01')
2022-04-25T06:22:58.884955Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 14, null)
2022-04-25T06:22:58.885213Z        12 Query     COMMIT
2022-04-25T06:22:58.885758Z        12 Query     START TRANSACTION
2022-04-25T06:22:58.885924Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(11, '2037-01-19 10:14:07')
2022-04-25T06:22:58.886147Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 15, null)
2022-04-25T06:22:58.886282Z        12 Query     COMMIT
2022-04-25T06:22:58.886852Z        12 Query     START TRANSACTION
2022-04-25T06:22:58.887017Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(12, '2015-08-12 12:57:05')
2022-04-25T06:22:58.887236Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 16, null)
2022-04-25T06:22:58.887372Z        12 Query     COMMIT
2022-04-25T06:22:58.887729Z        12 Query     START TRANSACTION
2022-04-25T06:22:58.887893Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(13, '2022-04-25 06:22:58')
2022-04-25T06:22:58.888081Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 17, null)
2022-04-25T06:22:58.888213Z        12 Query     COMMIT
2022-04-25T06:22:58.888601Z        12 Query     START TRANSACTION
2022-04-25T06:22:58.888759Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(14, '2022-04-25 06:22:58')
2022-04-25T06:22:58.888967Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 18, null)
2022-04-25T06:22:58.889096Z        12 Query     COMMIT
2022-04-25T06:22:59.572726Z        12 Query     START TRANSACTION
2022-04-25T06:22:59.573120Z        12 Execute   replace into `test`.`timestamp_columns`
(`id`, `c_timestamp`) values
(15, '2022-04-25 06:22:59')
2022-04-25T06:22:59.573557Z        12 Execute   replace into dtle.gtid_executed_v4 (job_name,source_uuid,gtid,gtid_set) values ('aaa-migration', '^^·¡\'\Ä[^Q\ì¡3^BB¬d  ^A', 19, null)
2022-04-25T06:22:59.573821Z        12 Query     COMMIT
  8. Access the data with a third-party MySQL client [image]
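
One way to read the two general logs: the full-copy `replace` carries the source literals unchanged, while the incremental `replace` statements carry values that look like the source literals converted from the source MySQL time zone (Europe/Monaco) to UTC. The Python sketch below checks that arithmetic. The offsets are hard-coded assumptions (UTC+1 in winter, UTC+2 in summer) to avoid depending on a tz database, and `as_utc_text` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"
# Assumption: Europe/Monaco is UTC+1 in winter (CET) and UTC+2 in summer (CEST).
CET = timezone(timedelta(hours=1))
CEST = timezone(timedelta(hours=2))

def as_utc_text(local_text, source_tz):
    """Convert a wall-clock literal in source_tz to its UTC wall-clock text."""
    local = datetime.strptime(local_text, FMT).replace(tzinfo=source_tz)
    return local.astimezone(timezone.utc).strftime(FMT)

# (literal inserted on the source, assumed offset, value seen in the incremental log)
cases = [
    ("1970-01-01 08:00:01", CET, "1970-01-01 07:00:01"),
    ("2037-01-19 11:14:07", CET, "2037-01-19 10:14:07"),
    ("2015-08-12 14:57:05", CEST, "2015-08-12 12:57:05"),
]

for inserted, tz, logged in cases:
    print(inserted, "->", as_utc_text(inserted, tz), "| matches log:", as_utc_text(inserted, tz) == logged)
```

If this reading is right, the full copy and the incremental path disagree by the source server's UTC offset.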

asiroliu • Apr 25 '22 06:04

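MySQL-MySQL:

Read from the source with a fixed time_zone, and write to the destination with the same time_zone.

MySQL-Kafka:

Read from the source with a fixed time_zone of UTC+0.

Use the UTC time zone for reads and writes by default, and consider adding a configuration option that selects the read/write time zone.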
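
A rough Python sketch of what "UTC by default, with a configurable time zone" could look like. This is not dtle's implementation: `render_timestamp`, `parse_offset`, and the `time_zone` parameter are hypothetical names, and the parameter accepts only MySQL-style numeric offsets such as `+08:00`.

```python
import re
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"

def parse_offset(text):
    """Parse a numeric offset such as '+08:00' or '-07:00' into a tzinfo."""
    match = re.fullmatch(r"([+-])(\d{2}):(\d{2})", text)
    if match is None:
        raise ValueError(f"unsupported time zone offset: {text!r}")
    sign = 1 if match.group(1) == "+" else -1
    delta = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
    return timezone(sign * delta)

def render_timestamp(epoch_seconds, time_zone="+00:00"):
    """Render a stored TIMESTAMP (seconds since the epoch) in the configured zone."""
    return datetime.fromtimestamp(epoch_seconds, tz=parse_offset(time_zone)).strftime(FMT)

print(render_timestamp(1))                       # default: UTC
print(render_timestamp(1, "+08:00"))             # same instant, UTC+8 wall clock
print(render_timestamp(2147483647, "+08:00"))    # largest TIMESTAMP value
```

Because the stored instant is unchanged, source and destination agree as long as both sides use the same setting.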

LordofAvernus • Apr 25 '22 07:04

Reading a TIMESTAMP uses the MySQL server's time zone by default: https://dev.mysql.com/doc/refman/5.7/en/datetime.html

MySQL converts TIMESTAMP values from the current time zone to UTC for storage, and back from UTC to the current time zone for retrieval. (This does not occur for other types such as DATETIME.) By default, the current time zone for each connection is the server's time. The time zone can be set on a per-connection basis.

asiroliu • Apr 26 '22 05:04

MySQL behavior

$ timedatectl | grep "Time zone"
                       Time zone: Etc/UTC (UTC, +0000)

mysql> create table a.a957 (id int primary key auto_increment, val1 timestamp, val2 datetime);

mysql> select @@time_zone;
| SYSTEM      |

mysql> insert into a.a957 values (0, now(), now());

mysql> set time_zone = '+08:00';

mysql> insert into a.a957 values (0, now(), now());

mysql> select * from a.a957;
|  1 | 2022-11-22 17:17:47 | 2022-11-22 09:17:47 |
|  2 | 2022-11-22 17:18:51 | 2022-11-22 17:18:51 |

mysql> set time_zone = 'system';

mysql> select * from a.a957;
|  1 | 2022-11-22 09:17:47 | 2022-11-22 09:17:47 |
|  2 | 2022-11-22 09:18:51 | 2022-11-22 17:18:51 |
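
The session above can be modeled in a few lines of Python. This is only a sketch of the documented behavior, not MySQL internals: TIMESTAMP is kept as a UTC instant and rendered in the session time zone, while DATETIME keeps the wall-clock text it was given. `insert_now` and `select_row` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%d %H:%M:%S"
UTC = timezone.utc
PLUS8 = timezone(timedelta(hours=8))

def insert_now(instant, session_tz):
    """Model `insert ... values (now(), now())` evaluated in session_tz.

    Returns (timestamp_value, datetime_value): the TIMESTAMP is stored as a
    UTC instant, the DATETIME as the session's wall-clock text.
    """
    wall_clock = instant.astimezone(session_tz).strftime(FMT)
    return instant.astimezone(UTC), wall_clock

def select_row(row, session_tz):
    """Model `select`: TIMESTAMP is converted to session_tz, DATETIME is returned as stored."""
    timestamp_value, datetime_value = row
    return timestamp_value.astimezone(session_tz).strftime(FMT), datetime_value

rows = [
    insert_now(datetime(2022, 11, 22, 9, 17, 47, tzinfo=UTC), UTC),    # session time_zone = SYSTEM (UTC)
    insert_now(datetime(2022, 11, 22, 9, 18, 51, tzinfo=UTC), PLUS8),  # session time_zone = '+08:00'
]

for label, tz in (("+08:00", PLUS8), ("SYSTEM (UTC)", UTC)):
    print("session time_zone =", label)
    for row in rows:
        print("  ", select_row(row, tz))
```

The printed pairs reproduce the two result sets: only the TIMESTAMP column shifts when the session time zone changes.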

ghost • Nov 22 '22 09:11

fixed version:

9.9.9.9-master-7a98826

asiroliu • Nov 24 '22 09:11