debezium-datetime-converter
debezium-datetime-converter copied to clipboard
timestamp类型的处理
大佬,请问下timestamp类型的怎么处理呢?
converters=timestamp datetime.type=com.darcytech.debezium.converter.MySqlDateTimeConverter datetime.format.date=yyyy-MM-dd datetime.format.time=HH:mm:ss datetime.format.datetime=yyyy-MM-dd HH:mm:ss datetime.format.timestamp=yyyy-MM-dd HH:mm:ss datetime.format.timestamp.zone=UTC+8
这样配置以后,解析出来的是空值。
更正下配置文件,是这样的: converters=datetime datetime.type=com.darcytech.debezium.converter.MySqlDateTimeConverter datetime.format.date=yyyy-MM-dd datetime.format.time=HH:mm:ss datetime.format.datetime=yyyy-MM-dd HH:mm:ss datetime.format.timestamp=yyyy-MM-dd HH:mm:ss datetime.format.timestamp.zone=UTC+8
但是timestamp类型的解析出来之后就是空的
你用的是哪个版本的呢,我当时用的是1.4版,不清楚后面的版本有没有改动过
我用的是1.2的版本,是对历史数据解析出来是空的,新增的数据解析正常。
@louteq 可以看一下SnapshotReader 和 MySqlValueConverters这两个类怎么弄的。我这边会重新发个版解决一下这个问题
我用的1.5.4下,timestamp,datatime都是空,date是正常的
我用的1.8,timestamp,datatime都是空
Hello @fengchi66 / @holmofy Did you find any solution for this? I tested initially with debezium 2.0 and it was working fine Now I'm testing it on debezium 2.1.1. Initial data snapshot sends null value for timestamp/datetime column. When we insert a new row, then we get actual values for timestamp/datetime
kafka verson: 3.2.2 debezium verson:1.9.6
需要修改convertDateTime方法、判断是否是java.sql.Timestamp、并进行转换
/**
private String convertDateTime(Object input) {
if (input instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) input);
}
if (input instanceof java.sql.Timestamp) {
Date date = (java.sql.Timestamp) input;
LocalDateTime localDateTime = dateToLocalDateTime(date);
return datetimeFormatter.format(localDateTime);
}
log.error("MySqlDateTimeConverter convertDateTime is fail getClass:{}、value:{}", input.getClass(), input.toString());
return Optional.ofNullable(input).map(p -> p.toString()).orElse(null);
}
/**
* 将 Date 转为 LocalDateTime
*
* @param date
* @return java.time.LocalDateTime;
*/
public LocalDateTime dateToLocalDateTime(Date date) {
return date.toInstant().atZone(timestampZoneId).toLocalDateTime();
}
Hello @ultimatech67 Thanks for identifying this. can you create a new jar and share with us? So we can test it with latest confluent kafka and debezium 2.1 versions? thanks
Yes、 kafka and debezium 2.1 timestamp converter、It has been used in the project
------------------ 原始邮件 ------------------ 发件人: "holmofy/debezium-datetime-converter" @.>; 发送时间: 2023年2月23日(星期四) 凌晨1:52 @.>; @.@.>; 主题: Re: [holmofy/debezium-datetime-converter] timestamp类型的处理 (#2)
Hello @ultimatech67 Thanks for identifying this. can you create a new jar and share with us? So we can test it with latest confluent kafka and debezium 2.1 versions? thanks
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: @.***>
package com.darcytech.debezium.converter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
/**
* 处理Debezium时间转换的问题
* Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是写死的没法儿改,
* 导致数据库中设置的UTC+8,到kafka中变成了多八个小时的long型时间戳
* Debezium默认将MySQL中的timestamp类型转成UTC的字符串。
* | mysql | mysql-binlog-connector | debezium |
* | ----------------------------------- | ---------------------------------------- | --------------------------------- |
* | date<br>(2021-01-28) | LocalDate<br/>(2021-01-28) | Integer<br/>(18655) |
* | time<br/>(17:29:04) | Duration<br/>(PT17H29M4S) | Long<br/>(62944000000) |
* | timestamp<br/>(2021-01-28 17:29:04) | ZonedDateTime<br/>(2021-01-28T09:29:04Z) | String<br/>(2021-01-28T09:29:04Z) |
* | Datetime<br/>(2021-01-28 17:29:04) | LocalDateTime<br/>(2021-01-28T17:29:04) | Long<br/>(1611854944000) |
*
* @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
* 源代码地址: https://github.com/debezium/debezium/tree/1.9/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql
*/
@Slf4j
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
private static final String DEFAULT_DATE = "yyyy-MM-dd";
private static final String DEFAULT_TIME = "HH:mm:ss";
private static final String DEFAULT_DATETIME = "yyyy-MM-dd HH:mm:ss";
private static final String DEFAULT_TIMESTAMP_ZONE = "UTC+8";
@Override
public void configure(Properties props) {
readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
}
private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
String settingValue = (String) properties.get(settingKey);
if (settingValue == null || settingValue.length() == 0) {
if (settingKey.equals("format.date")) {
settingValue = DEFAULT_DATE;
} else if (settingKey.equals("format.time")) {
settingValue = DEFAULT_TIME;
} else if (settingKey.equals("format.datetime")) {
settingValue = DEFAULT_DATETIME;
} else if (settingKey.equals("format.timestamp")) {
settingValue = DEFAULT_DATETIME;
} else if (settingKey.equals("format.timestamp.zone")) {
settingValue = DEFAULT_TIMESTAMP_ZONE;
} else {
return;
}
}
try {
callback.accept(settingValue.trim());
} catch (IllegalArgumentException | DateTimeException e) {
log.error("MySqlDateTimeConverter The \"{}\" setting is illegal:{}", settingKey, settingValue);
throw e;
}
}
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
String sqlType = column.typeName();
SchemaBuilder schemaBuilder = null;
Converter converter = null;
if ("DATE".equalsIgnoreCase(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
converter = this::convertDate;
}
if ("TIME".equalsIgnoreCase(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
converter = this::convertTime;
}
if ("DATETIME".equalsIgnoreCase(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
converter = this::convertDateTime;
}
if ("TIMESTAMP".equalsIgnoreCase(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
converter = this::convertTimestamp;
}
if (schemaBuilder != null) {
registration.register(schemaBuilder, converter);
log.info("MySqlDateTimeConverter register converter for sqlType {} to schema {}", sqlType, schemaBuilder.name());
}
}
private String convertDate(Object input) {
if (Objects.isNull(input)) {
return null;
}
if (input instanceof LocalDate) {
return dateFormatter.format((LocalDate) input);
}
if (input instanceof Integer) {
LocalDate date = LocalDate.ofEpochDay((Integer) input);
return dateFormatter.format(date);
}
log.error("MySqlDateTimeConverter convertDate is fail getClass:{}、value:{}", input.getClass(), input.toString());
return Optional.ofNullable(input).map(p -> p.toString()).orElse(null);
}
private String convertTime(Object input) {
if (Objects.isNull(input)) {
return null;
}
if (input instanceof Duration) {
Duration duration = (Duration) input;
long seconds = duration.getSeconds();
int nano = duration.getNano();
LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
return timeFormatter.format(time);
}
log.error("MySqlDateTimeConverter convertTime is fail getClass:{}、value:{}", input.getClass(), input.toString());
return Optional.ofNullable(input).map(p -> p.toString()).orElse(null);
}
private String convertDateTime(Object input) {
if (Objects.isNull(input)) {
return null;
}
if (input instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) input);
}
if (input instanceof java.sql.Timestamp) {
Date date = (java.sql.Timestamp) input;
LocalDateTime localDateTime = dateToLocalDateTime(date);
return datetimeFormatter.format(localDateTime);
}
log.error("MySqlDateTimeConverter convertDateTime is fail getClass:{}、value:{}", input.getClass(), input.toString());
return Optional.ofNullable(input).map(p -> p.toString()).orElse(null);
}
private String convertTimestamp(Object input) {
if (Objects.isNull(input)) {
return null;
}
if (input instanceof ZonedDateTime) {
// mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
ZonedDateTime zonedDateTime = (ZonedDateTime) input;
LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
return timestampFormatter.format(localDateTime);
}
log.error("MySqlDateTimeConverter convertTimestamp is fail getClass:{}、value:{}", input.getClass(), input.toString());
return Optional.ofNullable(input).map(p -> p.toString()).orElse(null);
}
/**
* 将 Date 转为 LocalDateTime
*
* @param date
* @return java.time.LocalDateTime;
*/
public LocalDateTime dateToLocalDateTime(Date date) {
return date.toInstant().atZone(timestampZoneId).toLocalDateTime();
}
}
Hello @ultimatech67 Thanks for the code. Is it possible for you to send/attach the jar file as I'm not experienced in java
Thanks and Regards, Muhammad Ali