kafka-connect-jdbc

How to migrate an enum type to another Postgres database?

Open hunghoang-ct opened this issue 3 years ago • 1 comments

Hi everyone, I'm currently migrating data from one Postgres database to another using Debezium and the JDBC sink connector. My source schema has a table named "film" with a column named "rating" whose type is an enum. When the "rating" data travels from the source database to Kafka through Debezium, it is converted to a string, and the JDBC sink connector cannot convert it back to the enum. Is there any way to migrate an enum type from one Postgres database to another?

Film table Avro schema:

{
  "subject": "dvdrental.public.film-value",
  "version": 1,
  "id": 12,
  "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dvdrental.public.film\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"film_id\",\"type\":{\"type\":\"int\",\"connect.default\":0},\"default\":0},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"release_year\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"language_id\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}},{\"name\":\"rental_duration\",\"type\":{\"type\":\"int\",\"connect.default\":3,\"connect.type\":\"int16\"},\"default\":3},{\"name\":\"rental_rate\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":4,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"4\"},\"connect.default\":\"\\u0001ó\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0001ó\"},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"replacement_cost\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":5,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"5\"},\"connect.default\":\"\\u0007Ï\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0007Ï\"},{\"name\":\"rating\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"G,PG,PG-13,R,NC-17\"},\"connect.name\":\"io.debezium.data.Enum\"}],\"default\":null},{\"name\":\"last_update\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.default\":0,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"},\"default\":0},{\"name\":\"special_features\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"string\"]}],\"default\":null}],\"connect.name\":\"dvdrental.public.film.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.postgresql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"lsn\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"xmin\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.postgresql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"dvdrental.public.film.Envelope\"}"
}
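
To make the string conversion visible: in the schema above, "rating" is a plain Avro string tagged with connect.name io.debezium.data.Enum, and the enum labels survive only in the "allowed" parameter. Below is a rough Python sketch that walks such a schema and lists those fields. The helper names find_debezium_enums and create_enum_ddl are made up for illustration, the sample schema is a trimmed copy of the one above, and the type name mpaa_rating comes from the error message because the schema itself does not carry it.

```python
import json


def find_debezium_enums(avro_schema_json):
    """Map field name -> allowed labels for every io.debezium.data.Enum field."""
    found = {}

    def walk(node, field_name=None):
        if isinstance(node, list):  # Avro union, e.g. ["null", {...}]
            for branch in node:
                walk(branch, field_name)
        elif isinstance(node, dict):
            if node.get("connect.name") == "io.debezium.data.Enum":
                allowed = node.get("connect.parameters", {}).get("allowed", "")
                found[field_name] = allowed.split(",") if allowed else []
            for field in node.get("fields", []):  # record: visit each column
                walk(field.get("type"), field.get("name"))
            walk(node.get("type"), field_name)   # nested type definition
            walk(node.get("items"), field_name)  # array element type

    walk(json.loads(avro_schema_json))
    return found


def create_enum_ddl(type_name, labels):
    """Build a CREATE TYPE statement; type_name is assumed to be a safe identifier."""
    quoted = ", ".join("'" + label.replace("'", "''") + "'" for label in labels)
    return f"CREATE TYPE {type_name} AS ENUM ({quoted});"


# Trimmed-down stand-in for the "Value" record in the schema above.
sample_schema = json.dumps({
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "title", "type": "string"},
        {"name": "rating", "type": ["null", {
            "type": "string",
            "connect.version": 1,
            "connect.parameters": {"allowed": "G,PG,PG-13,R,NC-17"},
            "connect.name": "io.debezium.data.Enum",
        }], "default": None},
    ],
})

enums = find_debezium_enums(sample_schema)
print(enums)
print(create_enum_ddl("mpaa_rating", enums["rating"]))
```

Run against the full envelope schema, the same walk would also report the "snapshot" field of the source block, since Debezium tags it the same way.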

JDBC sink connector registration:

{"name":"dest-connector-film","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"dvdrental.public.film","connection.url":"jdbc:postgresql://dest-postgres:5432/dvdrental?user=postgres&password=123456","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","auto.create":"true","insert.mode":"upsert","pk.fields":"film_id","pk.mode":"record_value","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://schema-registry:8081","key.converter.enhanced.avro.schema.support":"true","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://schema-registry:8081","value.converter.enhanced.avro.schema.support":"true"}}

Error trace log:

Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "dvdrental"."public"."film" ("film_id","title","description","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","last_update","special_features") VALUES (133,'Chamber Italian','A Fateful Reflection of a Moose And a Husband who must Overcome a Monkey in Nigeria',2006,1,7,4.99,117,14.99,'NC-17','2013-05-26 14:50:58.951+00',?) ON CONFLICT ("film_id") DO UPDATE SET "title"=EXCLUDED."title","description"=EXCLUDED."description","release_year"=EXCLUDED."release_year","language_id"=EXCLUDED."language_id","rental_duration"=EXCLUDED."rental_duration","rental_rate"=EXCLUDED."rental_rate","length"=EXCLUDED."length","replacement_cost"=EXCLUDED."replacement_cost","rating"=EXCLUDED."rating","last_update"=EXCLUDED."last_update","special_features"=EXCLUDED."special_features" was aborted: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241

	at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
	... 11 more
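
The error above says the value for "rating" reaches Postgres as character varying while the column is of type mpaa_rating. One workaround that may be worth trying, though nothing in this thread confirms it, is the PostgreSQL JDBC driver's stringtype=unspecified connection parameter: the driver then sends string parameters untyped and the server tries to infer the target type. A minimal Python sketch of patching the sink's connection.url this way follows. The helper name with_unspecified_stringtype is made up, and the sketch assumes the destination table and the mpaa_rating type already exist.

```python
def with_unspecified_stringtype(jdbc_url):
    """Append stringtype=unspecified to a JDBC URL unless stringtype is already set."""
    if "stringtype=" in jdbc_url:
        return jdbc_url  # leave an explicit setting alone
    separator = "&" if "?" in jdbc_url else "?"
    return jdbc_url + separator + "stringtype=unspecified"


# Only the relevant key of the sink config posted above.
sink_config = {
    "connection.url": "jdbc:postgresql://dest-postgres:5432/dvdrental?user=postgres&password=123456",
}

patched = dict(sink_config)
patched["connection.url"] = with_unspecified_stringtype(sink_config["connection.url"])
print(patched["connection.url"])
```

Note that this setting applies to every string parameter on the connection, not just "rating", so it is worth checking the other text columns after switching it on.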

hunghoang-ct • Jan 17 '22 04:01

Curious about this as well. I have the same problem within the context of psql to psql migration.

maurolscla • May 12 '22 20:05