kafka-connect-jdbc
JDBC Source using query is not fetching any records for MariaDB
Here is the connector configuration:
name=tx_user_nw_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/?user=user&password=password&autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
mode=timestamp
query=SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)
timestamp.column.name=C.MOD_DATETIME
topic.prefix=tx_user_nw
quote.sql.identifiers=never
tasks.max=1
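For context: in timestamp mode the connector does not run this query as-is; as far as I can tell it appends a criteria clause to it, so the statement that actually hits the database should look roughly like this (a sketch; the ? placeholders are bound to the last stored offset and the current database time):
SELECT ... (the query above) ... WHERE C.MOD_DATETIME > ? AND C.MOD_DATETIME < ? ORDER BY C.MOD_DATETIME ASC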
I don't see any errors in the connect.log file. There are millions of records in the table, but this SQL doesn't return any records. I tried the following configuration and it worked:
name=tx_user_nw_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/?user=user&password=password&autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
mode=bulk
query=SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID) WHERE C.TX_ID=123
topic.prefix=tx_user_nw
quote.sql.identifiers=never
tasks.max=1
Here is part of the log:
[2020-04-14 22:39:27,812] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:245)
[2020-04-14 22:39:27,814] INFO WorkerSourceTask{id=tx_user_nw_source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2020-04-14 22:39:27,815] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:92)
[2020-04-14 22:39:27,847] INFO Kafka version: 2.4.1 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-04-14 22:39:27,847] INFO Kafka commitId: c57222ae8cd7866b (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-04-14 22:39:27,847] INFO Kafka startTimeMs: 1586918367847 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-04-14 22:39:27,853] INFO Created connector tx_user_nw_sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-04-14 22:39:27,855] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Subscribed to topic(s): tx_comms_nw (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2020-04-14 22:39:27,855] INFO Starting JDBC Sink task (io.confluent.connect.jdbc.sink.JdbcSinkTask:44)
[2020-04-14 22:39:27,856] INFO JdbcSinkConfig values:
auto.create = true
auto.evolve = true
batch.size = 3000
connection.password = null
connection.url = jdbc:mysql://localhost:3306/test?user=user&password=password?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
connection.user = null
db.timezone = UTC
delete.enabled = false
dialect.name =
fields.whitelist = []
insert.mode = upsert
max.retries = 10
pk.fields = [COMM_ID]
pk.mode = record_value
quote.sql.identifiers = ALWAYS
retry.backoff.ms = 3000
table.name.format = test.tx_users
(io.confluent.connect.jdbc.sink.JdbcSinkConfig:347)
[2020-04-14 22:39:27,857] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2020-04-14 22:39:27,858] INFO WorkerSinkTask{id=tx_user_nw_sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2020-04-14 22:39:27,871] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Cluster ID: fgPFyIKWQqWOaJE8o9pCkQ (org.apache.kafka.clients.Metadata:259)
[2020-04-14 22:39:27,872] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Discovered group coordinator sandys-mbp.fios-router.home:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
Note: I have gone through the existing related issues, but none of them helped resolve this one.
Yeah, I read through #442, and it seems like you're correctly disabling the quoting; otherwise the connector would throw an exception almost immediately.
The logs that you're showing don't reveal whether the connector has started to process any rows, and this might be because the log level isn't high enough. I would recommend following this guide https://rmoff.net/post/kafka-connect-change-log-level-and-write-log-to-file/ to change the log level to DEBUG or TRACE for io.confluent.connect.jdbc.source to see what's happening.
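For a standalone worker running the stock Log4j 1.x setup that ships with Apache Kafka, that amounts to adding a line like this to the worker's connect-log4j.properties (the file location varies by install):
log4j.logger.io.confluent.connect.jdbc.source=TRACE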
You also mentioned that there are millions of records in the table; do you have an index on the timestamp column? The query is going to be adding an ORDER BY ... ASC clause on that column, and if that's taking too long to execute, the connector may end up stalling or timing out.
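If the index is missing, creating one is usually cheap relative to repeated full-table sorts. A sketch, with a hypothetical index name and the table/column taken from your query:
CREATE INDEX IDX_SUBTXMANAGER_MOD_DATETIME ON TXM.SUBTXMANAGER (MOD_DATETIME);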
Thanks for your time. I'm attaching logs; hope this helps. I see one ERROR on line 918; the source was working prior to this error. I'll try to get answers to your questions and update here. connector-source-sink.log
Yes, the table has indexes on the id and update date columns. I tried different configurations, viz. timestamp+incrementing, timestamp, and incrementing. None worked.
Here is another error when the mode is incrementing; the rest of the configuration is the same. This is unusual behavior: the same connector works under Confluent Platform but not with Apache Kafka. I hope the details provided so far help to get some insight into the issue.
[2020-04-15 23:20:17,290] ERROR WorkerSourceTask{id=tx_user_nw_source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.DataException: Incrementing column A.USER_ID not found in A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:243)
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:198)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:196)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2020-04-15 23:20:17,292] ERROR WorkerSourceTask{id=tx_user_nw_source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-04-15 23:20:17,292] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:270)
[2020-04-15 23:20:17,292] INFO [Producer clientId=connector-producer-tx_user_nw_source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
@gharris1727 Hello, did you find something? I was going through the code for Apache Kafka and Confluent Kafka, and it looks like the issue is related to the schema and column names. For example, the schema key is 'Mod_DATETIME' while in the SQL it's C.MOD_DATETIME.
Yeah, I think quote.sql.identifiers=never lets the database take over the case conversions, and it's possible that MariaDB is defaulting to printing in all caps.
Looking at some tests and examples, when executing a JOIN, I only ever see "timestamp.column.name" set to the column name without the table qualifier. So for you, maybe try specifying "MOD_DATETIME" instead of "C.MOD_DATETIME".
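In the source config that would be, roughly:
timestamp.column.name=MOD_DATETIME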
With MOD_DATETIME, the SQL errors out with the message "ambiguous column name 'MOD_DATETIME'". Here is another surprise for me: I downloaded the Confluent Kafka, common, and Kafka JDBC connect projects and built them locally to run the same configuration. It worked once, but all successive attempts hit errors related to the MOD_DATETIME column. The query printed in the log executes fine on the SQL command line, and the same SQL works with the ID column in incrementing mode. I changed the topic, source, and sink connector names for every run. Additionally, I cleaned the ZooKeeper and Kafka logs for every run, and confirmed with Kafka Tool that no topic or connector existed before each run. I don't understand why it's behaving differently for the timestamp column. I'd appreciate your comments.
@gharris1727 Here is more information about the error.
- If timestamp.column.name=MOD_DATETIME, the error makes sense, as MySQL couldn't figure out which table the column belongs to: "Column 'MOD_DATETIME' in where clause is ambiguous"
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Column 'MOD_DATETIME' in where clause is ambiguous
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.Util.getInstance(Util.java:386)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1039)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3609)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3541)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2002)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:181)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:98)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:61)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:364)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
^C[2020-04-20 22:55:59,326] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
- If timestamp.column.name=C.MOD_DATETIME, I don't understand the error, since the column is present: "DataException: C.MOD_DATETIME is not a valid field name"
[2020-04-20 23:02:26,719] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:03:26,722] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:03:26,723] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:04:26,726] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:04:26,870] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:05:13,967] INFO Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:320)
[2020-04-20 23:05:13,967] INFO Closing connection #1 to Oracle (io.confluent.connect.jdbc.util.CachedConnectionProvider:118)
[2020-04-20 23:05:13,968] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:05:13,968] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:05:13,969] ERROR WorkerSourceTask{id=conenct_source_1040-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.DataException: C.MOD_DATETIME is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetTimestamp(TimestampIncrementingCriteria.java:231)
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:194)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:199)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:369)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2020-04-20 23:05:13,971] ERROR WorkerSourceTask{id=conenct_source_1040-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
[2020-04-20 23:05:13,971] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:313)
[2020-04-20 23:05:13,971] INFO [Producer clientId=connector-producer-conenct_source_1040-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
Hi @sandycomp, were you able to find a solution to this? I am also stuck with exactly the same issue. If I give the incrementing column name only, the error I get is "ambiguous column", and if I give the incrementing column with the table alias, the error is "not a valid field name". Not sure how to proceed.
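One workaround that is often suggested for query mode with joins (a sketch, not tested against this exact schema): wrap the join in a derived table so the timestamp column is exposed under a single unqualified alias. The connector appends its WHERE/ORDER BY criteria to the end of the configured query, so with the derived table the appended clause references the alias unambiguously, and the unqualified alias should also match the field name in the record schema, which would avoid both errors above:
# sketch: derived table exposes MOD_DATETIME without a table qualifier
# (EXTRACTVALUE columns omitted here for brevity; keep them in the inner SELECT)
query=SELECT * FROM (SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME AS MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)) T
timestamp.column.name=MOD_DATETIME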