kafka-connect-fs
Getting duplicate records in Kafka
Below is my connect-file-source.properties:
name=local-file-source1
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data/test_file
topic=connect-test1
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=50000
policy.recursive=true
poll.interval.ms=0
policy.regexp=.*
policy.batch_size=0
#policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
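Before digging into the records, it is worth ruling out task restarts (a common source of at-least-once duplicates) by checking connector and task state through the Kafka Connect REST API. A minimal check, assuming the worker exposes the default REST port 8083 on localhost:

# Verify the connector and its single task report RUNNING
# (localhost:8083 is an assumption; substitute your worker's REST endpoint)
curl -s http://localhost:8083/connectors/local-file-source1/status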
When I run:

echo "bbb" >> /data/test_file/1.txt
echo "ddd" >> /data/test_file/1.txt
I got the following from Kafka:

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}
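For reference, the duplicates can be observed directly with the console consumer. A minimal sketch, assuming a broker listening on localhost:9092:

# Read connect-test1 from the beginning; each appended line should appear once,
# but here "bbb" and "ddd" each show up twice
# (localhost:9092 is an assumption; point it at your broker)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-test1 --from-beginning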
Below is the log (the first entry is truncated):

nect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:01,087] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:01,088] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:01,110] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 16, blocks = [[offset = 0, length = 16, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
[2023-02-20 13:21:01,131] INFO [local-file-source1|task-0] [Producer clientId=connector-producer-local-file-source1-0] Resetting the last seen epoch of partition connect-test1-0 to 0 since the associated topicId changed from null to pOAivNaKReC7FstKHYOn_A (org.apache.kafka.clients.Metadata:402)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/1.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/2.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/3.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/6.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/5.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:11,220] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 24, blocks = [[offset = 0, length
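The log shows the task seeking to offset [2] for 7.txt and then still reprocessing the file, so it may help to compare that seek position against the offset Connect has actually committed. A minimal sketch, assuming distributed mode with the default offset.storage.topic name connect-offsets:

# Dump committed source offsets; the key identifies the connector and file,
# the value holds the stored offset to compare with the "Seeking to offset" line
# ("connect-offsets" is the default topic name and an assumption here)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-offsets --from-beginning \
  --property print.key=true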
@mmolimar, do you have any further updates or a plan to fix this issue? After upgrading to the master branch, we still see duplicate data.