kafka-connect-jdbc
Query with WHERE clause cannot be used with incrementing/timestamp mode
docker-compose.yml for env setup is here
Normal query, no predicate:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-01",
"mode":"incrementing",
"query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id",
"incrementing.column.name": "txn_id",
"validate.non.null": false
}
}'
Works as expected.
Now we want to apply a predicate in the database, e.g. on the currency column. A valid Postgres query:
postgres=> SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR';
txn_id | customer_id | amount | currency | txn_timestamp | first_name | last_name | email | gender | comments
--------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------
3 | 2 | 17.13 | EUR | 2018-04-30T21:30:39Z | Auberon | Sulland | [email protected] | Male | Organized context-sensitive Graphical User Interface
12 | 4 | -92.57 | EUR | 2018-03-11T07:33:19Z | Nolana | Yeeles | [email protected] | Female | Adaptive real-time archive
Create connector with WHERE clause, but no variable placeholders:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-02",
"mode":"incrementing",
"query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\''",
"incrementing.column.name": "txn_id",
"validate.non.null": false
}
}'
Fails because the query generated is invalid - two WHERE clauses:
SELECT t.txn_id,
t.customer_id,
t.amount,
t.currency,
t.txn_timestamp,
c.first_name,
c.last_name,
c.email,
c.gender,
c.comments
FROM demo.transactions t
LEFT OUTER JOIN demo.customers c
ON t.customer_id = c.id
WHERE t.currency='EUR'
where "txn_id" > ?
ORDER BY "txn_id" ASC
[2019-01-08 16:34:06,979] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' WHERE "txn_id" > ? ORDER BY "txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
[2019-01-08 16:34:06,995] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
org.postgresql.util.PSQLException: ERROR: syntax error at or near "WHERE"
Position: 234
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2182)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1911)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:173)
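To make the failure above concrete, here is a minimal sketch of what the log suggests is happening: the connector's filter and ordering are added after the user's query text, regardless of whether that text already contains a WHERE. The function `append_incrementing_clause` is hypothetical and only models the behaviour seen in the DEBUG line; it is not the connector's actual code, and the query is shortened for readability.

```python
def append_incrementing_clause(user_query: str, column: str) -> str:
    """Simplified model (an assumption based on the log output):
    the filter and ordering are appended after the user's query text."""
    return f'{user_query} WHERE "{column}" > ? ORDER BY "{column}" ASC'


# Shortened version of the query from connector jdbc_source_postgres_02.
user_query = (
    "SELECT t.txn_id, t.currency FROM demo.transactions t "
    "WHERE t.currency='EUR'"
)

generated = append_incrementing_clause(user_query, "txn_id")
print(generated)

# Two WHERE keywords in one SELECT is what triggers the syntax error.
where_count = generated.upper().count(" WHERE ")
print("WHERE clauses:", where_count)
```

With a query that has no predicate of its own, the same model produces a single WHERE clause, which matches the first connector working as expected.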
Build variable placeholders into query:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-03",
"mode":"incrementing",
"query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\'' AND t.txn_id > ? ORDER BY t.txn_id ASC",
"incrementing.column.name": "t.txn_id",
"validate.non.null": false
}
}'
Fails because the connector still appends its own incrementing clause:
SELECT t.txn_id,
t.customer_id,
t.amount,
t.currency,
t.txn_timestamp,
c.first_name,
c.last_name,
c.email,
c.gender,
c.comments
FROM demo.transactions t
LEFT OUTER JOIN demo.customers c
ON t.customer_id = c.id
WHERE t.currency='EUR'
AND t.txn_id > ?
ORDER BY t.txn_id ASC
where "t.txn_id" > ?
ORDER BY "t.txn_id" ASC
[2019-01-08 16:42:54,794] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC WHERE "t.txn_id" > ? ORDER BY "t.txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
[2019-01-08 16:42:54,794] DEBUG Executing prepared statement with incrementing value = -1 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
[2019-01-08 16:42:54,794] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
org.postgresql.util.PSQLException: No value specified for parameter 2.
at org.postgresql.core.v3.SimpleParameterList.checkAllParametersSet(SimpleParameterList.java:228)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:163)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:645)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:495)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:380)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:170)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:86)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:58)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
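The "No value specified for parameter 2" error fits the logs above: the prepared statement ends up with two `?` markers (one from the hand-written predicate, one from the appended clause), while the log shows only a single incrementing value (-1) being bound. A rough sketch of that mismatch follows; `count_placeholders` is a hypothetical helper, far simpler than a real JDBC driver's parser, and the query is shortened.

```python
def count_placeholders(sql: str) -> int:
    """Count '?' markers outside single-quoted string literals.
    Simplified on purpose: SQL comments are not handled."""
    count = 0
    in_string = False
    for ch in sql:
        if ch == "'":
            in_string = not in_string
        elif ch == "?" and not in_string:
            count += 1
    return count


# Shortened version of the query from connector jdbc_source_postgres_03.
user_query = (
    "SELECT t.txn_id FROM demo.transactions t "
    "WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC"
)
# Clause appended after the user's text, as seen in the DEBUG line.
generated = user_query + ' WHERE "t.txn_id" > ? ORDER BY "t.txn_id" ASC'

bound_values = [-1]  # the log reports one incrementing value, -1
placeholders = count_placeholders(generated)
unbound = placeholders - len(bound_values)
print(f"{placeholders} placeholders, {len(bound_values)} bound, {unbound} left unbound")
```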
Currently the docs say
If you use a WHERE clause, it must handle incremental queries itself.
Is this possible with the JDBC connector? Or should the docs make clear that incrementing/timestamp modes are mutually exclusive with a WHERE clause in the query?
Did you try using your query with the predicate as an inner query? Try this:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "postgres-03",
"mode":"incrementing",
"query":"select * from (SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\'') o",
"incrementing.column.name": "txn_id",
"validate.non.null": false
}
}'
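The inner-query suggestion can be checked locally without a Connect cluster. The sketch below uses Python's built-in `sqlite3` as a stand-in for Postgres (an assumption: SQLite's parser is not identical, but both reject a second WHERE in one SELECT). The table contents are illustrative (two EUR rows echo the output earlier in the thread, the GBP row is made up), and the appended suffix is modelled on the DEBUG log rather than taken from the connector's source.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE transactions (txn_id INTEGER PRIMARY KEY, customer_id INTEGER, currency TEXT);
CREATE TABLE customers (id INTEGER PRIMARY KEY, first_name TEXT);
INSERT INTO customers VALUES (2, 'Auberon'), (4, 'Nolana');
INSERT INTO transactions VALUES (1, 2, 'GBP'), (3, 2, 'EUR'), (12, 4, 'EUR');
""")

inner = (
    "SELECT t.txn_id, t.currency, c.first_name FROM transactions t "
    "LEFT OUTER JOIN customers c ON t.customer_id = c.id "
    "WHERE t.currency='EUR'"
)
# Modelled on the clause seen in the connector's DEBUG log.
suffix = ' WHERE "txn_id" > ? ORDER BY "txn_id" ASC'

# Without wrapping, the appended clause yields a second WHERE: syntax error.
try:
    conn.execute(inner + suffix, (-1,))
    unwrapped_ok = True
except sqlite3.OperationalError as exc:
    unwrapped_ok = False
    print("unwrapped query failed:", exc)

# Wrapped as a subquery, the appended clause becomes the outer WHERE.
wrapped = f"SELECT * FROM ({inner}) o"
first_poll = conn.execute(wrapped + suffix, (-1,)).fetchall()
next_poll = conn.execute(wrapped + suffix, (3,)).fetchall()
print(first_poll)
print(next_poll)
```

Note that the outer query only sees the subquery's output column names, which is why the incrementing column is referenced as `txn_id` rather than `t.txn_id`.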
Okay, I will probably burn in hell for putting this on the internet, but it works. Calling it a solution might not be entirely accurate.
If you put what the connector wants to append into your own query, and then comment out everything that follows (the way you would in an SQL injection, but against your own code), it will work. It is as simple as ending your query with --. In your case:
SELECT t.txn_id,
t.customer_id,
t.amount,
t.currency,
t.txn_timestamp,
c.first_name,
c.last_name,
c.email,
c.gender,
c.comments
FROM demo.transactions t
LEFT OUTER JOIN demo.customers c
ON t.customer_id = c.id
WHERE t.currency='EUR'
AND t.txn_id > ?
ORDER BY t.txn_id ASC --
Please do not punch me in the face ...
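For anyone curious why the trailing `--` helps, here is a small sketch using Python's built-in `sqlite3` as a stand-in database (an assumption: Postgres and the JDBC driver are not involved here, and the appended suffix is modelled on the DEBUG log earlier in the thread). Everything after `--` on the same line is a comment, so the appended clause and its `?` are ignored and only the hand-written placeholder gets bound.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE transactions (txn_id INTEGER PRIMARY KEY, currency TEXT);
INSERT INTO transactions VALUES (1, 'GBP'), (3, 'EUR'), (12, 'EUR');
""")

# User query carries its own incrementing predicate and ends with a comment marker.
user_query = (
    "SELECT txn_id, currency FROM transactions "
    "WHERE currency='EUR' AND txn_id > ? ORDER BY txn_id ASC --"
)
# Modelled on the clause seen in the connector's DEBUG log.
suffix = ' WHERE "txn_id" > ? ORDER BY "txn_id" ASC'
generated = user_query + suffix

# Only one value is bound; the '?' inside the comment is not a parameter.
rows = conn.execute(generated, (3,)).fetchall()
print(rows)
```

This only holds while the appended text stays on the same line as the `--`, and it leaves the incrementing logic entirely in the hands of the hand-written predicate.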
@lenimartin I don't care if you will burn in hell but you are brilliant and awesome :D and I am pretty sure you will have a "solution" to bypass the burning hellfire as well ;)
@lenimartin I can't help it but every time that email comes in front of me it puts a smile on my face :D
@lenimartin great workaround! Hopefully, kafka connect will support WHERE clause with incrementing/timestamp mode in the future.
Adding -- to the end of the query didn't work for me. Something like this worked instead:
{
"name": "device_detail",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://xxx:3306/device",
"connection.user": "xxxx",
"connection.password": "xxxx",
"topic.prefix": "device_detail_",
"mode": "timestamp",
"timestamp.column.name": "updated_date",
"validate.non.null": "false",
"table.poll.interval.ms": "120000000",
"tasks.max": "5",
"batch.max.rows": "100000",
"query": "select * from (select * from device.device_detail where updated_date >= '2020-10-01') sub_table"
}
}
I had the same issue with timestamp mode, and I had a lot of inner queries plus nested WHERE clauses. I added "--" at the end of my query and it worked, many thanks @lenimartin. But be careful: the change detection mechanism relies on the appended WHERE clause, so commenting it out breaks change detection and you will see duplicate events on your output topic.
https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#specifying-a-where-clause-with-query-modes
SELECT * FROM (SELECT ID_COL, TIMESTAMP_COL, COL1, COL2 FROM TABLE_A INNER JOIN TABLE_B ON PK=FK WHERE COL1='FOO');
Seems like this is the approved solution.