flink-cdc

Flink SQL postgres-cdc: I get a type error when processing geometry type data

Open • ya-xing opened this issue 3 years ago • 1 comment

Environment:

  • Flink version: 1.14
  • Flink CDC version: 2.2
  • Database and version: PostgreSQL 14

When I process geometry type data, I get a type error. I am syncing CDC data from PostgreSQL to PostgreSQL. In the source PostgreSQL table, the 'way' field has type geometry (WKB), because the PostGIS extension is installed.

Then I get this error in the TaskManager:

```
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO planet_osm_line(osm_id, uuid, way) VALUES (148445277, '49d261a3-8c13-4e58-a939-371c89974f8a', 'Struct{wkb=[B@5ddcacbb,srid=3857}') ON CONFLICT (uuid) DO UPDATE SET osm_id=EXCLUDED.osm_id, uuid=EXCLUDED.uuid, way=EXCLUDED.way was aborted: ERROR: parse error - invalid geometry
  Hint: "St" <-- parse error at position 2 within geometry
  Where: unnamed portal parameter $3 = '...'  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:189) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:851) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:874) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1563) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[flink-connector-jdbc-1.15.0.jar:1.15.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_281]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_281]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_281]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
Caused by: org.postgresql.util.PSQLException: ERROR: parse error - invalid geometry
  Hint: "St" <-- parse error at position 2 within geometry
  Where: unnamed portal parameter $3 = '...'
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2267) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1392) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1417) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:496) ~[flink-sql-connector-postgres-cdc-2.2.1.jar:2.2.1]
	... 16 more
```
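The failing INSERT writes the literal text `Struct{wkb=[B@5ddcacbb,srid=3857}` into `way`. This appears to be the `toString()` of the change event's geometry struct (a `wkb` byte array plus an `srid`) landing in the `STRING` column. `[B@5ddcacbb` is Java's default rendering of a `byte[]` (the type descriptor `[B` followed by a hash code), so the geometry bytes are already lost before PostGIS sees the value, and PostGIS fails on the leading `St`. The minimal Java sketch below reproduces only that `byte[]` rendering; the class name, the `naiveRender` method and the sample bytes are illustrative assumptions, not Flink CDC code.

```java
import java.util.Arrays;

public class ByteArrayToStringDemo {

    // Hypothetical stand-in for a struct's toString(): string concatenation calls
    // String.valueOf on the byte[], which yields "[B@<hash>" instead of the bytes.
    static String naiveRender(byte[] wkb, int srid) {
        return "Struct{wkb=" + wkb + ",srid=" + srid + "}";
    }

    public static void main(String[] args) {
        // Illustrative sample bytes, not real geometry data.
        byte[] wkb = {0x01, 0x02, 0x00, 0x00, 0x00};

        // Prints Struct{wkb=[B@<hash>,srid=3857}; the hash differs between runs.
        System.out.println(naiveRender(wkb, 3857));

        // The actual contents are only visible with an explicit conversion.
        System.out.println(Arrays.toString(wkb)); // prints [1, 2, 0, 0, 0]
    }
}
```

The same text shape (`Struct{wkb=[B@...,srid=3857}`) is what PostGIS rejected in the log above.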

I found in the MySQL CDC documentation that the geometry type is mapped to STRING, so my statements are as follows:

```sql
CREATE TABLE planet_osm_line_slave (
  osm_id INT,
  uuid STRING,
  way STRING,
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'xxxxx',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'xxx',
  'database-name' = 'africa',
  'schema-name' = 'public',
  'table-name' = 'planet_osm_line'
);
CREATE TABLE planet_osm_line_master (
  osm_id INT,
  uuid STRING,
  way STRING,
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://xxxx:5432/china?stringtype=unspecified',
  'username' = 'postgres',
  'password' = 'xxx',
  'table-name' = 'planet_osm_line',
  'sink.buffer-flush.max-rows' = '500000',
  'sink.buffer-flush.interval' = '3s',
  'sink.max-retries' = '3'
);
INSERT INTO planet_osm_line_master
SELECT slave.osm_id, slave.uuid, slave.way
FROM planet_osm_line_slave slave;
```

Does anyone know which type I should use to fix this error?
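One possible direction, offered only as a sketch under assumptions: PostGIS accepts geometry input as a hex-encoded EWKB string, and with `stringtype=unspecified` in the JDBC URL the PostgreSQL driver sends string parameters untyped, so the server can cast them to `geometry` (the log above shows it already tries to). If the `wkb` bytes and `srid` of the change event can be reached before they are stringified (for example through a custom deserializer in the DataStream API, not shown here), they could be turned into hex EWKB for the `way STRING` column. `WkbToHexEwkb` and `toHexEwkb` are hypothetical names; the sketch assumes plain OGC WKB with 2D/EWKB-style type codes, and if the bytes already carry the EWKB SRID flag it only hex-encodes them.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public final class WkbToHexEwkb {

    // PostGIS EWKB flag in the geometry type word meaning "an SRID follows".
    private static final int SRID_FLAG = 0x20000000;
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    private WkbToHexEwkb() {}

    // Hypothetical helper: WKB bytes + SRID -> uppercase hex EWKB string.
    // Assumption: 2D or EWKB-style type codes (ISO Z/M codes such as 1001 are not handled).
    static String toHexEwkb(byte[] wkb, int srid) {
        if (wkb == null || wkb.length < 5) {
            throw new IllegalArgumentException("WKB must contain at least a 5-byte header");
        }
        boolean littleEndian = wkb[0] == 1; // byte-order marker: 1 = little endian, 0 = big endian
        int type = readInt(wkb, 1, littleEndian);

        byte[] out = wkb;
        if ((type & SRID_FLAG) == 0 && srid != 0) {
            // EWKB layout: [order][type | SRID_FLAG][srid][original body after the 5-byte header]
            out = new byte[wkb.length + 4];
            out[0] = wkb[0];
            writeInt(out, 1, type | SRID_FLAG, littleEndian);
            writeInt(out, 5, srid, littleEndian);
            System.arraycopy(wkb, 5, out, 9, wkb.length - 5);
        }

        StringBuilder sb = new StringBuilder(out.length * 2);
        for (byte b : out) {
            sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
        }
        return sb.toString();
    }

    private static int readInt(byte[] buf, int off, boolean le) {
        int b0 = buf[off] & 0xFF;
        int b1 = buf[off + 1] & 0xFF;
        int b2 = buf[off + 2] & 0xFF;
        int b3 = buf[off + 3] & 0xFF;
        return le ? (b0 | b1 << 8 | b2 << 16 | b3 << 24)
                  : (b0 << 24 | b1 << 16 | b2 << 8 | b3);
    }

    private static void writeInt(byte[] buf, int off, int v, boolean le) {
        for (int i = 0; i < 4; i++) {
            int shift = le ? 8 * i : 8 * (3 - i);
            buf[off + i] = (byte) (v >>> shift);
        }
    }

    public static void main(String[] args) {
        // Illustrative sample: little-endian WKB for POINT(1 2).
        byte[] point = ByteBuffer.allocate(21)
                .order(ByteOrder.LITTLE_ENDIAN)
                .put((byte) 1).putInt(1).putDouble(1.0).putDouble(2.0)
                .array();
        // Prints 0101000020110F0000000000000000F03F0000000000000040
        System.out.println(toHexEwkb(point, 3857));
    }
}
```

Keeping the SRID inside the string (EWKB rather than plain WKB) matters if the target `way` column is constrained to SRID 3857, since plain WKB would arrive with SRID 0. Whether the bytes are reachable at all in a pure Flink SQL job is the main assumption here.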

ya-xing • Jul 22 '22 07:07

Is there anybody who can help me?

ya-xing • Aug 03 '22 03:08

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen • Feb 28 '24 15:02