kafka-connect-spooldir
CSV connector task hangs in an infinite loop after hitting a record it can't process
Hello!
We're testing SpoolDirCsvSourceConnector and ran into the following situation:
- Part of the file was processed (2000 rows).
- Error from log:
[2021-08-24 18:55:46,547] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
java.io.IOException: Unterminated quoted field at end of CSV line. Beginning of lost text: [;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ]
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:105)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:254)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2021-08-24 18:55:46,549] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
[2021-08-24 18:55:46,550] INFO Removing processing flag /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile:177)
[2021-08-24 18:55:46,551] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
[2021-08-24 18:55:46,552] INFO Moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:204)
- The file was successfully moved to the error folder.
- No new files can be processed. The connector task seems to hang, hitting the same failing line again on every "empty.poll.wait.ms" interval:
[2021-08-24 19:29:54,234] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
[2021-08-24 19:29:54,234] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
[2021-08-24 19:29:54,236] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
[2021-08-24 19:30:00,236] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
[2021-08-24 19:30:00,237] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
[2021-08-24 19:30:00,239] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
[2021-08-24 19:30:06,240] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
[2021-08-24 19:30:06,240] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
[2021-08-24 19:30:06,242] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
[2021-08-24 19:30:12,243] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
[2021-08-24 19:30:12,243] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
[2021-08-24 19:30:12,245] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
Could you please help identify where the problem is?
Environment: Confluent Kafka 6.1.0 (Kafka version 2.7.1), with jcustenborder-kafka-connect-spooldir version 2.0.62.
Connector configuration:
{
  "name" : "csv-file-quality-test-orders-v0",
  "config" : {
    "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "csv.first.row.as.header" : "true",
    "csv.separator.char" : "59",
    "empty.poll.wait.ms" : "6000",
    "error.path" : "/connect_integrations/848_quality/DEV/SWAP-ERROR",
    "finished.path" : "/connect_integrations/848_quality/DEV/SWAP-SUCCESS",
    "halt.on.error" : "false",
    "input.file.pattern" : ".*.csv",
    "input.path" : "/connect_integrations/848_quality/DEV/SWAP",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable" : "false",
    "name" : "csv-file-quality-test-orders-v0",
    "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "topic" : "csv_test_topic",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" : "false",
    "value.schema" : "{\"name\": \"csv.Value\", \"type\": \"STRUCT\", \"isOptional\": true, \"fieldSchemas\": {\"test0\": {\"type\": \"STRING\", \"isOptional\": true}, \"test1\": {\"type\": \"STRING\", \"isOptional\": true}, \"test2\": {\"type\": \"STRING\", \"isOptional\": true}, \"test3\": {\"type\": \"STRING\", \"isOptional\": true}, \"test4\": {\"type\": \"STRING\", \"isOptional\": true}, \"test5\": {\"type\": \"STRING\", \"isOptional\": true}, \"test6\": {\"type\": \"STRING\", \"isOptional\": true}, \"test7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended1\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended2\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended3\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended4\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended5\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended6\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended8\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended9\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended10\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended11\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended12\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended13\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended14\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended15\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended16\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended17\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended18\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended19\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended20\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended21\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended22\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended23\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended24\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended25\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended26\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended27\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended28\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended29\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended30\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended31\": {\"type\": \"STRING\", \"isOptional\": true}}}"
  }
}
Thank you.
@MaxSamsonov I had the same problem. What worked for me was switching the parser to RFC 4180 and changing the file naming convention to camel case.
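For anyone trying the parser switch, here is a minimal sketch of the config change. It assumes the `csv.rfc.4180.parser.enabled` property described in the connector's documentation is available in the version you run (worth checking for 2.0.62); the fragment shows only the added key, and every other setting would stay as in the configuration posted above.

```json
{
  "config" : {
    "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "csv.rfc.4180.parser.enabled" : "true"
  }
}
```

This only changes how quoted fields are parsed. Whether it also stops the task from retrying a file that has already been moved to the error folder is not confirmed in this thread.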