iceberg
Creating 2 or more Iceberg Kafka Sink Connectors using a slow JDBC catalog causes a startup failure on first run.
Apache Iceberg version
1.9.0
Query engine
Kafka Connect
Please describe the bug 🐞
If Kafka Connect is started with 2 or more Iceberg sink connectors that use a JDBC catalog, and the JDBC database is slow, some of the connectors throw:
org.apache.iceberg.jdbc.UncheckedSQLException: Cannot initialize JDBC catalog
at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:206)
at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:147)
at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:277)
at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:331)
at org.apache.iceberg.connect.CatalogUtils.loadCatalog(CatalogUtils.java:44)
at org.apache.iceberg.connect.IcebergSinkTask.start(IcebergSinkTask.java:48)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:224)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
Detail: Key (typname, typnamespace)=(iceberg_tables, 2200) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
at org.apache.iceberg.jdbc.JdbcCatalog.lambda$initializeCatalogTables$0(JdbcCatalog.java:178)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:72)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:65)
at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:162)
... 15 more
This happens because JdbcCatalog checks whether the catalog table exists and, if it does not, creates it. The check and the create are not atomic, so there is a window in which another connector (or thread) can create the table after the existence check but before the CREATE TABLE is executed.
This only occurs the first time the connectors are started: the failure is that the table already exists, and on later starts the existence check finds the table and skips creation.
I have a test and a fix for this bug and will submit them.
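To illustrate the race, here is a minimal, self-contained Java simulation. It does not use JdbcCatalog or a real database: `FakeDatabase`, `naiveInit`, `tolerantInit` and `countFailures` are hypothetical names, and the 50 ms sleep stands in for a slow JDBC round trip. `naiveInit` mirrors the check-then-create sequence described above; `tolerantInit` treats a failed create as success if the table exists afterwards, which is one common way to close the window (not necessarily what the submitted fix does).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CatalogInitRace {

  // Hypothetical stand-in for the database: creating an existing table fails,
  // like the PostgreSQL duplicate key error in the stack trace.
  static final class FakeDatabase {
    private final Set<String> tables = ConcurrentHashMap.newKeySet();

    boolean tableExists(String name) {
      return tables.contains(name);
    }

    void createTable(String name) throws DuplicateTableException {
      if (!tables.add(name)) { // atomic: only one caller can add the name
        throw new DuplicateTableException(name);
      }
    }
  }

  static final class DuplicateTableException extends Exception {
    DuplicateTableException(String name) {
      super("table already exists: " + name);
    }
  }

  static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Racy: another initializer can create the table during the sleep.
  static void naiveInit(FakeDatabase db, String table) throws DuplicateTableException {
    if (!db.tableExists(table)) {
      sleepQuietly(50); // simulated slow JDBC round trip
      db.createTable(table);
    }
  }

  // Tolerant: if the create fails but the table now exists, someone else won the race.
  static void tolerantInit(FakeDatabase db, String table) throws DuplicateTableException {
    if (!db.tableExists(table)) {
      sleepQuietly(50);
      try {
        db.createTable(table);
      } catch (DuplicateTableException e) {
        if (!db.tableExists(table)) {
          throw e; // a genuine failure, not the race
        }
      }
    }
  }

  // Starts all initializers at the same moment and counts how many fail.
  public static int countFailures(boolean tolerant, int connectors) throws InterruptedException {
    FakeDatabase db = new FakeDatabase();
    CountDownLatch start = new CountDownLatch(1);
    AtomicInteger failures = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < connectors; i++) {
      Thread t = new Thread(() -> {
        try {
          start.await();
          if (tolerant) {
            tolerantInit(db, "iceberg_tables");
          } else {
            naiveInit(db, "iceberg_tables");
          }
        } catch (DuplicateTableException e) {
          failures.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(t);
      t.start();
    }
    start.countDown();
    for (Thread t : threads) {
      t.join();
    }
    return failures.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("naive failures: " + countFailures(false, 3));
    System.out.println("tolerant failures: " + countFailures(true, 3));
  }
}
```

With three simulated connectors, the naive variant usually reports two failures while the tolerant variant reports none; the naive count depends on thread timing, so it can be lower. Re-checking after a failed create is also more robust than relying on `CREATE TABLE IF NOT EXISTS` alone, since in PostgreSQL concurrent executions of that statement can still fail with the same `pg_type_typname_nsp_index` duplicate key error.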
Willingness to contribute
- [x] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
What's the value of the JdbcUtil configuration for that?
We are supposed to create the table at JDBC catalog init, before the sink is "running".
Fix in https://github.com/apache/iceberg/pull/13345
@jbonofre I don't understand the first line of your comment.
The problem here is a race condition.