kafka-connect-jdbc
[#1366] - Sink null control (upsert OracleDialect)
Problem
The WHERE clauses that Sink connectors generate for insertion, upsert, and deletion queries need to be reimplemented so that a record whose key contains a null value is correctly detected when it already exists in the database. For example: UPDATE <TABLE> SET EXAMPLE=1 WHERE KEY_1=2 AND KEY_2 = NULL. Here the predicate KEY_2 = NULL never evaluates to true, so no rows are matched, whereas KEY_2 IS NULL matches them correctly.
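The behavior described above follows from SQL's three-valued logic. The snippet below is only a toy model of it in plain Java, not connector code: the class `SqlNullComparison` and its methods are hypothetical names introduced for illustration, and `null` stands in for SQL's UNKNOWN.

```java
// Toy model of SQL three-valued logic; all names here are hypothetical.
final class SqlNullComparison {

    // Models "column = value": the result is UNKNOWN (null) if either side is NULL.
    static Boolean sqlEquals(Integer column, Integer value) {
        if (column == null || value == null) {
            return null; // UNKNOWN
        }
        return column.equals(value);
    }

    // Models "column IS NULL": always TRUE or FALSE, never UNKNOWN.
    static boolean sqlIsNull(Integer column) {
        return column == null;
    }

    // A WHERE clause keeps a row only when the predicate is TRUE.
    static boolean whereMatches(Boolean predicate) {
        return Boolean.TRUE.equals(predicate);
    }

    public static void main(String[] args) {
        Integer key2 = null; // the stored row has KEY_2 = NULL
        System.out.println("KEY_2 = NULL matches:  " + whereMatches(sqlEquals(key2, null)));
        System.out.println("KEY_2 IS NULL matches: " + whereMatches(sqlIsNull(key2)));
    }
}
```

In this model the equality predicate is UNKNOWN for a null key, so the row is filtered out, while the IS NULL predicate keeps it.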
Solution
A new optional behavior is implemented (initially only for the upsert mode of OracleDatabaseDialect) to address this issue. It affects the ON clause of the generated statement, which is currently: MERGE INTO <TABLE> using (SELECT...) incoming ON (KEY_1=incoming."KEY_1" AND KEY_2=incoming."KEY_2")
Since the type cannot be inferred, enabling the boolean parameter enable.null.key.protection converts this into the following query: MERGE INTO <TABLE> using (SELECT...) incoming ON ((KEY_1=incoming."KEY_1" OR (KEY_1 IS NULL AND incoming."KEY_1" IS NULL)) AND (KEY_2=incoming."KEY_2" OR (KEY_2 IS NULL AND incoming."KEY_2" IS NULL)))
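As a rough illustration of the rewrite, the following standalone sketch builds both forms of the ON clause from a list of key columns. It is not the code in this PR and does not use the dialect's expression builder: the class `NullSafeOnClause`, the method `build`, and the `nullKeyProtection` argument (standing in for enable.null.key.protection) are hypothetical names chosen for the example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; it only mirrors the two ON clauses shown in this description.
final class NullSafeOnClause {

    // Builds the ON clause of the MERGE statement for the given key columns.
    // With nullKeyProtection enabled, each equality is extended with a null-safe branch.
    static String build(List<String> keyColumns, boolean nullKeyProtection) {
        String joined = keyColumns.stream()
            .map(key -> {
                String incoming = "incoming.\"" + key + "\"";
                String equality = key + "=" + incoming;
                if (!nullKeyProtection) {
                    return equality;
                }
                // Match when both sides are equal, or when both sides are NULL.
                return "(" + equality + " OR (" + key + " IS NULL AND " + incoming + " IS NULL))";
            })
            .collect(Collectors.joining(" AND "));
        return "(" + joined + ")";
    }

    public static void main(String[] args) {
        List<String> keys = List.of("KEY_1", "KEY_2");
        System.out.println("ON " + build(keys, false));
        System.out.println("ON " + build(keys, true));
    }
}
```

Keeping the plain equality as the first branch of each OR leaves the default query unchanged when the flag is off, which is the trade-off the parameter is meant to expose.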
This protects the case where a key is null. I have implemented it behind an additional parameter so that performance is not sacrificed when the protection is not really necessary.
Would it be worthwhile to extend this approach to all upsert, update, and delete operations?
Does this solution apply anywhere else?
- [ ] yes
- [x] no
Test Strategy
Testing done:
- [x] Unit tests
- [ ] Integration tests
- [ ] System tests
- [ ] Manual tests