kafka-connect-jdbc

JDBC Sink to Postgres cannot ignore duplicates

Open slepkin opened this issue 6 years ago • 15 comments

The JDBC Sink has three insert modes: insert, upsert, and update. In Postgres, using upsert produces an insert of the form `INSERT ... ON CONFLICT (...) DO UPDATE SET ...`, which replaces the old record with the new record, as expected.

However, Postgres has an alternate form of insert, `INSERT ... ON CONFLICT (...) DO NOTHING`, which discards the new record. This is useful when we assume that the upstream data does not change, and thus any conflicts represent duplicates. I realize that some databases lack an upsert so they get this for free, but not all of us are so lucky. :smile:

Could a 4th mode be added that ignores conflicting records?

slepkin avatar May 15 '18 22:05 slepkin

+1

mihbor avatar Sep 12 '18 09:09 mihbor

I think that this is possibly related to #401.

Unfortunately the codebase has been changed a little bit so I'm currently working on porting the suggested change in a fork.

fusillicode avatar Jan 09 '19 19:01 fusillicode

Resolved with https://github.com/confluentinc/kafka-connect-jdbc/pull/605

cyrusv avatar May 22 '19 00:05 cyrusv

hey @slepkin did you manage to achieve an ON CONFLICT DO NOTHING query? I only see that these semantics can be achieved when all fields in the table are included in the primary key, and therefore nonKeyFields is empty.

@cyrusv is it possible to reopen this issue?

markns avatar Oct 08 '20 10:10 markns

Glancing at #605, it looks like @hashhar made the Postgres JDBC Sink skip duplicates only when nonKeyColumns is empty. This was certainly the most urgent case to address, since the Sink was completely nonfunctional in that case. But there's still value in making this an option in all cases, so I'd support reopening the issue.

slepkin avatar Oct 08 '20 15:10 slepkin

I'm happy to re-open the issue, but it would help us prioritize the fix if we could get a more detailed explanation here, or a link to a PR if one already exists.

cyrusv avatar Oct 11 '20 23:10 cyrusv

Hi @cyrusv, no PR I'm afraid, but the explanation is quite straightforward.

For example, given the table...

create table events
(
  timestamp       bigint     not null,
  thing_id        int        not null,
  stuff           text       not null,
  constraint events_pkey
    primary key (timestamp, thing_id)
);

...we want to deduplicate data in the database on timestamp and thing_id.

In this case, we wish our "upsert" statement to not really be an upsert at all, but just not fail on insert, i.e. ON CONFLICT DO NOTHING.

As it currently stands, the ON CONFLICT DO NOTHING semantics can be achieved only if all columns (stuff in our example) are included in the primary key.

Simply using insert.mode=insert and tolerating failures doesn't work, because we wish other failure modes to actually cause an exception.
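
To make the requested statement concrete, here is a minimal sketch of the SQL such a mode would need to generate for the events table above. It is plain string building, not the connector's actual code: the class and method names (OnConflictSketch, insertIgnoringDuplicates) are hypothetical, and a real dialect would go through the connector's own identifier and expression builders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of kafka-connect-jdbc.
class OnConflictSketch {

    // Quote an identifier Postgres-style, doubling any embedded double quotes.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Build INSERT ... ON CONFLICT (<key columns>) DO NOTHING.
    // Unlike the upsert form, non-key columns are inserted but never updated.
    static String insertIgnoringDuplicates(
            String table, List<String> keyColumns, List<String> nonKeyColumns) {
        List<String> all = new ArrayList<>(keyColumns);
        all.addAll(nonKeyColumns);

        String columns = all.stream()
                .map(OnConflictSketch::quote)
                .collect(Collectors.joining(","));
        String placeholders = all.stream()
                .map(c -> "?")
                .collect(Collectors.joining(","));
        String conflictTarget = keyColumns.stream()
                .map(OnConflictSketch::quote)
                .collect(Collectors.joining(","));

        return "INSERT INTO " + quote(table)
                + " (" + columns + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + conflictTarget + ") DO NOTHING";
    }

    public static void main(String[] args) {
        // Key and non-key columns taken from the events table above.
        System.out.println(insertIgnoringDuplicates(
                "events",
                Arrays.asList("timestamp", "thing_id"),
                Arrays.asList("stuff")));
    }
}
```

With this form, a row whose (timestamp, thing_id) pair already exists is silently skipped, while any other failure (a NOT NULL violation, a type mismatch) still raises an error, which is the behaviour asked for here.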

markns avatar Oct 12 '20 18:10 markns

Thanks for the feature request, we'll prioritize it against our other new features @NathanNam

cyrusv avatar Oct 12 '20 21:10 cyrusv

hey @cyrusv, do you perhaps have some advice on adding a custom dialect to the classpath? As a workaround for the time being I've implemented a PostgresInsertOnConflictDoNothingSqlDatabaseDialect, but I'm having a tough time getting this to be registered as a valid dialect.

kafka-connect | [2020-10-13 19:16:27,211] DEBUG Searching for and loading all JDBC source dialects on the classpath (io.confluent.connect.jdbc.dialect.DatabaseDialects:74)
kafka-connect | [2020-10-13 19:16:27,219] DEBUG Found 'GenericDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.GenericDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,221] DEBUG Found 'Db2DatabaseDialect' provider class io.confluent.connect.jdbc.dialect.Db2DatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,222] DEBUG Found 'DerbyDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.DerbyDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,224] DEBUG Found 'OracleDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.OracleDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,225] DEBUG Found 'SqliteDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,226] DEBUG Found 'PostgreSqlDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,227] DEBUG Found 'MySqlDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,229] DEBUG Found 'SqlServerDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SqlServerDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,230] DEBUG Found 'SapHanaDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SapHanaDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,231] DEBUG Found 'SybaseDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.SybaseDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,232] DEBUG Found 'VerticaDatabaseDialect' provider class io.confluent.connect.jdbc.dialect.VerticaDatabaseDialect$Provider (io.confluent.connect.jdbc.dialect.DatabaseDialects:89)
kafka-connect | [2020-10-13 19:16:27,233] DEBUG Registered 11 source dialects (io.confluent.connect.jdbc.dialect.DatabaseDialects:100)

It seems that this class can't be registered as a plugin somehow. Any ideas on how to make this work would be greatly appreciated.

markns avatar Oct 13 '20 19:10 markns

@gharris1727, any advice here?

cyrusv avatar Oct 30 '20 16:10 cyrusv

@markns To load a custom dialect, you will need three components:

  • The Dialect class itself (PostgresInsertOnConflictDoNothingSqlDatabaseDialect)
  • The Dialect Provider (typically PostgresInsertOnConflictDoNothingSqlDatabaseDialect$Provider)
  • A ServiceLoader configuration file (exactly META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider) with a line referencing your custom dialect provider. I don't believe that you need to reference the existing dialect providers, only your new dialect provider.

You can add all of these classes/files in a separate jar in the same directory as the main kafka-connect-jdbc jar, and the ServiceLoader mechanism should be able to pick them up the next time you restart your worker.
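
For readers unfamiliar with ServiceLoader, the three components above can be sketched in a self-contained way. Everything named Demo* below is a hypothetical stand-in (the real provider type is io.confluent.connect.jdbc.dialect.DatabaseDialectProvider, which this sketch does not depend on). The point it illustrates is that the line in the META-INF/services file must be the binary name of the nested provider class, with a `$`, which is what Class.getName() returns.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for the connector's provider type.
interface DemoDialectProvider {
    String dialectName();
}

// Hypothetical stand-in for the custom dialect class.
class DemoDoNothingDialect {
    // Nested provider; its binary name is DemoDoNothingDialect$Provider.
    public static class Provider implements DemoDialectProvider {
        public Provider() {}

        @Override
        public String dialectName() {
            return "DemoDoNothingDialect";
        }
    }
}

class ServiceLoaderDemo {

    // Writes a provider-configuration file into a temp directory (left behind
    // after the run), then asks ServiceLoader to discover providers through a
    // class loader that can see that directory.
    static List<String> discover() {
        try {
            Path root = Files.createTempDirectory("services-demo");
            Path services = Files.createDirectories(
                    root.resolve("META-INF").resolve("services"));

            // File name = service type name; content = provider binary name.
            String providerLine = DemoDoNothingDialect.Provider.class.getName() + "\n";
            Files.write(
                    services.resolve(DemoDialectProvider.class.getName()),
                    providerLine.getBytes(StandardCharsets.UTF_8));

            List<String> names = new ArrayList<>();
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()},
                    ServiceLoaderDemo.class.getClassLoader())) {
                for (DemoDialectProvider provider
                        : ServiceLoader.load(DemoDialectProvider.class, loader)) {
                    names.add(provider.dialectName());
                }
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discover());
    }
}
```

In a real plugin jar the configuration file is a static resource rather than something written at runtime; the temp directory here only keeps the sketch runnable on its own.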

gharris1727 avatar Oct 30 '20 16:10 gharris1727

Hey @gharris1727 thanks very much. I'll try it out next week.

markns avatar Oct 30 '20 16:10 markns

@markns Did you ever get your custom dialect to work? I've followed the steps laid out by @gharris1727, but the dialect refuses to load even though the converters and SMTs I have in the same jar are loaded no problem. I have tried placing the jar in every conceivable place (starting with /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib, which is where the plugin is loaded from and is on the plugin.path), but to no avail.

caseycrites avatar Dec 23 '21 19:12 caseycrites

After struggling with this for several hours, I realized that the META-INF/services file was getting overridden by a dependency that should've been marked as provided. For anyone who comes upon this thread, make sure you take the "jar should not contain any dependencies provided by kafka connect runtime" warning seriously.
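
One way to check for this kind of clobbering is to print every copy of the provider-configuration file visible on a classpath, together with the providers each copy lists. The following is a rough diagnostic sketch using only the JDK; the class name ServiceFileInspector is made up, and it assumes you run it with the plugin's jars on the classpath yourself (it does not reproduce Kafka Connect's isolated plugin class loading).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic helper, not part of kafka-connect-jdbc.
class ServiceFileInspector {

    // Returns one "file: <url>" line per META-INF/services/<serviceName>
    // resource visible to the loader, followed by the providers it lists.
    static List<String> describe(String serviceName, ClassLoader loader) {
        List<String> report = new ArrayList<>();
        try {
            Enumeration<URL> urls =
                    loader.getResources("META-INF/services/" + serviceName);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                report.add("file: " + url);
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // Provider files allow '#' comments; drop them.
                        int hash = line.indexOf('#');
                        String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
                        if (!entry.isEmpty()) {
                            report.add("  provider: " + entry);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return report;
    }

    public static void main(String[] args) {
        String service = args.length > 0
                ? args[0]
                : "io.confluent.connect.jdbc.dialect.DatabaseDialectProvider";
        describe(service, ServiceFileInspector.class.getClassLoader())
                .forEach(System.out::println);
    }
}
```

If the custom jar's copy of the file lists only the stock dialect providers and not your own, a bundled dependency has most likely overwritten it during packaging.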

caseycrites avatar Dec 23 '21 21:12 caseycrites

Hey guys, is there any workaround for this besides the custom dialect?

japier avatar Aug 02 '23 14:08 japier