jocko
Log truncation and committing
My understanding of Kafka is that a follower truncates its log back to the high watermark on recovery to remove any potentially uncommitted messages (this might no longer be the case with the leader epoch changes).
https://github.com/travisjeffery/jocko/blob/e2a8d10c648b5a558711dcfa20e166c4ad2d7c73/jocko/broker.go#L1166-L1169
However, the implementation of `Truncate` here looks like it deletes messages from the oldest offset up to the given offset rather than from the newest.
```go
func (l *CommitLog) Truncate(offset int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	var segments []*Segment
	for _, segment := range l.segments {
		if segment.BaseOffset < offset {
			// Segments that start before offset (the OLDEST data) are deleted.
			if err := segment.Delete(); err != nil {
				return err
			}
		} else {
			// Segments starting at or after offset are kept.
			segments = append(segments, segment)
		}
	}
	l.segments = segments
	return nil
}
```
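To make the direction of that loop concrete, here is a minimal, self-contained model of it. The `segment` type and the `truncateLikeCurrent` function are hypothetical stand-ins for illustration, not Jocko's API; each segment is assumed to hold the offsets `[BaseOffset, NextOffset)`.

```go
package main

import "fmt"

// segment is a hypothetical stand-in for Jocko's *Segment.
// It is assumed to hold the offsets [BaseOffset, NextOffset).
type segment struct {
	BaseOffset int64
	NextOffset int64
}

// truncateLikeCurrent mirrors the loop in CommitLog.Truncate above:
// every segment whose BaseOffset is below offset is dropped, so data
// disappears from the oldest end of the log.
func truncateLikeCurrent(segs []segment, offset int64) []segment {
	var kept []segment
	for _, s := range segs {
		if s.BaseOffset >= offset {
			kept = append(kept, s)
		}
	}
	return kept
}

func main() {
	segs := []segment{{0, 100}, {100, 200}, {200, 250}}

	// Truncating "to" 150 throws away offsets 0-199 and keeps 200-249.
	fmt.Println(truncateLikeCurrent(segs, 150)) // [{200 250}]

	// Passing the log's newest offset (250 here) removes every segment.
	fmt.Println(truncateLikeCurrent(segs, 250)) // []
}
```

In other words, with this comparison the larger the offset passed in, the more of the log is lost, which is the opposite of what a recovery truncation needs.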
From reading through some of the code, it seems Jocko isn't implementing the normal ISR commit "flow": in Kafka, a message written to the log isn't committed until every replica in the ISR has replicated it.
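For reference, the commit point in Kafka's model is the high watermark: the smallest log end offset among the in-sync replicas (the leader included), so everything below it is present on every ISR member. A minimal sketch of that calculation, where `highWatermark` and the replica IDs and offsets are hypothetical:

```go
package main

import "fmt"

// highWatermark returns the smallest log end offset among the in-sync
// replicas. Offsets below it exist on every ISR member and therefore
// count as committed. Returns 0 for an empty ISR.
func highWatermark(isrLogEndOffsets map[int32]int64) int64 {
	first := true
	var hw int64
	for _, leo := range isrLogEndOffsets {
		if first || leo < hw {
			hw = leo
			first = false
		}
	}
	return hw
}

func main() {
	// Hypothetical replica IDs mapped to their log end offsets.
	isr := map[int32]int64{1: 120, 2: 115, 3: 118}
	fmt.Println(highWatermark(isr)) // 115
}
```

Here offsets 0-114 are committed; offsets 115 and above on replicas 1 and 3 are exactly the uncommitted suffix a recovering follower would truncate.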
Is this truncation in Jocko serving a different purpose, am I misunderstanding Kafka, or am I just misreading the code?
Thanks!
From what I understand, you're talking about the WAL, which, as you note, is used to remove uncommitted entries.
I believe this is message log truncation, so that you don't fill up your HDD and crash.
Yeah, @kempjeng has it. The recovery may not match up 100% yet, and I'm working on replication now. But we do need that truncation to periodically remove old segments.
Isn't the `Cleaner` interface responsible for compacting/truncating the log to prevent unbounded disk usage, i.e. `DeleteCleaner`? It appears this is what uses `MaxLogBytes` to control the log size: https://github.com/travisjeffery/jocko/blob/e2a8d10c648b5a558711dcfa20e166c4ad2d7c73/commitlog/commitlog.go#L56
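Size-based retention of that kind deletes from the oldest end until the log fits under a byte limit, which is a separate job from recovery truncation. Below is a minimal sketch of the idea, assuming a limit like `MaxLogBytes`. It is not Jocko's `DeleteCleaner` code; the `seg` type and `retainBySize` are hypothetical.

```go
package main

import "fmt"

// seg is a hypothetical segment carrying only its base offset and size.
type seg struct {
	BaseOffset int64
	Bytes      int64
}

// retainBySize drops the oldest segments until the total size is at most
// maxBytes, but always keeps the newest (active) segment.
func retainBySize(segs []seg, maxBytes int64) []seg {
	var total int64
	for _, s := range segs {
		total += s.Bytes
	}
	i := 0
	for total > maxBytes && i < len(segs)-1 {
		total -= segs[i].Bytes
		i++
	}
	return segs[i:]
}

func main() {
	segs := []seg{{0, 400}, {100, 400}, {200, 300}}
	// 1100 bytes total; dropping the oldest segment brings it to 700.
	fmt.Println(retainBySize(segs, 800)) // [{100 400} {200 300}]
}
```

Keeping oldest-first deletion in the cleaner leaves `Truncate` free to handle the recovery case from the newest end.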
The problem I see is that, upon becoming a follower, the broker deletes all of its log messages, because it truncates up to its newest offset starting from the beginning of the log.
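```go
// Despite the name, hw is the log's newest offset, not the high watermark,
// so Truncate(hw) deletes every segment that starts before it.
hw := replica.Log.NewestOffset()
if err := replica.Log.Truncate(hw); err != nil {
	return protocol.ErrUnknown.WithErr(err)
}
```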
This happens inside `becomeFollower`, so it seemed like the intention was to mimic what Kafka does here, which is truncating uncommitted messages from the log.
Ahhh. Yes, we need to truncate from the head (the newest end) here rather than from the tail.
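A head truncation would keep everything below the given offset and discard everything at or above it. Here is a minimal sketch on a hypothetical `segment` type holding `[BaseOffset, NextOffset)`; `truncateFromHead` is illustrative only. In Jocko, trimming the straddling segment would also mean truncating its data file and index, which this sketch does not model.

```go
package main

import "fmt"

// segment is a hypothetical stand-in holding offsets [BaseOffset, NextOffset).
type segment struct {
	BaseOffset int64
	NextOffset int64
}

// truncateFromHead removes every offset >= offset, i.e. from the newest end.
// Segments that start at or after offset are dropped entirely; the segment
// that straddles offset is trimmed so it ends at offset.
func truncateFromHead(segs []segment, offset int64) []segment {
	var kept []segment
	for _, s := range segs {
		if s.BaseOffset >= offset {
			continue // wholly at or above offset: delete the segment
		}
		if s.NextOffset > offset {
			s.NextOffset = offset // straddling segment: cut off its newest part
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	segs := []segment{{0, 100}, {100, 200}, {200, 250}}
	// Truncating to 150 keeps offsets 0-149 and discards 150-249.
	fmt.Println(truncateFromHead(segs, 150)) // [{0 100} {100 150}]
}
```

Called with the high watermark rather than the newest offset, this matches the Kafka recovery behavior described at the top of the issue.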