kafka-connect-hdfs
Solved the problem of files getting stuck in the temporary folder due to the lack of new messages in the topic despite the set property rotate.interval.ms. Property flush.size now also works correctly.
Problem
Occasionally, files got stuck in the +tmp folder even though the rotate.interval.ms property was set. This happened when new messages stopped arriving in the topic. The rotate.interval.ms parameter had no effect because nothing triggered a check of it.
While working on this problem, I also fixed the rotation mechanism and, with it, the flush.size parameter. According to the documentation, rotation can be controlled by how long a file has been collecting records (rotate.interval.ms) and/or by the number of messages in the file (flush.size). In practice, time-based rotation was not driven by the file collection time but by the timestamp of the field used for partitioning. If a single Kafka partition contained messages whose values in that field differed by more than rotate.interval.ms, rotation was triggered and the file was flushed prematurely from the temporary folder to the permanent one. This produced a large number of small files and made it impossible to control writing reliably with flush.size. With this change, the problem no longer occurs.
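To illustrate the intended behaviour, here is a minimal sketch of a rotation decision that depends only on how long the current file has been open and on how many records it holds. The class RotationSketch and its methods are hypothetical and are not the connector's actual code; record timestamps are deliberately not an input.

```java
/** Hypothetical sketch of the rotation decision; not the connector's real classes. */
public class RotationSketch {
    private final long rotateIntervalMs; // values <= 0 disable time-based rotation
    private final int flushSize;         // maximum number of records per file
    private long fileOpenedAtMs = -1L;   // wall-clock time of the first record in the file
    private int recordsInFile = 0;

    public RotationSketch(long rotateIntervalMs, int flushSize) {
        this.rotateIntervalMs = rotateIntervalMs;
        this.flushSize = flushSize;
    }

    /** Called after a record is appended to the temporary file. */
    public void onRecordWritten(long nowMs) {
        if (recordsInFile == 0) {
            fileOpenedAtMs = nowMs; // the timer starts with the first record
        }
        recordsInFile++;
    }

    /** Decides rotation from file age and record count only. */
    public boolean shouldRotate(long nowMs) {
        if (recordsInFile == 0) {
            return false; // nothing to commit
        }
        boolean bySize = recordsInFile >= flushSize;
        boolean byTime = rotateIntervalMs > 0
                && nowMs - fileOpenedAtMs >= rotateIntervalMs;
        return bySize || byTime;
    }

    /** Called after the temporary file has been moved to its final location. */
    public void onFileCommitted() {
        recordsInFile = 0;
        fileOpenedAtMs = -1L;
    }
}
```

Because the decision ignores the values of the partitioning field, a wide spread of record timestamps within one Kafka partition cannot force an early flush in this sketch.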
Solution
I added a check for rotate.interval.ms in HdfsSinkTask.preCommit(). I also fixed the rotation logic in the TopicPartitionWriter.shouldRotateAndMaybeUpdateTimers() method.
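The idea behind the preCommit() check can be sketched as follows. Kafka Connect invokes a sink task's preCommit() as part of its periodic offset commit, so it runs even when no new records arrive. The class IdleFlushSketch, its string partition keys, and its method signatures are hypothetical stand-ins and do not reproduce the real HdfsSinkTask API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a sink task; not the real HdfsSinkTask. */
public class IdleFlushSketch {
    private final long rotateIntervalMs;
    // partition name -> wall-clock time at which its temporary file was opened
    private final Map<String, Long> openTempFiles = new HashMap<>();

    public IdleFlushSketch(long rotateIntervalMs) {
        this.rotateIntervalMs = rotateIntervalMs;
    }

    /** Stand-in for put(): runs only when records arrive. */
    public void put(String partition, long nowMs) {
        openTempFiles.putIfAbsent(partition, nowMs);
    }

    /**
     * Stand-in for preCommit(): runs periodically, even on an idle topic.
     * Returns the partitions whose temporary files were old enough to commit.
     */
    public List<String> preCommit(long nowMs) {
        List<String> committed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = openTempFiles.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= rotateIntervalMs) {
                committed.add(entry.getKey()); // the real task would move the file here
                it.remove();
            }
        }
        return committed;
    }
}
```

In this sketch, a file opened by the last record before the topic went quiet is still committed by a later preCommit() call, instead of waiting for another put().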
Does this solution apply anywhere else?
- [ ] yes
- [x] no
Test Strategy
Testing done:
- [x] Unit tests
- [ ] Integration tests
- [ ] System tests
- [x] Manual tests