kafka-connect-jdbc
JDBC Sink to Postgres cannot ignore duplicates
The JDBC Sink has three insert modes: `insert`, `upsert`, and `update`. In Postgres, using `upsert` produces an insert of the form `INSERT ... ON CONFLICT (...) DO UPDATE SET ...`, which replaces the old record with the new record, as expected.
However, Postgres has an alternate form of insert, `INSERT ... ON CONFLICT (...) DO NOTHING`, which discards the new record. This is useful when we assume that the upstream data does not change, and thus any conflicts represent duplicates. I realize that some databases lack an upsert, so they get this behavior for free, but not all of us are so lucky. :smile:
Could a 4th mode be added that ignores conflicting records?
+1
I think this is possibly related to #401.
Unfortunately the codebase has been changed a little bit so I'm currently working on porting the suggested change in a fork.
Resolved with https://github.com/confluentinc/kafka-connect-jdbc/pull/605
hey @slepkin, did you manage to achieve an `ON CONFLICT DO NOTHING` query? I only see that these semantics can be achieved when all fields in the table are included in the primary key, and therefore `nonKeyFields` is empty.
@cyrusv is it possible to reopen this issue?
Glancing at #605, it looks like @hashhar made the Postgres JDBC Sink skip duplicates only when `nonKeyColumns` is empty. This was certainly the most urgent case to address, since the Sink was completely nonfunctional in that case. But there's still value in making this an option in all cases, so I'd support reopening the issue.
I'm happy to re-open the issue, but it will help us prioritize the fix if we can get a more detailed explanation here. Or link a PR if one already exists?
Hi @cyrusv, no PR I'm afraid, but the explanation is quite straightforward.
For example, given the table...
create table events
(
    timestamp bigint not null,
    thing_id  int    not null,
    stuff     text   not null,
    constraint events_pkey
        primary key (timestamp, thing_id)
);
...we want to deduplicate data in the database on `timestamp` and `thing_id`.
In this case, we wish our "upsert" statement to not really be an upsert at all, but simply to not fail on insert, i.e. `ON CONFLICT DO NOTHING`.
As it currently stands, the `ON CONFLICT DO NOTHING` semantics can be achieved only if all columns (`stuff` in our example) are included in the primary key.
Simply using `insert.mode=insert` and tolerating failures doesn't work, because we wish other failure modes to actually cause an exception.
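To make the desired behavior concrete, this is roughly the statement we'd want the sink to generate for the `events` table above (the literal values are just placeholders):

```sql
-- Duplicate deliveries of the same (timestamp, thing_id) are silently dropped,
-- while any other error (e.g. a NOT NULL violation) still fails the insert.
INSERT INTO events (timestamp, thing_id, stuff)
VALUES (1602616587000, 42, 'example payload')
ON CONFLICT (timestamp, thing_id) DO NOTHING;
```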
Thanks for the feature request, we'll prioritize it against our other new features @NathanNam
hey @cyrusv, do you perhaps have some advice on adding a custom dialect to the classpath? As a workaround for the time being I've implemented a `PostgresInsertOnConflictDoNothingSqlDatabaseDialect`, but I'm having a tough time getting this to be registered as a valid dialect.
kafka-connect | [2020-10-13 19:16:27,211] DEBUG Searching for and loading all JDBC source dialects on the classpath (io.confluent.connect.jdbc.dialect.DatabaseDialects:74)
kafka-connect | [2020-10-13 19:16:27,219] DEBUG Found 'GenericDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.GenericDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,221] DEBUG Found 'Db2DatabaseDialect' provider class io.confluent.connect.jdbc.dialect.Db2DatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,222] DEBUG Found 'DerbyDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.DerbyDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,224] DEBUG Found 'OracleDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.OracleDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,225] DEBUG Found 'SqliteDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,226] DEBUG Found 'PostgreSqlDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,227] DEBUG Found 'MySqlDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,229] DEBUG Found 'SqlServerDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SqlServerDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,230] DEBUG Found 'SapHanaDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SapHanaDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,231] DEBUG Found 'SybaseDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SybaseDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,232] DEBUG Found 'VerticaDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.VerticaDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,233] DEBUG Registered 11 source dialects (io.confluent.connect.jdbc.dialect.DatabaseDialects:100)
It seems that this class can't be registered as a plugin somehow. Any ideas on how to make this work would be greatly appreciated.
@gharris1727, any advice here?
@markns To load a custom dialect, you will need three components:

- The Dialect class itself (`PostgresInsertOnConflictDoNothingSqlDatabaseDialect`)
- The Dialect Provider (typically `PostgresInsertOnConflictDoNothingSqlDatabaseDialect$Provider`)
- A ServiceLoader configuration file (exactly `META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider`) with a line referencing your custom dialect provider (example contents below). I don't believe that you need to reference the existing dialect providers, only your new dialect provider.
You can add all of these classes/files in a separate jar in the same directory as the main kafka-connect-jdbc jar, and the ServiceLoader mechanism should be able to pick them up the next time you restart your worker.
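For example, a minimal sketch of the ServiceLoader file contents (the package `com.example.dialects` is hypothetical; substitute whatever package your dialect actually lives in):

```
# META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider
com.example.dialects.PostgresInsertOnConflictDoNothingSqlDatabaseDialect$Provider
```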
Hey @gharris1727 thanks very much. I'll try it out next week.
@markns Did you ever get your custom dialect to work? I've followed the steps laid out by @gharris1727, but the dialect refuses to load, even though the converters and SMTs I have in the same jar are loaded no problem. I've tried placing the jar in every conceivable place (starting with `/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib`, which is where the plugin is loaded from and is on the `plugin.path`), but to no avail.
After struggling with this for several hours, I realized that the `META-INF/services` file was getting overridden by a dependency that should've been marked as `provided`. For anyone who comes upon this thread, make sure you take the "the jar should not contain any dependencies provided by the Kafka Connect runtime" warning seriously.
Hey guys, is there any workaround for this besides the custom dialect?