pinot icon indicating copy to clipboard operation
pinot copied to clipboard

[partial-upsert] Ensuring Data Consistency after Rebalance

Open ankitsultana opened this issue 1 year ago • 2 comments

Issue Description

(I haven't spent time reproducing this. The following is based on my understanding.)

Partial Upsert tables merge the incoming event with the existing latest version of the record.

Say we have two replicas of consuming segments: S0 and S1.

Say the segments with the previous sequence id for this segment in the replicas are: P0 and P1.

While a rebalance is going on, say P0 gets moved to the target server before P1, and between that time we had a record come to S0 which needed to be read from P0.

If a segment commit happens before the consuming segments were moved, we will end up with S0, S1 having different data.

Discussion

Are there any guardrails to prevent this?

The main check I know we have is the allSegmentsLoaded check, but that is applicable only for new consuming segments.

If the above is true, I'd suggest that we add a check in the rebalance code, which makes the rebalance a no-op/return-an-error until the user pauses consumption.

ankitsultana avatar Feb 11 '24 01:02 ankitsultana

Hey @ankitsultana I was looking into this.

So based on your example:


Say the segments with the previous sequence id for this segment in the replicas are: P0 and P1.

While a rebalance is going on, say P0 gets moved to the target server before P1, and between that time we had a record come to S0 which needed to be read from P0.

If a segment commit happens before the consuming segments were moved, we will end up with S0, S1 having different data.

When we rebalance with the includeConsuming option set to true, according to Pinot's official documentation:

CONSUMING segments are rebalanced only if this is set to true.
Moving a CONSUMING segment involves dropping the data consumed so far on old server, and re-consuming on the new server. 

In the case of partial-upsert, an entire partition will be moved to another node, not just a few segments. As you mentioned, allSegmentsLoaded will prevent the consumption from starting on the new node until all the old segments for the partition are available, ensuring the data is re-consumed properly.

Moreover, since we will rebalance at the replica level during a NoDowntime rebalance, once the first replica is in a stable consuming state, we will move to the next replica using the same logic. If a segment commit happens during this process, it should not result in different data because the allSegmentsLoaded condition will prevent any consumption inconsistencies.

But please let me know if there are any edge cases I might have missed. I haven't gone through the rebalance code in detail yet, so my understanding is based on documentation and theoretical knowledge.

tibrewalpratik17 avatar Jun 07 '24 10:06 tibrewalpratik17

In the case of partial-upsert, an entire partition will be moved to another node, not just a few segments. As you mentioned, allSegmentsLoaded will prevent the consumption from starting on the new node until all the old segments for the partition are available, ensuring the data is re-consumed properly.

There is a scenario in partial-upserts where some segments of a partition are moved during a rebalance, and the consuming segment continues consumption. This results in inconsistent data because it misses previous record references. If, instead of this consuming segment being rebalanced and reconsume entire data on new node, it commits the data, we would end up with different data across replicas.

Thinking out loud, I can think of few solutions to this:

  • Approach 1: Ensure that segments are not committed during an ongoing rebalance. Whenever a server reaches the threshold for committing a segment (the endCriteriaReached method), it should check for an ongoing rebalance for the table and make a hold call to prevent committing. The hold method will sleep the thread for a certain period and then check again if the rebalance has finished. Note: This approach requires server-to-controller communication (only for partial-upsert tables) to determine if a rebalance is in progress when commit criteria is met everytime. Ingestion stays at the same offset in that partition until further asked to change state.
  • Approach 2: Similar to Approach 1, but the controller handles the holding logic in the SegmentCompletionManager#holdingConsumed method. Until a rebalance is completed, the controller will ask the servers to hold committing. Note: It is unclear to me what happens if the consuming segment is relocated to a new instance and the controller then asks the winner to proceed with the commit job. Will it result in a No-Op operation? Even in this approach, ingestion stays at the same offset in that partition until notified by controller to change state.
  • Approach 3: Pause consumption for partial-upsert tables (pausing consumption triggers a force-commit as well). Then, perform the rebalance followed by resuming consumption. This entire process should be orchestrated within the rebalance job.

Approach 1 and Approach 2: These approaches will only come into play when a commit is happening at a partition-level during an ongoing rebalance. This minimizes the likelihood of ingestion lag during or after the rebalance operation.

Approach 3: This approach will cause ingestion lag across all partitions every time a rebalance is triggered for a partial-upsert table, as it pauses and resumes ingestion during the rebalance process. Additionally, it requires the rebalance operation to be more intrusive, as it needs to orchestrate the entire flow of pausing consumption, ensuring the current consuming segments are committed, and then resuming consumption after the rebalance is complete.

IMO approach 2 might be neater, as it allows the controller to solely decide when to allow or hold commits. We could further optimize approach 2 by checking if the partition, of the segment for which the commit request is received, is planned to be rebalanced to a new node or will remain on the same instance. If it is staying on the same instance, we can proceed with the commit.

cc @Jackie-Jiang @ankitsultana what are your thoughts?

tibrewalpratik17 avatar Jun 12 '24 12:06 tibrewalpratik17