kafka-connect-spooldir

SpoolDirLineDelimited - Task failed when processing file that contains corrupted line

Open davidg2019 opened this issue 5 years ago • 5 comments

Hi,

I have a file that contains lines of 500 columns. The last line, which is corrupted, has more than 130,000,000 columns. When the connector processes the file, it throws this error:

[2020-01-27 19:04:34,753] ERROR WorkerSourceTask{id=test-error-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 19:04:34,754] ERROR WorkerSourceTask{id=test-error-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2020-01-27 19:04:35,234] ERROR WorkerSourceTask{id=test-error-0} failed to send record to test-error: (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 164956873 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
[2020-01-27 19:04:35,234] INFO WorkerSourceTask{id=test-error-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 19:04:35,234] INFO WorkerSourceTask{id=test-error-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 19:04:40,234] ERROR WorkerSourceTask{id=test-error-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 19:04:40,234] ERROR WorkerSourceTask{id=test-error-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
    at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:252)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:305)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 164956873 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
[2020-01-27 19:04:40,235] ERROR WorkerSourceTask{id=test-error-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-01-27 19:04:40,235] INFO Stopping task. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask)
[2020-01-27 19:04:40,235] INFO Closing /sourcefiles/MyFileInError.csv.txt (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-01-27 19:04:40,237] INFO Removing processing flag /sourcefiles/MyFileInError.csv.txt.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-01-27 19:04:40,238] INFO [Producer clientId=connector-producer-test-error-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)

Here is my connector config:

name=test-error
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector
input.file.pattern=^.+.txt$
finished.path=/sourcefiles/finished
tasks.max=1
empty.poll.wait.ms=500
topic=test-error
errors.tolerance=all
error.path=/sourcefiles/error
halt.on.error=false
value.converter=org.apache.kafka.connect.storage.StringConverter
input.path=/sourcefiles

I thought that with this config (halt.on.error=false) the connector would stop processing the file, move it to the error directory, and start processing the next file. But that does not work as expected: instead, the task failed and left the faulty file in the input path. (Sample file attached: MyFileInError.zip)

Do you have an idea of what is happening?

Best regards David

davidg2019 avatar Jan 29 '20 17:01 davidg2019

I have the same issue in my case as well. The task failed when there was a bad record in the input, and it did not even move the file to the error directory; the file stays in the same location. When the task was restarted, the whole file was loaded again from the beginning, so I received duplicate records in the topic. I am not sure how to overcome this scenario.

sivaknakka avatar Jan 29 '20 23:01 sivaknakka

Moreover, I see the offsets being stored in the _kafka-connect-offsets topic, but I do not find the corresponding source file name; it shows only the offset number, with no other entries in the topic. I am also wondering how to get the source file name along with the offset in the _kafka-connect-offsets topic.

sivaknakka avatar Jan 29 '20 23:01 sivaknakka

Have either of you tried the 2.0 version?

> Moreover, I see the offsets being stored in the _kafka-connect-offsets topic, but I do not find the corresponding source file name […]

What does the log say when this happens? If you are using the console consumer to consume _kafka-connect-offsets, try adding --property print.key=true.
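As an illustration, that suggestion boils down to something like the following (the bootstrap server address here is an assumption, and the script may be called kafka-console-consumer.sh depending on your distribution). The source partition, which for this connector should include the file name, is stored in the record key, so printing keys is what makes it visible:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic _kafka-connect-offsets \
  --from-beginning \
  --property print.key=true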

> I have a file that contains lines of 500 columns. The last line, which is corrupted, has more than 130,000,000 columns. When the connector processes the file, it throws this error: […]

Your exception is Kafka rejecting the message for being too large as a single record. The connector you are using only reads the file line by line and produces each line to Kafka, and it looks like you have a 160 MB line in the file. You could either increase the maximum message size or filter with a transformation. This particular failure happens after the messages have been returned to the Connect framework.

[2020-01-27 19:04:35,234] ERROR WorkerSourceTask{id=test-error-0} failed to send record to test-error: (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 164956873 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

jcustenborder avatar Jan 29 '20 23:01 jcustenborder
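As a point of reference, the first suggestion (raising the maximum message size) can be sketched as configuration along the following lines. This is only a sketch: the 200 MB figure is arbitrary, the per-connector producer.override.* form only works if the worker's connector.client.config.override.policy permits client overrides, and the topic's max.message.bytes would likely also need to be raised before the broker accepts such a record.

# In the Connect worker config (connect-standalone.properties / connect-distributed.properties),
# applied to the producers of all source connectors on that worker:
producer.max.request.size=209715200

# Or in this connector's own config, if the worker allows client config overrides:
producer.override.max.request.size=209715200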

> Your exception is Kafka rejecting the message for being too large as a single record. […]

Hi,

I understand the error; I know the line is too big because it is a bad record. My connector processes a file that is generated by another process, so I have no control over the content of the file. In this case, I don't want the file to be processed at all because it contains a bad record, so I expected the connector to catch the error and, given halt.on.error=false, move the file to the error directory. Isn't that what the connector should do in this use case?

Best regards,
David

davidg2019 avatar Jan 30 '20 14:01 davidg2019

Unfortunately that failure happens after the task has already converted the data to records, so I wouldn't be able to catch the exception anyway. You could potentially look at implementing a transformation that inspects the value of the message and, if it's longer than x, drops the message.

jcustenborder avatar Jan 30 '20 15:01 jcustenborder
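For completeness, a minimal sketch of the kind of transformation suggested above. The class name DropOversizedValue, its package, and the max.value.length setting are hypothetical (they are not part of kafka-connect-spooldir or Kafka); the transform simply returns null, which drops the record, whenever the String value exceeds a configured length.

package com.example.smt; // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Hypothetical SMT that drops records whose String value is longer than a
 * configurable number of characters. Not part of kafka-connect-spooldir.
 */
public class DropOversizedValue<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String MAX_LENGTH_CONFIG = "max.value.length";
    private static final int DEFAULT_MAX_LENGTH = 1_000_000;

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(MAX_LENGTH_CONFIG, ConfigDef.Type.INT, DEFAULT_MAX_LENGTH,
                    ConfigDef.Importance.HIGH,
                    "Records whose String value exceeds this many characters are dropped.");

    private int maxLength = DEFAULT_MAX_LENGTH;

    @Override
    public void configure(Map<String, ?> configs) {
        // Keep config handling minimal: read the single setting, fall back to the default.
        Object raw = configs.get(MAX_LENGTH_CONFIG);
        if (raw != null) {
            maxLength = Integer.parseInt(raw.toString());
        }
    }

    @Override
    public R apply(R record) {
        Object value = record.value();
        // With StringConverter and SpoolDirLineDelimited the value is the raw line as a String.
        if (value instanceof String && ((String) value).length() > maxLength) {
            return null; // returning null from an SMT drops the record
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // nothing to release
    }
}

After packaging the class onto the Connect plugin path, it would be wired in through the connector config with something like transforms=dropHuge, transforms.dropHuge.type=com.example.smt.DropOversizedValue and transforms.dropHuge.max.value.length=1000000 (again, hypothetical names).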