storage: implement the core bounded input reliance logic
This PR implements the core bounded input reliance (BIR) logic, as described in https://github.com/MaterializeInc/materialize/pull/14314 (epic: https://github.com/MaterializeInc/materialize/issues/13534). It does NOT implement the source-specific behavior to actually commit offsets; that will come later. Instead, it builds on previous PRs to thread the offset resumption frontier (actually, it's no longer a frontier at this point) through to the source implementations.
I highly recommend reading the commits separately, at least once. They are, from bottom to top, as follows (with some additional notes for each):
- The first commit is just refactoring. It moves the source stream creation into a separate function.
  - To manage this (i.e., to remove lifetime errors), `scope` is removed from `RawSourceCreationConfig`, and `BaseMetrics` is cloned more than before. This can probably be worked around, but I felt it wasn't worth it.
- The second commit is the core logic. It:
  - Adds `OffsetCommitter` as an associated type to `SourceReader`s, which are now required to return one on construction.
  - Drives this offset committer with a separate task, which means the core operation can be async, but our use of a `tokio::sync::watch` makes committing effectively instantaneous. This logic is implemented in the new `source::commit` module.
    - Note that because the source reader operator is now async, we could drive the `OffsetCommitter` directly, but that would tie source ingestion to the throughput of committing. The current scheme allows an offset committer to be slow and "miss" new offset resumption frontier updates, while always making progress (i.e., it decouples the commit speed from the interval at which we produce new frontiers). cc @petrosagg
  - Splitting the `SourceReader` and `OffsetCommitter` into two pieces has two advantages:
    - It allows sources that require coordination between the stream and the committing to operate (with a channel), while also allowing simpler ones, like Kafka, that can just create a new client. Nice and simple!
    - It cleans up the `source_reader_operator` A LOT, by allowing the main `SourceReader` to still just act as a `Stream`.
  - Connects everything together, and defaults ALL sources to the `LogCommitter` implementation, which just logs when it receives new frontiers.
- The third commit turns `HashMap<PartitionId, MzOffset>`s into `OffsetAntichain`s, defined in the new `source::antichain` module.
  - This is a clearer description of what is going on, and allows us to encapsulate weird logic (like adding and subtracting from frontiers to get real offset values) in one module.
  - Note that this attempts to clean up all uses of a "source frontier" concept, but there are still some subtleties. The docs in the module attempt to describe them.
  - There is an outstanding TODO to move the logic here: https://github.com/MaterializeInc/materialize/blob/main/src/storage/src/source/source_reader_pipeline.rs#L513-L522 into this module. However, I admit I don't really understand how this works, or how it interacts with `reclock_frontier`, so @aljoscha and I will have to discuss and clean that up later.
- The fourth commit ensures we have initialized the `ReclockFollower` we use for `source_upper_at_frontier` in the source reader operator.
  - This also resolves a TODO where we were doing extra persist reads in initialization of this operator. Nice and clean!
- The fifth commit adds simple logic to avoid re-committing the same offsets.
- The sixth commit compacts the in-memory trace of the `ReclockOperator` after we commit. The ordering here doesn't really matter, but it's convenient.
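A minimal, std-only sketch of the reader/committer split from the second commit, combined with the fifth commit's skip-if-unchanged check. `OffsetCommitter`, `SourceReader`, and `LogCommitter` are names from this PR, but every signature below is hypothetical, as are `Latest`, `spawn_committer`, `DummyReader`, and `run_demo`. The real code drives the committer as an async task fed by a `tokio::sync::watch`; here a `Mutex` + `Condvar` latest-value slot stands in for it so the example has no dependencies.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical stand-ins for the real partition and offset types.
type PartitionId = i32;
type MzOffset = u64;
type Offsets = HashMap<PartitionId, MzOffset>;

/// Hypothetical shape of a committer: durably records offsets upstream.
trait OffsetCommitter: Send + 'static {
    fn commit_offsets(&mut self, offsets: &Offsets);
}

/// Hypothetical shape of a reader: construction returns the reader *and* its committer.
trait SourceReader: Sized {
    type OffsetCommitter: OffsetCommitter;
    fn new() -> (Self, Self::OffsetCommitter);
}

/// In the spirit of `LogCommitter`: it only logs (and records) what it is asked to commit.
#[derive(Default)]
struct LogCommitter {
    log: Arc<Mutex<Vec<Offsets>>>,
}

impl OffsetCommitter for LogCommitter {
    fn commit_offsets(&mut self, offsets: &Offsets) {
        println!("would commit {:?}", offsets);
        self.log.lock().unwrap().push(offsets.clone());
    }
}

struct DummyReader;
impl SourceReader for DummyReader {
    type OffsetCommitter = LogCommitter;
    fn new() -> (Self, LogCommitter) { (DummyReader, LogCommitter::default()) }
}

/// Std-only stand-in for `tokio::sync::watch`: each send overwrites the pending value,
/// so a slow receiver skips intermediate updates but always sees the newest one.
struct Latest {
    state: Mutex<(Option<Offsets>, bool)>, // (pending value, closed)
    cv: Condvar,
}

impl Latest {
    fn send(&self, offsets: Offsets) {
        self.state.lock().unwrap().0 = Some(offsets);
        self.cv.notify_one();
    }
    fn close(&self) {
        self.state.lock().unwrap().1 = true;
        self.cv.notify_one();
    }
    /// Blocks until a value is pending; returns None once closed and drained.
    fn recv(&self) -> Option<Offsets> {
        let mut state = self.state.lock().unwrap();
        loop {
            if let Some(offsets) = state.0.take() { return Some(offsets); }
            if state.1 { return None; }
            state = self.cv.wait(state).unwrap();
        }
    }
}

/// Drives a committer on its own thread, so ingestion never waits on commits,
/// and skips offsets identical to the last ones committed.
fn spawn_committer<C: OffsetCommitter>(mut committer: C, slot: Arc<Latest>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut last: Option<Offsets> = None;
        while let Some(offsets) = slot.recv() {
            if last.as_ref() == Some(&offsets) { continue; }
            committer.commit_offsets(&offsets);
            last = Some(offsets);
        }
    })
}

fn run_demo() -> Vec<Offsets> {
    let (_reader, committer) = DummyReader::new();
    let log = Arc::clone(&committer.log);
    let slot = Arc::new(Latest { state: Mutex::new((None, false)), cv: Condvar::new() });
    let handle = spawn_committer(committer, Arc::clone(&slot));
    // The "ingestion" side publishes new offsets without waiting for commits.
    for offset in [10, 10, 20] {
        slot.send(HashMap::from([(0, offset)]));
    }
    slot.close();
    handle.join().unwrap();
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() { println!("{:?}", run_demo()); }
```

Because `send` overwrites, the committer may see `{0: 10}` and then `{0: 20}`, or only `{0: 20}`, depending on thread timing. Either way it never commits the same offsets twice in a row and always ends on the newest value, which is the "slow but always makes progress" property described above.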
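A minimal sketch of what replacing `HashMap<PartitionId, MzOffset>` with `OffsetAntichain` (third commit) buys: the map becomes a newtype, and the add/subtract-one arithmetic between frontiers and real offsets is confined to two conversion methods. The method names (`observe`, `to_frontier`, `from_frontier`), the `MzOffset(u64)` newtype, and the "frontier = last processed offset + 1" convention are assumptions for illustration only; the actual `source::antichain` module documents its own semantics.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real types.
type PartitionId = i32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct MzOffset(u64);

/// A per-partition set of offsets wrapping the map it replaces. Convention assumed for
/// this sketch only: a *frontier* entry is the first offset not yet processed, while a
/// plain entry is the last offset actually processed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct OffsetAntichain {
    inner: HashMap<PartitionId, MzOffset>,
}

impl OffsetAntichain {
    /// Records that `offset` was processed; entries only ever advance.
    fn observe(&mut self, pid: PartitionId, offset: MzOffset) {
        let entry = self.inner.entry(pid).or_insert(offset);
        if offset > *entry {
            *entry = offset;
        }
    }

    fn get(&self, pid: PartitionId) -> Option<MzOffset> {
        self.inner.get(&pid).copied()
    }

    /// Last-processed offsets -> frontier: the +1 lives here and nowhere else.
    fn to_frontier(&self) -> OffsetAntichain {
        let inner = self.inner.iter().map(|(p, o)| (*p, MzOffset(o.0 + 1))).collect();
        OffsetAntichain { inner }
    }

    /// Frontier -> last-processed offsets: the -1 lives here. A frontier of 0 means
    /// nothing was processed, so that partition is dropped.
    fn from_frontier(frontier: &OffsetAntichain) -> OffsetAntichain {
        let inner = frontier
            .inner
            .iter()
            .filter(|(_, o)| o.0 > 0)
            .map(|(p, o)| (*p, MzOffset(o.0 - 1)))
            .collect();
        OffsetAntichain { inner }
    }
}

fn main() {
    let mut processed = OffsetAntichain::default();
    processed.observe(0, MzOffset(5));
    processed.observe(0, MzOffset(3)); // ignored: offsets never regress
    processed.observe(1, MzOffset(0));
    let frontier = processed.to_frontier();
    println!("partition 0 frontier: {:?}", frontier.get(0));
    assert_eq!(OffsetAntichain::from_frontier(&frontier), processed);
}
```

Keeping both conversions on one type means call sites never write `+ 1` or `- 1` themselves, which is the encapsulation the third commit is after.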
Motivation
- This PR adds a known-desirable feature.
Tips for reviewer
Commits should probably be viewed separately; see above for details.
Checklist
- [x] This PR has adequate test coverage / QA involvement has been duly considered.
- [ ] This PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way) and therefore is tagged with a `T-protobuf` label.
- [ ] This PR includes the following user-facing behavior changes: