feat(io): Add Streaming File Input Component

Open · aronchick opened this issue 2 months ago · 1 comment

Summary

This pull request introduces a streaming_file input component designed for continuously reading from files with high reliability. It functions similarly to tail -F but adds critical features for production environments, including crash recovery, seamless log rotation handling, and at-least-once delivery guarantees.

The component monitors a target file for new data and persists its read position to a state file, which lets it resume from the last acknowledged position after a restart. It handles common operational scenarios such as file rotation and truncation automatically, so data is not lost; as with any at-least-once design, lines that were read but not yet acknowledged before a crash may be delivered again.

Key Features

  • Persistent State: The reader's position (byte offset and inode) is durably persisted to a state directory, enabling seamless recovery from crashes or restarts.
  • Automatic File Rotation: Uses file inodes (on Unix systems) to reliably detect file rotations. It finishes reading the old file before automatically switching to the new one and resetting its position.
  • Truncation Handling: Detects when a file has been truncated (i.e., its size is smaller than the last read offset) and correctly resets its position to the beginning of the file.
  • At-Least-Once Semantics: Implements an acknowledgment (AckFunc) mechanism. The read position is only advanced and checkpointed after a message has been successfully processed and acknowledged downstream.
  • Efficient Monitoring: Uses fsnotify to react to file system events (writes, creates, renames) in real-time, avoiding inefficient polling.
  • Comprehensive Observability: Integrated with OpenTelemetry to provide key metrics, including lines read, bytes read, file rotations, and buffer saturation.
  • Graceful Shutdown: Ensures all in-flight messages are acknowledged before closing (with a configurable timeout) and performs a final state save to prevent data loss.

Notes for Reviewer

This is a large but self-contained feature. The core logic resides in internal/impl/io/input_streaming_file.go.

📝 Design Highlights

  1. Durable State Saving: The savePositionDurable function implements a robust save pattern: write to a temporary file, fsync() the file, rename() it to the final destination, and then fsync() the parent directory. This ensures the state file is never left in a corrupted state, even if the process crashes mid-write.
  2. Platform-Specific Inode Handling: Logic to retrieve a file's inode is split into inode_unix.go and inode_other.go using Go build tags. This provides the most reliable method for rotation detection on Unix while gracefully falling back to other heuristics on systems like Windows.
  3. Stale ACK Prevention: A generation counter (atomic.Uint64) is incremented every time a file is rotated or truncated. The acknowledgment function captures the generation number at the time of message creation and will ignore any acks from a previous "generation." This is critical to prevent an ack from an old, rotated-away file from incorrectly advancing the byte offset of the new file.
  4. Concurrency Model:
    • The primary monitorFile goroutine listens for fsnotify events and triggers data draining.
    • Data is read into a buffered channel (sfi.buffer) to decouple file I/O from downstream consumption in the Read method.
    • Atomic operations are used for high-frequency updates (metrics counters, offsets), while mutexes protect access to shared resources like the file handle and position struct.
  5. Testing: A comprehensive test suite (input_streaming_file_test.go) is included, covering the main success paths and edge cases, including position persistence, file rotation, file truncation, and concurrent reads.

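The stale-ACK guard from point 3 can be sketched roughly as follows. `tracker`, `onReset` and `newAck` are hypothetical names. The sketch rests on two assumptions:

- Acks arrive in order.
- The caller creates each ack under the same lock that serialises reads with rotation/truncation handling. Without that, a reset could slip in between reading a line and capturing its generation.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// tracker is a hypothetical, trimmed-down view of the reader state: the
// committed offset plus a generation that bumps on every rotation/truncation.
type tracker struct {
	generation atomic.Uint64
	mu         sync.Mutex
	committed  int64
}

// onReset is called when the file is rotated or truncated.
func (t *tracker) onReset() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.generation.Add(1)
	t.committed = 0
}

// newAck captures the generation at message-creation time. The returned func
// advances the committed offset only if no reset has happened since; it
// returns false for a stale ack from a previous file.
func (t *tracker) newAck(endOffset int64) func() bool {
	gen := t.generation.Load()
	return func() bool {
		t.mu.Lock()
		defer t.mu.Unlock()
		if t.generation.Load() != gen {
			return false // stale: this line belonged to an older generation
		}
		// Assumes in-order acks; out-of-order acks would need a
		// contiguous-range tracker to preserve at-least-once semantics.
		if endOffset > t.committed {
			t.committed = endOffset
		}
		return true
	}
}
```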
aronchick, Oct 20 '25 04:10

Hey @aronchick 👋 Looks like you need to run go mod tidy and include the new go.mod in your changes. See failing CI step here.

I cloned your branch, and it seems the go.opentelemetry.io/otel/metric v1.35.0 dependency is no longer an indirect dependency.

gregfurman, Nov 19 '25 09:11

@jem-davies thank you SO MUCH for the thorough analysis/feedback. I've addressed all of it and made the updates in isolated PRs, which are above and all squashed.

To your direct question: I wrote this code and then passed it all through several GPTs with instructions to find race conditions, hotspots, etc., and took much (but not all) of their feedback because it seemed appropriate and logical. And, for sure, I had it flesh out the PR description (which I'm generally terrible at).

I've also tried to reduce the scope of the PR as well - including making it experimental.

Would love your feedback!

aronchick, Dec 17 '25 22:12

OK great - thanks for such a quick reply!

I'll look to get a more detailed review done and then we should merge soon 😄

> To your direct question: I wrote this code and then passed it all through several GPTs with instructions to find race conditions, hotspots, etc., and took much (but not all) of their feedback because it seemed appropriate and logical. And, for sure, I had it flesh out the PR description (which I'm generally terrible at).

OK - yeah I find that it is a good way to work with LLMs too 👍

jem-davies, Dec 17 '25 22:12

If there's one thing I think we could strip out to get to an MVP, it would be the state tracking. It's a nice feature, but if your process really does crash mid-stream, I think it's okay to start the tail at the beginning of the file again and let people handle deduplication on their own. State tracking seems really advanced, but it felt like something I would want to have.

aronchick, Dec 17 '25 23:12

On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what tail -F offers.

Thoughts?

aronchick, Dec 18 '25 00:12

> On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what tail -F offers.

If you think that, then I think we could take it out (at least for now), though I can see why it would be useful.

For now, maybe we could include the FilePosition struct as metadata, and then a stream pipeline could use a cache and implement its own logic for discarding lines already read?
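Here is a rough sketch of that dedupe idea, assuming each line carries its inode and starting byte offset as metadata. `filePosition`, `seenCache` and `firstTime` are hypothetical names, and the PR's actual FilePosition fields may differ. In a real pipeline the map would be a cache resource rather than an in-memory map.

An inode-plus-offset key alone has a flaw: it would also drop genuinely new lines written after a truncation. A fuller version would likely need more in the key, such as a hash of the line.

```go
package main

import "fmt"

// filePosition is a hypothetical shape for per-line metadata emitted by the input.
type filePosition struct {
	Inode  uint64
	Offset int64 // byte offset where the line starts
}

// key renders the position as a string suitable for a cache key.
func (p filePosition) key() string {
	return fmt.Sprintf("%d:%d", p.Inode, p.Offset)
}

// seenCache is an in-memory stand-in for a pipeline cache resource.
type seenCache map[string]struct{}

// firstTime reports whether this position has not been seen before, recording
// it if so. Lines for which it returns false were already delivered and can
// be dropped after a restart replays the file from the beginning.
func (c seenCache) firstTime(p filePosition) bool {
	k := p.key()
	if _, ok := c[k]; ok {
		return false
	}
	c[k] = struct{}{}
	return true
}
```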

Also, I think more could come out:

Metrics

We need to check whether the metrics this input exposes conflict with the ones the Bento stream engine already exposes, listed here. I also notice that this PR has a metricsFlusher() func: is this needed, or could we take an approach similar to processor_metric.go?

EDIT: I'm not sure of any other component that actually emits its own metrics, so I would consider removing them entirely for now.

Logging

It's safe to assume that the *service.Logger is not nil, so we can just remove the nil checks around the logger:

		if logger != nil {
			logger.Warnf("Failed to load previous position: %v", err)
		}

and:

func (sfi *StreamingFileInput) logDebugf(format string, args ...interface{}) {
	if sfi.logger != nil {
		sfi.logger.Debugf(format, args...)
	}
}

I'll hold off on a finer-grained review until we decide on the scope of the new streaming_file input.

jem-davies, Dec 18 '25 13:12

Ok! I've done some really hardcore stripping out :)

I think it's much cleaner now - and only 600 lines. Want to look again?

aronchick, Dec 18 '25 18:12

> I think it's much cleaner now - and only 600 lines. Want to look again?

Ok great - I'll do a finer review, and then we can look to merge 😄

jem-davies, Dec 18 '25 21:12

I started a more in-depth review, but FYI I set up a simple config:

log.txt:

hello Alice

config.yaml:

input:
  streaming_file:
    path: ./log.txt

output:
  stdout: {}

Then I appended to log.txt:

echo "hello Bob\n" >> log.txt
ERRO Failed to read message: context deadline exceeded  @service=bento label="" path=root.input

I will continue to review but just wanted to make sure that ☝️ is right - and I am not missing something obvious?

jem-davies, Dec 19 '25 12:12

Uh. No, that definitely worked on my machine. Let me see if I can repro.

How are you setting up and using Bento? Do you have a test app/framework you're using that I can copy?

aronchick, Dec 19 '25 16:12

OK. I figured it out. I was putting a ReadTimeout in for the purposes of being defensive, but that's actually silly - tail doesn't have a read timeout, it listens forever.

So I dropped it and it's all good. Let me do some more testing, and I'll check in.

aronchick, Dec 19 '25 16:12

> OK. I figured it out. I was putting a ReadTimeout in for the purposes of being defensive, but that's actually silly - tail doesn't have a read timeout, it listens forever.

Ok it's working now 😄

jem-davies, Dec 19 '25 18:12

I went a little nuts and created some scripts to test in a bunch of ways (one is super aggressive: a 50% drop rate is expected, 5.5M rows in 3/sec ;)).

  • Stress test script: https://gist.github.com/aronchick/ea9ef3c03e4032db57727d2ce779303e
  • Simple script testing: https://gist.github.com/aronchick/f4e5b90d3cd7df7adbf479a8e3a6e926
  • Sample config file: https://gist.github.com/aronchick/6acfdb7381205d9958b9dda21590c253

aronchick, Dec 19 '25 18:12

I cut a bit more of the complexity thanks to @gregfurman's comments on my other PR #624 :)

aronchick, Dec 24 '25 20:12