kafka-connect-spooldir

CSV connector task hangs in an infinite loop after encountering a record it can't process

Open • MaxSamsonov opened this issue 3 years ago • 1 comment

Hello!

We're testing SpoolDirCsvSourceConnector and ran into the following situation:

  1. Part of the file was processed (2000 rows).

  2. Error from the log:

     ```
     [2021-08-24 18:55:46,547] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
     java.io.IOException: Unterminated quoted field at end of CSV line. Beginning of lost text: [;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ]
         at com.opencsv.CSVReader.readNext(CSVReader.java:353)
         at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:105)
         at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:254)
         at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:148)
         at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
         at java.base/java.lang.Thread.run(Thread.java:829)
     [2021-08-24 18:55:46,549] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
     [2021-08-24 18:55:46,550] INFO Removing processing flag /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile:177)
     [2021-08-24 18:55:46,551] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
     [2021-08-24 18:55:46,552] INFO Moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:204)
     ```

  3. The file was successfully moved to the error folder.

  4. After that, no new files are processed. The connector task seems to be stuck on the line that failed, logging the same error (still for the file's original path in the input folder) on every `empty.poll.wait.ms` interval:

     ```
     [2021-08-24 19:29:54,234] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
         at com.opencsv.CSVReader.readNext(CSVReader.java:353)
     [2021-08-24 19:29:54,234] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
     [2021-08-24 19:29:54,236] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
     [2021-08-24 19:30:00,236] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
         at com.opencsv.CSVReader.readNext(CSVReader.java:353)
     [2021-08-24 19:30:00,237] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
     [2021-08-24 19:30:00,239] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
     [2021-08-24 19:30:06,240] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
         at com.opencsv.CSVReader.readNext(CSVReader.java:353)
     [2021-08-24 19:30:06,240] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
     [2021-08-24 19:30:06,242] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
     [2021-08-24 19:30:12,243] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
         at com.opencsv.CSVReader.readNext(CSVReader.java:353)
     [2021-08-24 19:30:12,243] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
     [2021-08-24 19:30:12,245] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
     ```
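The retry cadence in these repeated entries can be checked against the configuration, which sets `empty.poll.wait.ms` to `6000`. A small Python sketch that computes the gaps between the repeated `Exception encountered processing line 5692` entries (timestamps copied from the log; the helper names are illustrative and not part of the connector):

```python
from datetime import datetime, timedelta
from typing import List

# Timestamps of the repeated "Exception encountered processing line 5692" entries,
# copied from the log above.
ERROR_TIMESTAMPS = [
    "2021-08-24 19:29:54,234",
    "2021-08-24 19:30:00,236",
    "2021-08-24 19:30:06,240",
    "2021-08-24 19:30:12,243",
]

EMPTY_POLL_WAIT_MS = 6000  # "empty.poll.wait.ms" from the connector configuration


def parse_log_timestamp(text: str) -> datetime:
    """Parse a timestamp like '2021-08-24 19:29:54,234' (milliseconds after the comma)."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S,%f")


def gaps_ms(timestamps: List[str]) -> List[int]:
    """Whole milliseconds between consecutive timestamps."""
    parsed = [parse_log_timestamp(t) for t in timestamps]
    return [
        (later - earlier) // timedelta(milliseconds=1)
        for earlier, later in zip(parsed, parsed[1:])
    ]


if __name__ == "__main__":
    for gap in gaps_ms(ERROR_TIMESTAMPS):
        # Show each gap and how far it exceeds the configured wait.
        print(f"{gap} ms (+{gap - EMPTY_POLL_WAIT_MS} ms over empty.poll.wait.ms)")
```

The gaps come out as 6002, 6004 and 6003 ms, i.e. the configured wait plus a few milliseconds. That is consistent with the task hitting the same failure once per empty poll instead of moving on to the next file.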

Could you please help identify where the problem is?
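Since OpenCSV reports an unterminated quoted field, one way to look for the cause is to scan the file for suspicious physical lines. The hypothetical helper below flags lines with an odd number of double quotes (under RFC 4180 rules quotes come in pairs, and an embedded quote is written as `""`) and lines with a backslash directly before a quote, which matters if the parser treats backslash as an escape character, as OpenCSV's default parser does. The UTF-8 encoding is an assumption. Quoted fields that legitimately span several lines also produce odd counts, and physical line numbers may not match the line number in the log exactly, so treat the output as candidates to inspect around line 5692.

```python
import sys
from typing import Iterable, Iterator, Tuple

QUOTE = '"'
ESCAPE = "\\"  # assumed escape character (OpenCSV's default parser uses a backslash)


def suspicious_lines(lines: Iterable[str]) -> Iterator[Tuple[int, str, str]]:
    """Yield (1-based physical line number, reason, line) for lines worth inspecting."""
    for number, line in enumerate(lines, start=1):
        text = line.rstrip("\r\n")
        reasons = []
        # With RFC 4180 quoting, quotes come in pairs; an odd count leaves a field open.
        if text.count(QUOTE) % 2 == 1:
            reasons.append("odd quote count")
        # A backslash before a quote can hide the closing quote from a parser
        # that treats backslash as an escape character.
        if ESCAPE + QUOTE in text:
            reasons.append("escape before quote")
        if reasons:
            yield number, ", ".join(reasons), text


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (hypothetical script name): python find_suspicious_lines.py path/to/file.csv
    with open(sys.argv[1], encoding="utf-8", newline="") as handle:
        for number, reason, text in suspicious_lines(handle):
            print(f"{number}: {reason}: {text[:120]}")
```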

Environment: Confluent Platform 6.1.0 (Kafka 2.7.1) with jcustenborder-kafka-connect-spooldir 2.0.62.

Connector configuration:

```json
{
  "name": "csv-file-quality-test-orders-v0",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "csv.first.row.as.header": "true",
    "csv.separator.char": "59",
    "empty.poll.wait.ms": "6000",
    "error.path": "/connect_integrations/848_quality/DEV/SWAP-ERROR",
    "finished.path": "/connect_integrations/848_quality/DEV/SWAP-SUCCESS",
    "halt.on.error": "false",
    "input.file.pattern": ".*.csv",
    "input.path": "/connect_integrations/848_quality/DEV/SWAP",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "name": "csv-file-quality-test-orders-v0",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "topic": "csv_test_topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "value.schema": "{\"name\": \"csv.Value\", \"type\": \"STRUCT\", \"isOptional\": true, \"fieldSchemas\": {\"test0\": {\"type\": \"STRING\", \"isOptional\": true}, \"test1\": {\"type\": \"STRING\", \"isOptional\": true}, \"test2\": {\"type\": \"STRING\", \"isOptional\": true}, \"test3\": {\"type\": \"STRING\", \"isOptional\": true}, \"test4\": {\"type\": \"STRING\", \"isOptional\": true}, \"test5\": {\"type\": \"STRING\", \"isOptional\": true}, \"test6\": {\"type\": \"STRING\", \"isOptional\": true}, \"test7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended1\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended2\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended3\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended4\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended5\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended6\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended8\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended9\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended10\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended11\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended12\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended13\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended14\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended15\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended16\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended17\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended18\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended19\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended20\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended21\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended22\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended23\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended24\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended25\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended26\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended27\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended28\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended29\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended30\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended31\": {\"type\": \"STRING\", \"isOptional\": true}}}"
  }
}
```

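Because `value.schema` is itself a JSON document stored as a string inside the connector JSON, every inner double quote has to be escaped, which is easy to get wrong by hand with 40 fields. A minimal Python sketch that builds the same configuration and lets `json.dumps` do the escaping. The keys and values are copied from the configuration above; the helper names `build_value_schema` and `build_connector_config` are hypothetical. It also spells out that `csv.separator.char` takes a character code: `59` is `;`.

```python
import json

# Field names taken from the "value.schema" in the configuration above.
FIELD_NAMES = (
    [f"test{i}" for i in range(8)]
    + ["extended"]
    + [f"extended{i}" for i in range(1, 32)]
)


def build_value_schema(field_names):
    """Return the schema as a dict, in the same layout as the posted config."""
    return {
        "name": "csv.Value",
        "type": "STRUCT",
        "isOptional": True,
        "fieldSchemas": {
            name: {"type": "STRING", "isOptional": True} for name in field_names
        },
    }


def build_connector_config():
    base = "/connect_integrations/848_quality/DEV"
    return {
        "name": "csv-file-quality-test-orders-v0",
        "config": {
            "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
            "csv.first.row.as.header": "true",
            # The separator is given as a character code: ord(";") == 59.
            "csv.separator.char": str(ord(";")),
            "empty.poll.wait.ms": "6000",
            "error.path": f"{base}/SWAP-ERROR",
            "finished.path": f"{base}/SWAP-SUCCESS",
            "halt.on.error": "false",
            "input.file.pattern": ".*.csv",
            "input.path": f"{base}/SWAP",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "name": "csv-file-quality-test-orders-v0",
            "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
            "topic": "csv_test_topic",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            # The schema becomes a plain string here; serializing the whole config
            # with json.dumps then escapes its inner double quotes.
            "value.schema": json.dumps(build_value_schema(FIELD_NAMES)),
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_connector_config(), indent=2))
```

Running it prints the configuration in the same `{"name": ..., "config": ...}` shape, with `value.schema` fully escaped.
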
Thank you.

MaxSamsonov • Aug 24 '21 16:08

@MaxSamsonov I had the same problem. What worked for me was switching the parser to RFC 4180 and changing the file naming convention to camel case.
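The reply doesn't say why switching parsers helped. One plausible explanation, which is an assumption since the offending line isn't shown in the issue, is escape handling: OpenCSV's default `CSVParser` treats a backslash as an escape character, while its `RFC4180Parser` has no escape character and only reads a doubled quote (`""`) inside a quoted field as a literal quote. The toy splitter below (not OpenCSV's code; `split_line` and `UnterminatedQuoteError` are made-up names) models both rules and shows how the same line can be unterminated under the first and valid under the second:

```python
from typing import List


class UnterminatedQuoteError(ValueError):
    """Raised when a line ends while still inside a quoted field."""


def split_line(line: str, sep: str = ";", quote: str = '"', escape: str = "") -> List[str]:
    """Split one physical CSV line (simplified model).

    escape="" models RFC 4180 rules: only a doubled quote escapes a quote.
    escape="\\" additionally lets a backslash escape the next character.
    """
    fields: List[str] = []
    current: List[str] = []
    in_quotes = False
    i = 0
    while i < len(line):
        ch = line[i]
        if escape and ch == escape and i + 1 < len(line):
            # Backslash mode: take the next character literally, even a quote.
            current.append(line[i + 1])
            i += 2
            continue
        if in_quotes:
            if ch == quote:
                if i + 1 < len(line) and line[i + 1] == quote:
                    current.append(quote)  # doubled quote -> literal quote
                    i += 2
                    continue
                in_quotes = False  # closing quote
            else:
                current.append(ch)  # separators inside quotes are data
        elif ch == quote:
            in_quotes = True  # simplification: any unquoted quote opens a quoted section
        elif ch == sep:
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1
    if in_quotes:
        raise UnterminatedQuoteError(f"unterminated quoted field in {line!r}")
    fields.append("".join(current))
    return fields


if __name__ == "__main__":
    line = 'p;"C:\\dir\\";q'  # the field ends with a backslash right before its closing quote
    print("RFC 4180 rules:", split_line(line))
    try:
        split_line(line, escape="\\")
    except UnterminatedQuoteError as err:
        print("backslash-escape rules:", err)
```

The sketch works on a single physical line and ignores quoted fields that span several lines, which real CSV readers handle by reading ahead.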

cande1gut • Dec 06 '21 22:12