
[Bug] [Postgres] TIME-type columns lose precision during the snapshot (full-sync) phase

Open · wuzhenhua01 opened this issue 2 years ago • 1 comment

Search before asking

  • [x] I searched in the issues and found nothing similar.

Flink version

1.14.0

Flink CDC version

2.4.1

Database and its version

PostgreSQL 10.23

Minimal reproduce step

CREATE TABLE IF NOT EXISTS s1.t1
(
    id bigint NOT NULL,
    tm time without time zone,
    CONSTRAINT t1_pkey PRIMARY KEY (id)
);

INSERT INTO s1.t1 VALUES (1, '10:33:23.660863');
import java.util.Properties

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._

import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()

val pgSource = PostgresSourceBuilder.PostgresIncrementalSource.builder[String]
    .hostname("localhost")
    .port(5432)
    .database("cdc_test")
    .schemaList("s1")
    .tableList("s1.t1")
    .username("postgres")
    .password("postgres")
    .deserializer(new JsonDebeziumDeserializationSchema)
    .slotName("aaa")
    .decodingPluginName("pgoutput")
    .debeziumProperties(prop)
    .build()

env.enableCheckpointing(3000)
env.fromSource(pgSource, WatermarkStrategy.noWatermarks[String](), "PostgresParallelSource").print()
env.execute("Print Postgres Snapshot + WAL")

What did you expect to see?

"after":{"id":1,"tm":38003660863}

What did you see instead?

"after":{"id":1,"tm":38003660000}

Anything else?

com.ververica.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask

private void createDataEventsForTable(
                PostgresSnapshotContext snapshotContext,
                EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver,
                Table table)
                throws InterruptedException {

            ... ...
            try (PreparedStatement selectStatement =
                            PostgresQueryUtils.readTableSplitDataStatement(
                                    jdbcConnection,
                                    selectSql,
                                    snapshotSplit.getSplitStart() == null,
                                    snapshotSplit.getSplitEnd() == null,
                                    snapshotSplit.getSplitStart(),
                                    snapshotSplit.getSplitEnd(),
                                    snapshotSplit.getSplitKeyType().getFieldCount(),
                                    connectorConfig.getQueryFetchSize());
                    ResultSet rs = selectStatement.executeQuery()) {

                while (rs.next()) {
                    rows++;
                    final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                    for (int i = 0; i < columnArray.getColumns().length; i++) {
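                        // The reporter points at the line below: with the default
                        // rs.getObject(int) mapping, the PostgreSQL JDBC driver returns
                        // java.sql.Time for TIME columns, which only carries millisecond
                        // precision, so the microseconds read during the snapshot are lost.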
                        row[columnArray.getColumns()[i].position() - 1] = rs.getObject(i + 1);
                    }
                    ... ...
                }
            }
        }
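
As a side note, the precision loss can be reproduced with the PostgreSQL JDBC driver alone: the plain rs.getObject(int) call above maps a TIME column to java.sql.Time, which only carries millisecond precision, while requesting java.time.LocalTime explicitly preserves the microseconds. A minimal sketch, assuming the s1.t1 table from the reproduce step (class name and connection details are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.LocalTime;

public class TimePrecisionDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/cdc_test", "postgres", "postgres");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT tm FROM s1.t1 WHERE id = 1")) {
            while (rs.next()) {
                // Default JDBC mapping: java.sql.Time, sub-millisecond digits are dropped.
                Object viaGetObject = rs.getObject(1);
                // Explicit java.time mapping: microseconds are preserved.
                LocalTime viaLocalTime = rs.getObject(1, LocalTime.class);
                System.out.println(viaGetObject.getClass().getName() + " -> " + viaGetObject);
                System.out.println("LocalTime -> " + viaLocalTime);
            }
        }
    }
}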


Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

wuzhenhua01 · Aug 15 '23 02:08

@wuzhenhua01 Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!

loserwang1024 · Feb 19 '24 02:02