kafka-connect-jdbc

JDBC Source using query is not fetching any records for Mariadb

Open sandycomp opened this issue 4 years ago • 8 comments

Here is connector configuration.

name=tx_user_nw_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/?user=user&password=password&autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
mode=timestamp
query=SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)
timestamp.column.name=C.MOD_DATETIME
topic.prefix=tx_user_nw
quote.sql.identifiers=never
tasks.max=1

I don't see any errors in the connect.log file. There are millions of records in the table, but this SQL doesn't return any records. I tried the following configuration, and it worked:

name=tx_user_nw_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/?user=user&password=password&autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
mode=bulk
query=SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID) WHERE C.TX_ID=123
topic.prefix=tx_user_nw
quote.sql.identifiers=never
tasks.max=1

Here is some of the log output:

[2020-04-14 22:39:27,812] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:245)
[2020-04-14 22:39:27,814] INFO WorkerSourceTask{id=tx_user_nw_source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2020-04-14 22:39:27,815] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:92)
[2020-04-14 22:39:27,847] INFO Kafka version: 2.4.1 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-04-14 22:39:27,847] INFO Kafka commitId: c57222ae8cd7866b (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-04-14 22:39:27,847] INFO Kafka startTimeMs: 1586918367847 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-04-14 22:39:27,853] INFO Created connector tx_user_nw_sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-04-14 22:39:27,855] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Subscribed to topic(s): tx_comms_nw (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2020-04-14 22:39:27,855] INFO Starting JDBC Sink task (io.confluent.connect.jdbc.sink.JdbcSinkTask:44)
[2020-04-14 22:39:27,856] INFO JdbcSinkConfig values: 
	auto.create = true
	auto.evolve = true
	batch.size = 3000
	connection.password = null
	connection.url = jdbc:mysql://localhost:3306/test?user=user&password=password?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=EST5EDT
	connection.user = null
	db.timezone = UTC
	delete.enabled = false
	dialect.name = 
	fields.whitelist = []
	insert.mode = upsert
	max.retries = 10
	pk.fields = [COMM_ID]
	pk.mode = record_value
	quote.sql.identifiers = ALWAYS
	retry.backoff.ms = 3000
	table.name.format = test.tx_users
 (io.confluent.connect.jdbc.sink.JdbcSinkConfig:347)
[2020-04-14 22:39:27,857] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2020-04-14 22:39:27,858] INFO WorkerSinkTask{id=tx_user_nw_sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2020-04-14 22:39:27,871] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Cluster ID: fgPFyIKWQqWOaJE8o9pCkQ (org.apache.kafka.clients.Metadata:259)
[2020-04-14 22:39:27,872] INFO [Consumer clientId=connector-consumer-tx_user_nw_sink-0, groupId=connect-tx_user_nw_sink] Discovered group coordinator sandys-mbp.fios-router.home:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)

Note: I've gone through the existing related issues, but none of them helped resolve this.

sandycomp avatar Apr 15 '20 02:04 sandycomp

Yeah, I read through #442, and it seems like you're correctly disabling the quoting; otherwise the connector would throw an exception almost immediately.

The logs that you're showing don't reveal whether the connector has started to process any rows, and this might be because the log level isn't high enough. I would recommend following this guide https://rmoff.net/post/kafka-connect-change-log-level-and-write-log-to-file/ to change the log level to DEBUG or TRACE for io.confluent.connect.jdbc.source to see what's happening.
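For a standalone worker, that's roughly a one-line change in config/connect-log4j.properties (the guide above covers the details; the logger name below is just the connector's source package):

log4j.logger.io.confluent.connect.jdbc.source=TRACE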

You also mentioned that there are millions of records in the table; do you have an index on the timestamp column? The query is going to add an ORDER BY ... ASC clause on that column, and if that takes too long to execute, the connector may end up stalling or timing out.
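For reference, in timestamp mode the connector appends its WHERE/ORDER BY criteria directly to the configured query, so what actually runs against the database looks roughly like this (a sketch; the exact placeholders depend on the connector version):

SELECT A.USER_ID, C.TX_ID, ... FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID) WHERE C.MOD_DATETIME > ? AND C.MOD_DATETIME < ? ORDER BY C.MOD_DATETIME ASC

If nothing covers that, an index along these lines may help (the index name here is just an example):

CREATE INDEX IDX_SUBTXMANAGER_MOD_DATETIME ON TXM.SUBTXMANAGER (MOD_DATETIME);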

gharris1727 avatar Apr 15 '20 07:04 gharris1727

Thanks for your time. I'm attaching the logs; hope this helps. I see one ERROR on line 918; the source was working prior to this error. I'll try to get answers to your questions and update here. connector-source-sink.log

Yes, the table has indexes on the ID and the update-date column. I tried different configurations, viz. timestamp+incrementing, timestamp, and incrementing. None worked.

sandycomp avatar Apr 15 '20 13:04 sandycomp

Here is another error when mode is incrementing; the rest of the configuration is the same. This is unusual behavior: the same connector works under the Confluent Platform but not with Apache Kafka. I hope the details provided so far help to get some insight into the issue.

[2020-04-15 23:20:17,290] ERROR WorkerSourceTask{id=tx_user_nw_source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.DataException: Incrementing column A.USER_ID not found in A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE,  EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)
	at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:243)
	at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:198)
	at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:196)
	at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2020-04-15 23:20:17,292] ERROR WorkerSourceTask{id=tx_user_nw_source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-04-15 23:20:17,292] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:270)
[2020-04-15 23:20:17,292] INFO [Producer clientId=connector-producer-tx_user_nw_source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)

sandycomp avatar Apr 16 '20 04:04 sandycomp

@gharris1727 Hello, did you find anything? I was going through the code for Apache Kafka and Confluent Kafka, and it looks like the issue is related to the schema and the column name. For example, the schema key is 'Mod_DATETIME' while in the SQL it's C.MOD_DATETIME.

sandycomp avatar Apr 20 '20 15:04 sandycomp

Yeah, I think quote.sql.identifiers=never lets the database take over the case conversions, and it's possible that MariaDB is defaulting to all-caps identifiers.

Looking at some tests and examples, when executing a JOIN, I only ever see "timestamp.column.name" set to the column name without the table qualifier. So for you, maybe try specifying "MOD_DATETIME" instead of "C.MOD_DATETIME".
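If the unqualified name turns out to be ambiguous in the join, a workaround I've seen for query-based configs (untested against your schema) is to alias the timestamp column and wrap the whole join in a subselect, so the criteria the connector appends resolve against the derived table rather than the individual tables:

query=SELECT * FROM (SELECT A.USER_ID, C.TX_ID, C.MAST_SUB_ID, C.MAST_ID, C.STATUS_CODE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"source\"]') AS CS_SOURCE, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"DIV\"]') AS DIV, EXTRACTVALUE(P.USER_XML, '/CAT/SUB_CAT[@name=\"SUB_DIV\"]') AS SUB_DIV, C.COMMENT, C.CREATE_DATE, C.CREATE_DATETIME, C.MOD_DATETIME AS MOD_DATETIME FROM USERS.USER A JOIN TXM.TXMANAGER CM ON (CM.USER = A.USER) JOIN TXM.SUBTXMANAGER C ON (C.MAST_ID = CM.MAST_ID) JOIN TXM.USER_XMLS P ON (P.MAST_SUB_ID = C.MAST_SUB_ID)) AS SRC
timestamp.column.name=MOD_DATETIME

That would also keep the result-set column label a plain MOD_DATETIME, which is the name the connector later looks up in the generated record.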

gharris1727 avatar Apr 20 '20 17:04 gharris1727

With MOD_DATETIME, the SQL errors out with the message "ambiguous column name 'MOD_DATETIME'". Here is another surprise for me: I downloaded the Confluent Kafka, common, and Kafka JDBC connect projects and built them locally to execute the same configuration. It worked once, but all successive attempts failed with errors related to the MOD_DATETIME column. The query printed in the log executes fine at the SQL command line. The same SQL works with the ID column in incrementing mode. I changed the topic, source, and sink connector names for every run. Additionally, I cleaned the ZooKeeper and Kafka logs for every run and confirmed with Kafka Tool that no topic or connector existed before the run. I don't understand why it behaves differently for the timestamp column. Appreciate your comments.

sandycomp avatar Apr 22 '20 03:04 sandycomp

@gharris1727 Here is more information about the error.

  1. If timestamp.column.name=MOD_DATETIME: this error makes sense, as MySQL can't determine which table the column belongs to. "Column 'MOD_DATETIME' in where clause is ambiguous"
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Column 'MOD_DATETIME' in where clause is ambiguous
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
	at com.mysql.jdbc.Util.getInstance(Util.java:386)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1039)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3609)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3541)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2002)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
	at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
	at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
	at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:181)
	at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:98)
	at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:61)
	at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:364)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
^C[2020-04-20 22:55:59,326] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
  2. If timestamp.column.name=C.MOD_DATETIME: this error I don't understand, since the column is present. "DataException: C.MOD_DATETIME is not a valid field name"
[2020-04-20 23:02:26,719] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:03:26,722] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:03:26,723] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:04:26,726] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:04:26,870] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:05:13,967] INFO Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:320)
[2020-04-20 23:05:13,967] INFO Closing connection #1 to Oracle (io.confluent.connect.jdbc.util.CachedConnectionProvider:118)
[2020-04-20 23:05:13,968] INFO WorkerSourceTask{id=conenct_source_1040-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-04-20 23:05:13,968] INFO WorkerSourceTask{id=conenct_source_1040-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-04-20 23:05:13,969] ERROR WorkerSourceTask{id=conenct_source_1040-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.DataException: C.MOD_DATETIME is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
	at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetTimestamp(TimestampIncrementingCriteria.java:231)
	at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:194)
	at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:199)
	at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:369)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2020-04-20 23:05:13,971] ERROR WorkerSourceTask{id=conenct_source_1040-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
[2020-04-20 23:05:13,971] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:313)
[2020-04-20 23:05:13,971] INFO [Producer clientId=connector-producer-conenct_source_1040-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)

sandycomp avatar Apr 22 '20 18:04 sandycomp

Hi @sandycomp, were you able to find a solution to this? I am also stuck with exactly the same issue. If I give only the incrementing column name, the error I get is "ambiguous column", and if I give the incrementing column with its table alias, the error is "not a valid field name". Not sure how to proceed.

avinash0720 avatar Jan 17 '22 12:01 avinash0720