kafka-connect-jdbc
kafka-connect-jdbc copied to clipboard
How to migrate an enum type from one Postgres database to another?
Hi everyone, I'm currently migrating data from one Postgres database to another using Debezium and the JDBC sink connector. My source DB schema has a table named "film" that contains a column named "rating" whose type is an enum. When the "rating" column's data is transferred from the source database to Kafka via Debezium, it is converted into a string, and the JDBC sink connector then cannot convert it back to the enum type. Is there any way to migrate an enum column from one Postgres database to another?
Film table avro schema:
{
"subject": "dvdrental.public.film-value",
"version": 1,
"id": 12,
"schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dvdrental.public.film\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"film_id\",\"type\":{\"type\":\"int\",\"connect.default\":0},\"default\":0},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"release_year\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"language_id\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}},{\"name\":\"rental_duration\",\"type\":{\"type\":\"int\",\"connect.default\":3,\"connect.type\":\"int16\"},\"default\":3},{\"name\":\"rental_rate\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":4,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"4\"},\"connect.default\":\"\\u0001ó\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0001ó\"},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"replacement_cost\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":5,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"5\"},\"connect.default\":\"\\u0007Ï\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0007Ï\"},{\"name\":\"rating\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"G,PG,PG-13,R,NC-17\"},\"connect.name\":\"io.debezium.data.Enum\"}],\"default\":null},{\"name\":\"last_update\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.default\":0,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"},\"default\":0},{\"name\":\"special_features\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"string\"]}],\"default\":null}],\"connect.nam
e\":\"dvdrental.public.film.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.postgresql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"lsn\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"xmin\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.postgresql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"dvdrental.public.film.Envelope\"}"
JDBC sink connector registration:
{"name":"dest-connector-film","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"dvdrental.public.film","connection.url":"jdbc:postgresql://dest-postgres:5432/dvdrental?user=postgres&password=123456","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","auto.create":"true","insert.mode":"upsert","pk.fields":"film_id","pk.mode":"record_value","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://schema-registry:8081","key.converter.enhanced.avro.schema.support":"true","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://schema-registry:8081","value.converter.enhanced.avro.schema.support":"true"}}
Error trace log:
Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "dvdrental"."public"."film" ("film_id","title","description","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","last_update","special_features") VALUES (133,'Chamber Italian','A Fateful Reflection of a Moose And a Husband who must Overcome a Monkey in Nigeria',2006,1,7,4.99,117,14.99,'NC-17','2013-05-26 14:50:58.951+00',?) ON CONFLICT ("film_id") DO UPDATE SET "title"=EXCLUDED."title","description"=EXCLUDED."description","release_year"=EXCLUDED."release_year","language_id"=EXCLUDED."language_id","rental_duration"=EXCLUDED."rental_duration","rental_rate"=EXCLUDED."rental_rate","length"=EXCLUDED."length","replacement_cost"=EXCLUDED."replacement_cost","rating"=EXCLUDED."rating","last_update"=EXCLUDED."last_update","special_features"=EXCLUDED."special_features" was aborted: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
Hint: You will need to rewrite or cast the expression.
Position: 241 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
Hint: You will need to rewrite or cast the expression.
Position: 241
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
Hint: You will need to rewrite or cast the expression.
Position: 241
at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
... 11 more
Curious about this as well. I have the same problem in the context of a Postgres-to-Postgres migration.