flink-cdc
flink-cdc copied to clipboard
flinksql postgresql cdc process geometry type data I get type error
Environment :
- Flink version : 1.14
- Flink CDC version: 2.2
- Database and version: 14
When I process geometry type data I get type error ,i try cdc data from postgresql to postgresql
In the postgresql, the 'way' field type is geometry (wkb).because postgis is extended.

then i get this error in taskmanager
**java.sql.BatchUpdateException: Batch entry 0 INSERT INTO planet_osm_line(osm_id, uuid, way) VALUES (148445277, '49d261a3-8c13-4e58-a939-371c89974f8a', 'Struct{wkb=[B@5ddcacbb,srid=3857}') ON CONFLICT (uuid) DO UPDATE SET osm_id=EXCLUDED.osm_id, uuid=EXCLUDED.uuid, way=EXCLUDED.way was aborted: ERROR: parse error - invalid geometry
Hint: "St" <-- parse error at position 2 within geometry**
Where: unnamed portal parameter $3 = '...' Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:189) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:851) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:874) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1563) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_281]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_281]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_281]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_281]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
**Caused by: org.postgresql.util.PSQLException: ERROR: parse error - invalid geometry**
Hint: "St" <-- parse error at position 2 within geometry
Where: unnamed portal parameter $3 = '...'
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2267) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1392) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1417) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:496) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
... 16 more
I found in the documentation of mysql cdc that the geometry type is converted with STRING and my statement is as follows
CREATE TABLE planet_osm_line_slave (
osm_id INT,
uuid STRING,
way STRING,
PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'xxxxx',
'port' = '5432',
'username' = 'postgres',
'password' = 'xxx',
'database-name' = 'africa',
'schema-name' = 'public',
'table-name' = 'planet_osm_line'
);
CREATE TABLE planet_osm_line_master (
osm_id INT,
uuid STRING,
way STRING,
PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://xxxx:5432/china?stringtype=unspecified',
'username' = 'postgres',
'password' = 'xxx',
'table-name' = 'planet_osm_line',
'sink.buffer-flush.max-rows' = '500000',
'sink.buffer-flush.interval' = '3s',
'sink.max-retries' = '3'
);
INSERT INTO planet_osm_line_master
SELECT slave.osm_id, slave.uuid, slave.way
FROM planet_osm_line_slave slave;
Does any expert know what type should i use to fix this error
is there anybody can help me
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!