
Add StreamPlan and MicroBatchExecution

Open penghuo opened this issue 3 years ago • 1 comments

  1. StreamPlan is a type of AbstractPlan that executes continuously.
  2. MicroBatchExecution is one stream execution mode.

Limitations

  1. Currently, failure recovery of the coordination node is not considered for stream execution.
  2. State recovery of stream execution is not covered either. The state is maintained in memory only.

penghuo • Oct 26 '22 01:10

Micro-batch Execution State Management

Data Structure

  • offsetLog: a write-ahead log that records the offsets present in each batch. To ensure that a given batch always consists of the same data, we write to this log before any processing is done. Thus, the Nth record in this log indicates the data that is currently being processed, and the (N-1)th record indicates which offsets have been durably committed to the sink.
    • getLatest() returns the latest batchId and its OffsetSeq.
    • get(batchId) returns the OffsetSeq of that batch.
  • commitLog: keeps track of the latest committed batchId.
    • getLatest() returns the latest committed batchId.
  • committedOffsets: tracks how much data has been processed and committed to the sink or state store from each input source.
  • availableOffsets: tracks the offsets that are available to be processed but have not yet been committed to the sink.
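
The two logs can be pictured as small maps keyed by batchId. The sketch below is an in-memory illustration only, in line with the limitation that state is kept in memory. `MetadataLog`, its `add` method, the snake_case names, and the `"source-a"` key are assumptions made for this example, not the project's actual classes, and the commit log stores a placeholder marker per batch.

```python
from typing import Dict, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")


class MetadataLog(Generic[T]):
    """In-memory log keyed by batchId (hypothetical helper, not the project's API)."""

    def __init__(self) -> None:
        self._entries: Dict[int, T] = {}

    def add(self, batch_id: int, value: T) -> bool:
        # Refuse to overwrite: a batch's entry must not change once written.
        if batch_id in self._entries:
            return False
        self._entries[batch_id] = value
        return True

    def get(self, batch_id: int) -> Optional[T]:
        return self._entries.get(batch_id)

    def get_latest(self) -> Optional[Tuple[int, T]]:
        # Returns (batchId, value) for the highest batchId, or None when empty.
        if not self._entries:
            return None
        latest = max(self._entries)
        return latest, self._entries[latest]


# offsetLog maps batchId -> offsets per source; commitLog maps batchId -> a marker.
offset_log: MetadataLog[Dict[str, int]] = MetadataLog()
commit_log: MetadataLog[bool] = MetadataLog()

offset_log.add(0, {"source-a": 10})
commit_log.add(0, True)
offset_log.add(1, {"source-a": 25})  # batch 1 is logged but not yet committed
```

Here batch 1 appears in the offset log but not in the commit log, which is the state a restart would see if the process stopped while batch 1 was being processed.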

Algorithms

MicroBatchExecution guarantees that

  • offsetLog.getLatest() returns the batch being processed.
  • offsetLog.get(latestBatchId - 1), where latestBatchId is the batchId returned by offsetLog.getLatest(), returns the committed batch.
  • commitLog.getLatest() returns the latest committed batch.
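
These guarantees imply that the latest batchId in offsetLog is either equal to the latest batchId in commitLog or exactly one ahead of it. A minimal sketch of that check follows; the function name `resolve_next_batch`, the use of `None` for an empty log, and batch ids starting at 0 are assumptions for illustration.

```python
from typing import Optional, Tuple


def resolve_next_batch(
    latest_batch_id: Optional[int],
    latest_committed_batch_id: Optional[int],
) -> Tuple[int, bool]:
    """Return (currentBatchId, replay) from the latest ids in offsetLog and commitLog.

    None stands for an empty log. replay=True means the batch was already
    written to offsetLog and must be re-run with the offsets recorded there.
    """
    if latest_batch_id is None:
        if latest_committed_batch_id is not None:
            raise RuntimeError("commitLog has entries but offsetLog is empty")
        return 0, False  # first run: both logs are empty
    # Treat an empty commitLog as "batch -1 committed" so the arithmetic lines up.
    committed = -1 if latest_committed_batch_id is None else latest_committed_batch_id
    if committed == latest_batch_id:
        return latest_batch_id + 1, False  # last batch committed, start a new one
    if committed == latest_batch_id - 1:
        return latest_batch_id, True  # last batch logged but not committed
    raise RuntimeError(
        f"loop invariant broken: offsetLog={latest_batch_id}, commitLog={committed}"
    )
```

For example, `resolve_next_batch(3, 2)` yields `(3, True)`: batch 3 was logged but never committed, so it is run again rather than skipped.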

At the beginning, offsetLog and commitLog are empty.

  • At the beginning of each execution, compute the difference between offsetLog and commitLog.
  • After collecting offsets from the source and before running the query, MicroBatchExecution adds the offsets to be processed to offsetLog.
  • After sending data to the sink, MicroBatchExecution adds the committed offsets to commitLog.
offsetLog = empty
commitLog = empty

while (isActive) {
  // assumption: getLatest() returns the latest batchId, or -1 when the log is empty
  latestBatchId = offsetLog.getLatest()
  latestCommittedBatchId = commitLog.getLatest()

  if (latestBatchId == latestCommittedBatchId) {
    // last batch was successfully committed (or both logs are empty).
    committedOffsets = offsetLog.get(latestBatchId)
    currentBatchId = latestCommittedBatchId + 1

    // before processing: write ahead the offsets of the new batch
    availableOffsets = source.offset()
    offsetLog.add(currentBatchId, availableOffsets)
  } else if (latestCommittedBatchId == latestBatchId - 1) {
    // last batch was not successfully committed: replay it with the logged offsets,
    // so that the batch consists of the same data as before.
    committedOffsets = offsetLog.get(latestBatchId - 1)
    currentBatchId = latestBatchId
    availableOffsets = offsetLog.get(latestBatchId)
  } else {
    log.error("breaking loop invariant")
    break
  }

  // processing: run the query on the range (committedOffsets, availableOffsets]

  // after sink
  commitLog.add(currentBatchId, availableOffsets)
}
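
To make the recovery path concrete, the loop body can be sketched as a runnable simulation. This is an illustration under stated assumptions, not the implementation: both logs are plain in-memory dicts, a single source reports one integer offset, and `run_once`, `write_sink`, `SinkFailure`, and the `fail_next` flag are hypothetical names introduced here to simulate a failed batch.

```python
from typing import Callable, Dict, List, Optional


class SinkFailure(Exception):
    """Raised by the hypothetical sink to simulate a failed batch."""


def run_once(
    offset_log: Dict[int, int],
    commit_log: Dict[int, int],
    read_source_offset: Callable[[], int],
    write_sink: Callable[[int, Optional[int], int], None],
) -> int:
    """Run one micro-batch and return its batchId."""
    latest_batch_id = max(offset_log, default=-1)
    latest_committed_batch_id = max(commit_log, default=-1)

    if latest_batch_id == latest_committed_batch_id:
        # Last batch committed (or both logs empty): start a new batch.
        batch_id = latest_batch_id + 1
        end_offset = read_source_offset()
        offset_log[batch_id] = end_offset  # write-ahead, before processing
    elif latest_committed_batch_id == latest_batch_id - 1:
        # Last batch was logged but not committed: replay the same offsets.
        batch_id = latest_batch_id
        end_offset = offset_log[batch_id]
    else:
        raise RuntimeError("breaking loop invariant")

    start_offset = offset_log.get(batch_id - 1)  # None for the first batch
    write_sink(batch_id, start_offset, end_offset)  # processing + sink
    commit_log[batch_id] = end_offset  # only after the sink succeeded
    return batch_id


# Simulated source whose offset advances on every read.
source_offsets = iter([10, 25, 40])
written: List[tuple] = []
fail_next = {"flag": False}


def write_sink(batch_id, start, end):
    if fail_next["flag"]:
        fail_next["flag"] = False
        raise SinkFailure(f"batch {batch_id} failed")
    written.append((batch_id, start, end))


offset_log: Dict[int, int] = {}
commit_log: Dict[int, int] = {}

run_once(offset_log, commit_log, lambda: next(source_offsets), write_sink)  # batch 0
fail_next["flag"] = True
try:
    run_once(offset_log, commit_log, lambda: next(source_offsets), write_sink)
except SinkFailure:
    pass  # batch 1 is now in offsetLog but not in commitLog
run_once(offset_log, commit_log, lambda: next(source_offsets), write_sink)  # replays batch 1
```

The third call does not read the source again: it finds batch 1 in the offset log without a matching commit and re-runs it with the logged end offset 25. In this sketch a replayed batch may reach the sink twice if the failure happens after the write but before the commit, so the sink would need to tolerate duplicate writes of the same batchId.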

penghuo • Oct 26 '22 01:10