
storage: implement the core bounded input reliance logic

Open guswynn opened this issue 2 years ago • 0 comments

This PR implements the core bounded input reliance (BIR) logic, as described in https://github.com/MaterializeInc/materialize/pull/14314 (epic: https://github.com/MaterializeInc/materialize/issues/13534). It does NOT implement the source-specific behavior to actually commit offsets; that will come later. Instead, it builds on previous PRs to thread the offset resumption frontier (actually, it's no longer a frontier at this point) through to the source implementations.

I highly recommend reading the commits separately, at least once. They are, from bottom to top, as follows (with some additional notes for each):

  • The first commit is just refactoring. It moves the source stream creation into a separate function.
    • To manage this (i.e., to avoid lifetime errors), scope is removed from RawSourceCreationConfig, and BaseMetrics is cloned more often than before. This can probably be worked around, but I felt it wasn't worth it.
  • The second commit is the core logic. It:
    • Adds OffsetCommitter as an associated type to SourceReaders, which are now required to return one on construction.
    • Drives this offset committer with a separate task, which means the commit operation itself can be async, while our use of a tokio::sync::watch makes handing a new frontier to the committer effectively instantaneous. This logic is implemented in the new source::commit module.
    • Note that because the source reader operator is now async, we could drive the OffsetCommitter directly, but that would tie source ingestion to the throughput of committing. The current scheme allows an offset committer to be slow and "miss" new offset resumption frontier updates while still always making progress (i.e., it decouples the commit speed from the interval at which we produce new frontiers). cc @petrosagg
    • Splitting the SourceReader and OffsetCommitter into two pieces has two advantages:
      • It allows sources that require coordination between the stream and the committing to do so (via a channel), while simpler ones like Kafka can just create a new client. Nice and simple!
      • It cleans up the source_reader_operator A LOT, by allowing the main SourceReader to still just act as a Stream.
    • Connects everything together, and defaults ALL sources to the LogCommitter implementation, which just logs when it receives new frontiers.
  • The third commit turns HashMap<PartitionId, MzOffset>s into OffsetAntichains, defined in the new source::antichain module.
    • This is a clearer description of what is going on, and it allows us to encapsulate weird logic (like adding and subtracting from frontiers to get real offset values) in one module.
    • Note that this attempts to clean up all uses of a "source frontier" concept, but there are still some subtleties. The docs in the module attempt to describe them.
    • There is an outstanding TODO to move the logic at https://github.com/MaterializeInc/materialize/blob/main/src/storage/src/source/source_reader_pipeline.rs#L513-L522 into this module. However, I admit I don't really understand how it works or how it interacts with reclock_frontier, so @aljoscha and I will have to discuss and clean that up later.
  • The fourth commit ensures we have initialized the ReclockFollower we use for source_upper_at_frontier in the source reader operator.
    • This also resolves a TODO where we were doing extra persist reads in initialization of this operator. Nice and clean!
  • The fifth commit adds simple logic to avoid re-committing the same offsets.
  • The sixth commit compacts the in-memory trace of the ReclockOperator after we commit. The ordering here doesn't really matter, but it's convenient.
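A minimal, synchronous sketch of the shape described for the second commit: a SourceReader trait with an associated OffsetCommitter that the reader hands back on construction, plus a default committer that only logs. This is not the Materialize code. The real reader is an async Stream, and the PartitionId/MzOffset newtypes, the commit_offsets signature, the toy VecReader, and recording log lines in a Vec are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for Materialize's partition and offset types.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PartitionId(pub i32);

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct MzOffset(pub u64);

/// Something that can commit offsets back to the upstream system.
pub trait OffsetCommitter {
    fn commit_offsets(&mut self, offsets: HashMap<PartitionId, MzOffset>) -> Result<(), String>;
}

/// A reader must hand back its committer on construction, so the two
/// halves can be driven independently (assumed, simplified signature).
pub trait SourceReader: Sized {
    type OffsetCommitter: OffsetCommitter + Send + 'static;

    fn new(messages: Vec<(PartitionId, MzOffset)>) -> (Self, Self::OffsetCommitter);
    fn next_message(&mut self) -> Option<(PartitionId, MzOffset)>;
}

/// Default committer: commits nothing upstream, it only logs what it was given.
#[derive(Debug, Default)]
pub struct LogCommitter {
    pub log: Vec<String>,
}

impl OffsetCommitter for LogCommitter {
    fn commit_offsets(&mut self, offsets: HashMap<PartitionId, MzOffset>) -> Result<(), String> {
        // Sort so the log line does not depend on HashMap iteration order.
        let mut entries: Vec<(PartitionId, MzOffset)> = offsets.into_iter().collect();
        entries.sort();
        let line = format!("would commit {:?}", entries);
        println!("{}", line);
        self.log.push(line);
        Ok(())
    }
}

/// Hypothetical toy reader over an in-memory list of (partition, offset) messages.
pub struct VecReader {
    messages: std::vec::IntoIter<(PartitionId, MzOffset)>,
}

impl SourceReader for VecReader {
    type OffsetCommitter = LogCommitter;

    fn new(messages: Vec<(PartitionId, MzOffset)>) -> (Self, Self::OffsetCommitter) {
        (VecReader { messages: messages.into_iter() }, LogCommitter::default())
    }

    fn next_message(&mut self) -> Option<(PartitionId, MzOffset)> {
        self.messages.next()
    }
}
```

A source that needs coordination between reading and committing could give both halves a shared channel inside new, while a Kafka-like source could simply build a separate client for its committer.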
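To illustrate why a watch-style channel decouples committing from ingestion, here is a std-only sketch. Latest is a hypothetical stand-in for tokio::sync::watch: it keeps only the newest value, so sending never waits on the receiver. The committer runs on a plain thread rather than a tokio task, the duplicate check mirrors the idea of the fifth commit, and frontiers are modeled as hypothetical Vec<(partition, offset)> values.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical frontier: (partition, next offset to read) pairs.
type Frontier = Vec<(i32, u64)>;

/// Std-only stand-in for `tokio::sync::watch`: a single slot holding only
/// the newest value, so a slow receiver skips intermediate updates.
struct Latest<T> {
    // (unread value, sender closed?)
    state: Mutex<(Option<T>, bool)>,
    changed: Condvar,
}

impl<T> Latest<T> {
    fn new() -> Arc<Self> {
        Arc::new(Latest { state: Mutex::new((None, false)), changed: Condvar::new() })
    }

    /// Overwrites any unread value; never waits for the receiver.
    fn send(&self, value: T) {
        let mut state = self.state.lock().unwrap();
        state.0 = Some(value);
        self.changed.notify_one();
    }

    fn close(&self) {
        self.state.lock().unwrap().1 = true;
        self.changed.notify_one();
    }

    /// Blocks for the next unread value; `None` once closed and drained.
    fn recv(&self) -> Option<T> {
        let mut state = self.state.lock().unwrap();
        loop {
            if let Some(value) = state.0.take() {
                return Some(value);
            }
            if state.1 {
                return None;
            }
            state = self.changed.wait(state).unwrap();
        }
    }
}

/// Runs the committer on its own thread and returns what it "committed".
fn spawn_committer(rx: Arc<Latest<Frontier>>) -> thread::JoinHandle<Vec<Frontier>> {
    thread::spawn(move || {
        let mut committed: Vec<Frontier> = Vec::new();
        while let Some(frontier) = rx.recv() {
            // Skip a frontier identical to the last one we committed.
            if committed.last() == Some(&frontier) {
                continue;
            }
            // A real committer would make a (possibly slow) upstream call here.
            committed.push(frontier);
        }
        committed
    })
}
```

The sender only holds the lock long enough to overwrite one slot, so ingestion never waits on a commit. A slow committer sees some subsequence of the frontiers, but the last one sent is always delivered before recv reports the channel closed.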
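A hypothetical sketch of what an OffsetAntichain-style type could encapsulate in place of a raw HashMap<PartitionId, MzOffset>. Each partition maps to the first offset not yet read (last read offset + 1), so the plus-one and minus-one conversions live in one place. The method names and semantics are assumptions, not the API of the new source::antichain module; plain integers stand in for PartitionId and MzOffset, and the subtleties documented in that module are not modeled.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-partition frontier. Each value is the first offset NOT
/// yet read from that partition (last read offset + 1), like a timely "upper".
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct OffsetAntichain {
    // partition id -> next offset to read
    inner: BTreeMap<i32, u64>,
}

impl OffsetAntichain {
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that `offset` was read; the frontier only ever moves forward.
    pub fn observe(&mut self, partition: i32, offset: u64) {
        self.advance(partition, offset + 1);
    }

    /// The next offset to read from `partition`, if anything was observed.
    pub fn frontier(&self, partition: i32) -> Option<u64> {
        self.inner.get(&partition).copied()
    }

    /// The last offset actually read (frontier - 1): the "minus one" lives here.
    pub fn last_read(&self, partition: i32) -> Option<u64> {
        self.frontier(partition).and_then(|next| next.checked_sub(1))
    }

    /// Per-partition maximum with another antichain.
    pub fn join_assign(&mut self, other: &OffsetAntichain) {
        for (&partition, &next) in &other.inner {
            self.advance(partition, next);
        }
    }

    fn advance(&mut self, partition: i32, next: u64) {
        let entry = self.inner.entry(partition).or_insert(next);
        if *entry < next {
            *entry = next;
        }
    }
}
```

Keeping the frontier form internal and exposing last_read separately means callers never have to remember which representation a given map holds.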

Motivation

  • This PR adds a known-desirable feature.

Tips for reviewer

Commits should probably be viewed separately. See above for details

Checklist

guswynn, Sep 19 '22 21:09