feat(io): Add Streaming File Input Component
Summary
This pull request introduces a `streaming_file` input component designed for continuously reading from files with high reliability. It functions similarly to `tail -F` but adds critical features for production environments, including crash recovery, seamless log rotation handling, and at-least-once delivery guarantees.
The component monitors a target file for new data, persisting its read position to a state file. This allows it to resume from the exact last-acknowledged position after a restart. It is built to handle common operational scenarios like file rotation and truncation automatically, ensuring data is not lost or duplicated.
Key Features
- Persistent State: The reader's position (byte offset and inode) is durably persisted to a state directory, enabling seamless recovery from crashes or restarts.
- Automatic File Rotation: Uses file inodes (on Unix systems) to reliably detect file rotations. It finishes reading the old file before automatically switching to the new one and resetting its position.
- Truncation Handling: Detects when a file has been truncated (i.e., its size is smaller than the last read offset) and correctly resets its position to the beginning of the file.
- At-Least-Once Semantics: Implements an acknowledgment (`AckFunc`) mechanism. The read position is only advanced and checkpointed after a message has been successfully processed and acknowledged downstream.
- Efficient Monitoring: Uses `fsnotify` to react to file system events (writes, creates, renames) in real time, avoiding inefficient polling.
- Comprehensive Observability: Integrated with OpenTelemetry to provide key metrics, including lines read, bytes read, file rotations, and buffer saturation.
- Graceful Shutdown: Ensures all in-flight messages are acknowledged before closing (with a configurable timeout) and performs a final state save to prevent data loss.
Notes for Reviewer
This is a large but self-contained feature. The core logic resides in `internal/impl/io/input_streaming_file.go`.
📝 Design Highlights
- Durable State Saving: The `savePositionDurable` function implements a robust save pattern: write to a temporary file, `fsync()` the file, `rename()` it to the final destination, and then `fsync()` the parent directory. This ensures the state file is never left in a corrupted state, even if the process crashes mid-write (a sketch of the pattern follows this list).
- Platform-Specific Inode Handling: Logic to retrieve a file's inode is split into `inode_unix.go` and `inode_other.go` using Go build tags. This provides the most reliable method for rotation detection on Unix while gracefully falling back to other heuristics on systems like Windows (see the build-tag sketch below).
- Stale ACK Prevention: A `generation` counter (`atomic.Uint64`) is incremented every time a file is rotated or truncated. The acknowledgment function captures the generation number at the time of message creation and will ignore any acks from a previous "generation". This is critical to prevent an ack from an old, rotated-away file from incorrectly advancing the byte offset of the new file (see the sketch below).
- Concurrency Model (a minimal pipeline sketch follows the list):
  - The primary `monitorFile` goroutine listens for `fsnotify` events and triggers data draining.
  - Data is read into a buffered channel (`sfi.buffer`) to decouple file I/O from downstream consumption in the `Read` method.
  - Atomic operations are used for high-frequency updates (metrics counters, offsets), while mutexes protect access to shared resources like the file handle and position struct.
- Testing: A comprehensive test suite (`input_streaming_file_test.go`) is included, covering the main success paths and edge cases, including position persistence, file rotation, file truncation, and concurrent reads.
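To make the Durable State Saving bullet concrete, here is a minimal, self-contained sketch of the write-temp/`fsync`/`rename`/`fsync`-dir pattern. The `FilePosition` fields and the exact signature are assumptions for illustration; the PR's actual `savePositionDurable` may differ in detail.

```go
package main

import (
	"encoding/json"
	"os"
	"path/filepath"
)

// FilePosition is a stand-in for the persisted state (fields assumed).
type FilePosition struct {
	Offset int64  `json:"offset"`
	Inode  uint64 `json:"inode"`
}

func savePositionDurable(stateDir string, pos FilePosition) error {
	data, err := json.Marshal(pos)
	if err != nil {
		return err
	}
	final := filepath.Join(stateDir, "position.json")
	tmp := final + ".tmp"

	// 1. Write the new state to a temporary file and fsync it, so the bytes
	//    are on disk before they become visible under the final name.
	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	// 2. Atomically replace the old state file. A crash before this point
	//    leaves the previous state intact; a crash after leaves the new one.
	if err := os.Rename(tmp, final); err != nil {
		return err
	}

	// 3. fsync the parent directory so the rename itself survives a crash
	//    (directory fsync is a Unix-ism; on Windows it may be a no-op or error).
	dir, err := os.Open(stateDir)
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}
```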
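The build-tag split might look roughly like this (the `inodeFor` helper name is hypothetical; only the mechanism matches the PR's description):

```go
// inode_unix.go
//go:build unix

package io

import (
	"os"
	"syscall"
)

// inodeFor returns the inode backing info. The inode survives renames, which
// is what makes it a reliable rotation signal on Unix.
func inodeFor(info os.FileInfo) (uint64, bool) {
	if st, ok := info.Sys().(*syscall.Stat_t); ok {
		return st.Ino, true
	}
	return 0, false
}
```

```go
// inode_other.go
//go:build !unix

package io

import "os"

// inodeFor reports that no inode is available; callers fall back to
// heuristics such as comparing file size and modification time.
func inodeFor(_ os.FileInfo) (uint64, bool) {
	return 0, false
}
```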
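The stale-ACK guard can be sketched as follows; the field names here (`generation`, `pos`) are placeholders, and the real offset/checkpoint plumbing is elided:

```go
package main

import (
	"sync"
	"sync/atomic"
)

type reader struct {
	generation atomic.Uint64 // bumped on every rotation/truncation

	mu  sync.Mutex
	pos int64 // last acknowledged byte offset in the *current* file
}

// onRotate is called when the watched file is rotated or truncated.
func (r *reader) onRotate() {
	r.generation.Add(1) // invalidates every outstanding ack
	r.mu.Lock()
	r.pos = 0
	r.mu.Unlock()
}

// ackFuncFor builds the acknowledgment callback for a message whose payload
// ends at endOffset, capturing the generation at message-creation time.
func (r *reader) ackFuncFor(endOffset int64) func() {
	gen := r.generation.Load()
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		// An ack from a previous generation refers to a file that has been
		// rotated away or truncated; its offset is meaningless now.
		if r.generation.Load() != gen {
			return
		}
		if endOffset > r.pos {
			r.pos = endOffset
		}
	}
}
```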
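And a stripped-down sketch of the producer/consumer pipeline (hypothetical field names; rotation handling, partial-line buffering, and error paths are elided):

```go
package main

import (
	"bufio"
	"context"
	"os"

	"github.com/fsnotify/fsnotify"
)

type streamingFileInput struct {
	reader *bufio.Reader
	buffer chan []byte // decouples file I/O from downstream Read calls
}

// monitorFile reacts to fsnotify events instead of polling, draining any
// newly appended lines into the buffered channel.
func (sfi *streamingFileInput) monitorFile(ctx context.Context, w *fsnotify.Watcher) {
	defer close(sfi.buffer)
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-w.Events:
			if !ok {
				return
			}
			if ev.Op&fsnotify.Write != 0 {
				sfi.drain(ctx)
			}
			// Create/Rename events would trigger rotation handling here.
		}
	}
}

// drain reads complete lines until EOF; the next Write event resumes it.
func (sfi *streamingFileInput) drain(ctx context.Context) {
	for {
		line, err := sfi.reader.ReadBytes('\n')
		if err != nil {
			// io.EOF: a real implementation must stash any partial line
			// returned here and prepend it on the next drain.
			return
		}
		select {
		case sfi.buffer <- line:
		case <-ctx.Done():
			return
		}
	}
}

// Read blocks on the channel rather than on the file itself.
func (sfi *streamingFileInput) Read(ctx context.Context) ([]byte, error) {
	select {
	case line, ok := <-sfi.buffer:
		if !ok {
			return nil, os.ErrClosed
		}
		return line, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
```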
Hey @aronchick 👋 Looks like you need to run `go mod tidy` and include the new `go.mod` in your changes. See the failing CI step here.
I cloned your branch, and it seems the `go.opentelemetry.io/otel/metric v1.35.0` dependency is no longer an indirect dependency.
@jem-davies thank you SO MUCH for the thorough analysis/feedback. I've taken it all on board and made updates in isolated PRs, which are above and all squashed.
To your direct question, I wrote this code and then passed it all through several GPTs with the criteria to find race conditions, hotspots, etc., and took many (but not all) of their suggestions because they seemed appropriate and logical. And, for sure, I had it flesh out the PR description (which I'm generally terrible at).
I've also tried to reduce the scope of the PR, including making it experimental.
Would love your feedback!
OK great - thanks for such a quick reply!
I'll look to get a more detailed review done and then we should merge soon 😄
> To your direct question, I wrote this code and then passed it all through several GPTs with the criteria to find race conditions, hotspots, etc., and took many (but not all) of their suggestions because they seemed appropriate and logical. And, for sure, I had it flesh out the PR description (which I'm generally terrible at).
OK - yeah I find that it is a good way to work with LLMs too 👍
If there was one thing I think we could possibly strip out to get to the MVP, it would be the state tracking. I feel like this is a nice feature, but if your process really does crash mid-stream, then I think it's okay to start the tail at the start of the file again and just allow people to handle their dedupe on their own. State tracking seems really advanced, but it felt like something that I would want to have.
On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what `tail -F` offers.
Thoughts?
> On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what `tail -F` offers.
If you think that, then I think we could take it out (at least for now) ... I can see why it would be useful.
For now, maybe what we could try is to include the `FilePosition` struct as metadata, and then a stream pipeline could make use of a cache and implement its own logic around discarding lines already read? A rough sketch of that idea follows.
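Something roughly like this could work; note the metadata key names below are hypothetical placeholders, since the PR hasn't settled what the input attaches:

```yaml
input:
  streaming_file:
    path: ./log.txt

pipeline:
  processors:
    # Drop lines whose position has been seen before. The metadata keys are
    # assumed names for the FilePosition fields exposed by the input.
    - dedupe:
        cache: seen_positions
        key: '${! meta("streaming_file_inode") }:${! meta("streaming_file_offset") }'

cache_resources:
  - label: seen_positions
    memory: {}

output:
  stdout: {}
```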
Also, I think that more could come out, so:
Metrics
Need to look at the metrics this input is exposing and whether they conflict with the ones the Bento stream engine already exposes, listed here. Also, I notice that in this PR we have a `metricsFlusher()` func - is this needed? Could we take an approach similar to `processor_metric.go`?
EDIT: I'm unsure of another component that actually emits its own metrics - I would consider removing them entirely at this time.
Logging
It's safe to assume that the `*service.Logger` is not nil, so we can just remove nil checks around the logger:
```go
if logger != nil {
	logger.Warnf("Failed to load previous position: %v", err)
}
```
and:
```go
func (sfi *StreamingFileInput) logDebugf(format string, args ...interface{}) {
	if sfi.logger != nil {
		sfi.logger.Debugf(format, args...)
	}
}
```
I'll hold off on a finer-detail review until we can decide on the scope of the new `streaming_file` input.
OK! I've done a really hardcore stripping out :)
I think it's much cleaner now - and only 600 lines. Want to look again?
> I think it's much cleaner now - and only 600 lines. Want to look again?
OK great - I'll do a finer review, and then we can look to merge 😄
Started to do a more in-depth review, but FYI I went to set up a simple config:
log.txt:
```text
hello Alice
```
config.yaml:
```yaml
input:
  streaming_file:
    path: ./log.txt

output:
  stdout: {}
```
Append to the log.txt:
```bash
echo "hello Bob\n" >> log.txt
```
which produces:
```text
ERRO Failed to read message: context deadline exceeded @service=bento label="" path=root.input
```
I will continue to review, but I just wanted to make sure that ☝️ is right and that I am not missing something obvious?
Uh. No, that definitely worked on my machine. Let me see if I can repro.
How are you setting up and using Bento? Do you have a test app/framework you're using that I can copy?
OK. I figured it out. I was putting a `ReadTimeout` in for the purposes of being defensive, but that's actually silly - `tail` doesn't have a read timeout; it listens forever.
So I dropped it and all is good. Let me do some more testing, and I'll check in.
> OK. I figured it out. I was putting a `ReadTimeout` in for the purposes of being defensive, but that's actually silly - `tail` doesn't have a read timeout; it listens forever.
Ok it's working now 😄
I went a little nuts and created some scripts to test in a bunch of ways (one is super aggressive - a 50% drop rate is expected at 5.5M rows in ~3 sec ;)).
- Stress test script: https://gist.github.com/aronchick/ea9ef3c03e4032db57727d2ce779303e
- Simple script testing: https://gist.github.com/aronchick/f4e5b90d3cd7df7adbf479a8e3a6e926
- Sample config file: https://gist.github.com/aronchick/6acfdb7381205d9958b9dda21590c253
I cut a bit more of the complexity thanks to @gregfurman's comments on my other PR #624 :)