Add StreamPlan and MicroBatchExecution
- StreamPlan is a type of AbstractPlan that executes continuously.
- MicroBatchExecution is one type of stream execution mode.
Limitations
- Currently, coordination node failure recovery is not considered for stream execution.
- State recovery of stream execution is not covered either. State is maintained in memory only.
Micro-batch Execution State Management
Data Structure
- offsetLog: a write-ahead log that records the offsets present in each batch. To ensure that a given batch always consists of the same data, we write to this log before any processing is done. Thus, the Nth record in this log indicates the data that is currently being processed, and the (N-1)th record indicates which offsets have been durably committed to the sink.
  - getLatest(): returns the latest batchId and OffsetSeq.
  - get(batchId): returns the OffsetSeq for the given batchId.
- commitLog: keeps track of the latest committed batchId.
  - getLatest(): returns the latest committed batchId.
- committedOffsets: tracks how much data has been processed and committed to the sink or state store from each input source.
- availableOffsets: tracks the offsets that are available to be processed but have not yet been committed to the sink.
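To make the two logs concrete, here is a minimal in-memory sketch. It is not the implementation in this change: the class names, the `add` signatures, and the use of a plain dict from source name to offset as a stand-in for OffsetSeq are all assumptions made for illustration.

```python
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for OffsetSeq: one offset per source name.
OffsetSeq = Dict[str, int]


class OffsetLog:
    """In-memory write-ahead log mapping batchId -> OffsetSeq."""

    def __init__(self) -> None:
        self._entries: Dict[int, OffsetSeq] = {}

    def add(self, batch_id: int, offsets: OffsetSeq) -> None:
        # Store a copy so later mutation by the caller cannot change the log.
        self._entries[batch_id] = dict(offsets)

    def get(self, batch_id: int) -> Optional[OffsetSeq]:
        return self._entries.get(batch_id)

    def get_latest(self) -> Optional[Tuple[int, OffsetSeq]]:
        # None signals an empty log.
        if not self._entries:
            return None
        latest = max(self._entries)
        return latest, self._entries[latest]


class CommitLog:
    """Tracks the id of the latest committed batch."""

    def __init__(self) -> None:
        self._latest: Optional[int] = None

    def add(self, batch_id: int) -> None:
        self._latest = batch_id

    def get_latest(self) -> Optional[int]:
        # None signals that nothing has been committed yet.
        return self._latest
```

Because both structures live in memory, they are lost on restart, which matches the limitation listed above.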
Algorithms
MicroBatchExecution guarantees that:
- offsetLog.getLatest() returns the batch that is being processed.
- offsetLog.get(offsetLog.getLatest() - 1) returns the committed batch.
- commitLog.getLatest() returns the latest committed batch.
At the beginning, offsetLog and commitLog are empty.
- At the beginning of each execution, calculate the difference between offsetLog and commitLog.
- After collecting offsets from the source and before running the query, MicroBatchExecution adds the offsets being processed to offsetLog.
- After sending data to the sink, MicroBatchExecution adds the committed offsets to commitLog.
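The difference between the two logs decides which batch runs next. The helper below is one way to express that decision. The function name `next_batch_id` and the choice to represent an empty log as `None` are assumptions for this sketch, not part of the change.

```python
from typing import Optional


def next_batch_id(latest_batch_id: Optional[int],
                  latest_committed_batch_id: Optional[int]) -> int:
    """Return the batch id to run next; None means the log is empty."""
    # Treat an empty log as batch id -1 so the arithmetic below is uniform.
    latest = -1 if latest_batch_id is None else latest_batch_id
    committed = (-1 if latest_committed_batch_id is None
                 else latest_committed_batch_id)

    if latest == committed:
        # The last logged batch was committed: start a new batch.
        return committed + 1
    if committed == latest - 1:
        # The last logged batch never reached the commit log: run it again.
        return latest
    raise RuntimeError(
        f"loop invariant broken: offsetLog={latest}, commitLog={committed}")
```

With both logs empty the helper returns 0. With offsetLog at batch 3 and commitLog at batch 2 it returns 3, so the uncommitted batch is run again.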
offsetLog = empty
commitLog = empty
while (isActive) {
    latestBatchId = offsetLog.getLatest()
    availableOffsets = offsetLog.get(latestBatchId)
    committedOffsets = offsetLog.get(latestBatchId - 1)
    latestCommittedBatchId = commitLog.getLatest()
    if (latestBatchId == latestCommittedBatchId) {
        // The last batch was committed successfully: start a new batch.
        currentBatchId = latestCommittedBatchId + 1
        committedOffsets ++= availableOffsets
    } else if (latestCommittedBatchId == latestBatchId - 1) {
        // The last batch was not committed successfully: run it again.
        currentBatchId = latestBatchId
    } else {
        // currentBatchId cannot be determined, so stop the loop.
        log.error("breaking loop invariant")
        break
    }
    // Before processing: record the offsets for this batch.
    availableOffsets = source.offset()
    offsetLog.add(currentBatchId, availableOffsets)
    // Processing: run the query and send the result to the sink.
    // After the sink write: mark the batch as committed.
    commitLog.add(currentBatchId, availableOffsets)
}
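The loop can be exercised end to end with a small runnable sketch. Several things here are assumptions for illustration only: the function name `run_batches`, a single source whose offset is a plain integer, a start offset of 0 for the first batch, and a `process` callback standing in for query execution plus the sink write. The sketch also departs from the pseudocode in one respect: when it reruns an uncommitted batch it reuses the offsets already in the offset log instead of reading the source again, which follows the stated goal that a given batch always consists of the same data.

```python
from typing import Callable, Dict, List


def run_batches(offset_log: Dict[int, int],
                commit_log: List[int],
                source_offsets: List[int],
                process: Callable[[int, int, int], None]) -> None:
    """Run one micro-batch per entry in source_offsets.

    offset_log maps batchId -> end offset; commit_log lists committed batchIds.
    process(batch_id, start, end) may raise to simulate a sink failure.
    """
    for available in source_offsets:
        latest = max(offset_log) if offset_log else -1
        committed = commit_log[-1] if commit_log else -1
        if latest == committed:
            batch_id = committed + 1
            # Write-ahead: log the offsets before any processing is done.
            offset_log[batch_id] = available
        elif committed == latest - 1:
            # Rerun the uncommitted batch with the offsets already logged.
            batch_id = latest
        else:
            raise RuntimeError("loop invariant broken")
        # The previous batch's end offset is this batch's start (0 if none).
        start = offset_log.get(batch_id - 1, 0)
        process(batch_id, start, offset_log[batch_id])
        # Only reached when the sink write succeeded.
        commit_log.append(batch_id)
```

If `process` raises while handling batch 1, the offset log already holds the entry for batch 1 but the commit log stops at batch 0. The next call to `run_batches` with the same logs then reruns batch 1 over the same offset range before moving on to batch 2.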