flink-cdc
flink-cdc copied to clipboard
[Bug] [Postgres] 全量阶段同步Time类型字段丢失精度
Search before asking
- [x] I searched in the issues and found nothing similar.
Flink version
1.14.0
Flink CDC version
2.4.1
Database and its version
PostgreSQL 10.23
Minimal reproduce step
CREATE TABLE IF NOT EXISTS s1.t1
(
id bigint NOT NULL,
tm time without time zone,
CONSTRAINT t1_pkey PRIMARY KEY (id)
)
INSERT INTO s1.t1 VALUES(1, '10:33:23.660863')
val prop = new Properties()
val pgSource = PostgresSourceBuilder.PostgresIncrementalSource.builder[String]
.hostname("localhost")
.port(5432)
.database("cdc_test")
.schemaList("s1")
.tableList("s1.t1")
.username("postgres")
.password("postgres")
.deserializer(new JsonDebeziumDeserializationSchema)
.slotName("aaa")
.decodingPluginName("pgoutput")
.debeziumProperties(prop)
.build()
env.enableCheckpointing(3000)
env.fromSource(pgSource, WatermarkStrategy.noWatermarks[String](), "PostgresParallelSource").print()
env.execute("Print Postgres Snapshot + WAL")
What did you expect to see?
"after":{"id":1,"tm":38003660863}
What did you see instead?
"after":{"id":1,"tm":38003660000}
Anything else?
com.ververica.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask
private void createDataEventsForTable(
PostgresSnapshotContext snapshotContext,
EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver,
Table table)
throws InterruptedException {
... ...
try (PreparedStatement selectStatement =
PostgresQueryUtils.readTableSplitDataStatement(
jdbcConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
connectorConfig.getQueryFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
while (rs.next()) {
rows++;
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
for (int i = 0; i < columnArray.getColumns().length; i++) {
row[columnArray.getColumns()[i].position() - 1] = rs.getObject(i + 1);
}
... ...
}
}
}
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
@wuzhenhua01 Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!