kafka-connect-jdbc icon indicating copy to clipboard operation
kafka-connect-jdbc copied to clipboard

Query with WHERE clause cannot be used with incrementing/timestamp mode

Open rmoff opened this issue 6 years ago • 8 comments

docker-compose.yml for env setup is here

Normal query, no predicate:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_01",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-01",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'

Works as expected.

Now we want to include a predicate at the DB, e.g. on currency column. Valid postgres query:

postgres=> SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR';
 txn_id | customer_id | amount | currency |    txn_timestamp     | first_name | last_name |           email            | gender |                       comments
--------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------
      3 |           2 |  17.13 | EUR      | 2018-04-30T21:30:39Z | Auberon    | Sulland   | [email protected]   | Male   | Organized context-sensitive Graphical User Interface
     12 |           4 | -92.57 | EUR      | 2018-03-11T07:33:19Z | Nolana     | Yeeles    | [email protected]        | Female | Adaptive real-time archive

Create connector with WHERE clause, but no variable placeholders:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_02",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-02",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency=\'EUR\'",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'

Fails because the query generated is invalid - two WHERE clauses:

SELECT          t.txn_id, 
                t.customer_id, 
                t.amount, 
                t.currency, 
                t.txn_timestamp, 
                c.first_name, 
                c.last_name, 
                c.email, 
                c.gender, 
                c.comments 
FROM            demo.transactions t 
LEFT OUTER JOIN demo.customers c 
ON              t.customer_id = c.id 
WHERE           t.currency='EUR' 
where           "txn_id" > ? 
ORDER BY        "txn_id" ASC
 [2019-01-08 16:34:06,979] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' WHERE "txn_id" > ? ORDER BY "txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
 [2019-01-08 16:34:06,995] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
 org.postgresql.util.PSQLException: ERROR: syntax error at or near "WHERE"
   Position: 234
  at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2182)
  at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1911)
  at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:173)

Build variable placeholders into query:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_03",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-03",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency=\'EUR\' AND t.txn_id > ? ORDER BY t.txn_id ASC",
                "incrementing.column.name": "t.txn_id",
                "validate.non.null": false
                }
        }'

Fails because connector still appends its incrementing clause

SELECT          t.txn_id, 
                t.customer_id, 
                t.amount, 
                t.currency, 
                t.txn_timestamp, 
                c.first_name, 
                c.last_name, 
                c.email, 
                c.gender, 
                c.comments 
FROM            demo.transactions t 
LEFT OUTER JOIN demo.customers c 
ON              t.customer_id = c.id 
WHERE           t.currency='EUR' 
AND             t.txn_id > ? 
ORDER BY        t.txn_id ASC 
where           "t.txn_id" > ? 
ORDER BY        "t.txn_id" ASC
[2019-01-08 16:42:54,794] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC WHERE "t.txn_id" > ? ORDER BY "t.txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
[2019-01-08 16:42:54,794] DEBUG Executing prepared statement with incrementing value = -1 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
[2019-01-08 16:42:54,794] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
org.postgresql.util.PSQLException: No value specified for parameter 2.
 at org.postgresql.core.v3.SimpleParameterList.checkAllParametersSet(SimpleParameterList.java:228)
 at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:163)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:645)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:495)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:380)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:170)
 at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:86)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:58)
 at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:304)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Currently the docs say

If you use a WHERE clause, it must handle incremental queries itself.

Is this possible with the JDBC connector? Or should the docs be clear that incremental/timestamp modes are mutually exclusive with WHERE in the query?

rmoff avatar Jan 08 '19 16:01 rmoff

Did you tried using your query with predicate as inner query? Try this:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_03",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-03",
                "mode":"incrementing",
                "query":"select * from (SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR') o",
                "incrementing.column.name": "t.txn_id",
                "validate.non.null": false
                }
        }'

aliasbadwolf avatar Mar 06 '19 07:03 aliasbadwolf

Okay I will probably burn in hell for putting this on the internet but it works. Calling it solution might not be so accurate.. If you put in your query what the connector wants to append, then comment out everything that's following the way you would do sql injection but in your own code, it will work. As simple as your query -- in your case

SELECT          t.txn_id, 
                t.customer_id, 
                t.amount, 
                t.currency, 
                t.txn_timestamp, 
                c.first_name, 
                c.last_name, 
                c.email, 
                c.gender, 
                c.comments 
FROM            demo.transactions t 
LEFT OUTER JOIN demo.customers c 
ON              t.customer_id = c.id 
WHERE           t.currency='EUR' 
AND             t.txn_id > ? 
ORDER BY        t.txn_id ASC --

Please do not punch me in the face ...

lenimartin avatar Nov 07 '19 00:11 lenimartin

@lenimartin I don't care if you will burn in hell but you are brilliant and awesome :D and I am pretty sure you will have a "solution" to bypass the burning hellfire as well ;)

aliasbadwolf avatar Nov 07 '19 02:11 aliasbadwolf

@lenimartin I can't help it but every time that email comes in front of me it puts a smile on my face :D

aliasbadwolf avatar Nov 07 '19 04:11 aliasbadwolf

@lenimartin great workaround! Hopefully, kafka connect will support WHERE clause with incrementing/timestamp mode in the future.

msalaslo avatar Oct 14 '20 17:10 msalaslo

add -- to the end of query didn't work for me, I have something like this, it worked.

{
  "name": "device_detail",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://xxx:3306/device",
    "connection.user": "xxxx",
    "connection.password": "xxxx",
    "topic.prefix": "device_detail_",
    "mode": "timestamp",
    "timestamp.column.name": "updated_date",
    "validate.non.null": "false",
    "table.poll.interval.ms": "120000000",
    "tasks.max": "5",
    "batch.max.rows": "100000",
    "query": "select * from (select * from device.device_detail where updated_date >= '2020-10-01') sub_table"
  }
}

jiegzhan avatar Oct 23 '20 17:10 jiegzhan

I had the same issue with timestamp mode, and I had a lot of inner queries plus nested where clauses, I added "--" at the end of my query and it worked many thanks @lenimartin , but be careful the change detection mechanism relied on that appended where clauses, hence it become broken and you will see duplicate events on your output topic.

JalalSordo avatar Mar 14 '22 01:03 JalalSordo

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#specifying-a-where-clause-with-query-modes

SELECT * FROM (SELECT ID_COL, TIMESTAMP_COL, COL1, COL2 FROM TABLE_A INNER JOIN TABLE_B ON PK=FK WHERE COL1='FOO');

Seems like this is the approved solution.

WallsTalk avatar Mar 13 '24 06:03 WallsTalk