
Realtime servers reporting lag for partitions they don't own after rebalancing w/o including consuming segments

Open priyen opened this issue 2 years ago • 17 comments

The scenario is as follows, assuming we have 3 replicas for the table:

  • increase instances / replica config by some number
  • a rebalance is kicked off with reAssignInstances=True, minReplicas=2, includeConsuming=False, and downtime=False
  • this should cause all sealed segments to move appropriately
  • finally, once a consuming segment seals (consuming to online), we notice that the ingestion delay tracker metric (pinot.server.realtime_ingestion_delay) keeps rising from 0. Tracing the code, we determined this happens when the segment is sealed but is no longer under ownership of that instance, and so it is also dropped. In the code, the partition is marked for "verification" as part of the transition message handling. At some point, the background thread realizes this partition is inactive and stops tracking the lag. This takes ~10 minutes, so we see an increasing lag from the moment this transition happens.

Relevant code - https://github.com/apache/pinot/blob/399f033ec3917df2bc478b5904406a95e0bc7258/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java#L91

Desired behaviour - lag tracking is stopped the moment the partition is transitioned/dropped from said instance. Right now, that function call simply marks the partition for verification
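To make the difference between the current and the desired behaviour concrete, here is a toy Python model. It is not Pinot code: the class and method names are invented for illustration, and the assumption that a fresh update clears a pending verification is mine.

```python
import time


class DelayTrackerSketch:
    """Toy model of per-partition ingestion-delay tracking (hypothetical, not Pinot's class)."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._last_ingest_ts = {}         # partition id -> timestamp of last ingested event
        self._pending_verification = {}   # partition id -> time it was marked

    def update_ingestion_delay(self, partition, event_ts):
        # A new measurement (re)starts tracking for the partition.
        self._last_ingest_ts[partition] = event_ts
        self._pending_verification.pop(partition, None)

    def mark_for_verification(self, partition):
        # Behaviour described in this issue: keep reporting, check ownership later.
        self._pending_verification[partition] = self._clock()

    def stop_tracking(self, partition):
        # Desired behaviour: drop the metric as soon as the partition leaves.
        self._last_ingest_ts.pop(partition, None)
        self._pending_verification.pop(partition, None)

    def reported_delay(self, partition):
        # None means no metric is emitted for this partition.
        ts = self._last_ingest_ts.get(partition)
        return None if ts is None else self._clock() - ts
```

With mark_for_verification, reported_delay keeps growing with the clock until something removes the partition; with stop_tracking, the metric disappears immediately.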

cc @jugomezv cc @jadami-stripe

priyen avatar Aug 28 '23 17:08 priyen

I tested rebalancing, and the expectation is that after 10-15 minutes the lag stops being reported by the server the partition left. That's what I saw in my experiments, but let me look into this.

jugomezv avatar Sep 05 '23 21:09 jugomezv

OK, I recall the original reason for this design: when a partition is moved, the only way for us to be sure it has moved is to go to the controller and verify. We could do this on receiving each drop, but that would cause significant overhead in a highly fluid situation, so we decided to delay and batch such verifications in one shot, as implemented now. The direct consequence is that you will see metrics emitted and ramping up for 10-15 minutes for partitions that have moved. The delay also helps us avoid race conditions such as: we get the dropped event and remove the partition, but then get another measurement for that partition before it is dropped. The consequence would be a metric that keeps ramping up and is never removed. With this in mind, I am open to other suggestions you may have for handling this situation.

jugomezv avatar Sep 06 '23 17:09 jugomezv

Doesn't calling updateIngestionDelay add the metric back for the partition group id? So if we were to remove it immediately in onConsumingToOnline, then we should see it come back as soon as consumption starts again. Though the downside here is if that process breaks, then we won't see the ingestion lag rise anymore?

jadami10 avatar Sep 07 '23 14:09 jadami10

The issue this tries to address is that we can't ensure there will be no updateIngestionDelay call after the drop event. The consequence of processing the drop and then getting an update would be a leftover metric that increases forever. The workaround would be to confirm with the controller sooner, but that would increase overhead, especially on multi-tenant servers that host many tables.

jugomezv avatar Sep 08 '23 18:09 jugomezv

Hmmm, so if i'm understanding the issue, the events are:

  • segment goes from consuming -> online which happens every time it seals
  • at this point, several things might happen
    • the server keeps consuming, and we keep updating the ingestion delay (happy path)
    • the partition has moved, but we keep updating ingestion delay (our issue)
    • the server should keep consuming, but something is broken and it's not consuming anymore

and the issue is we can't distinguish between the second and third cases without confirming with the controller. Though I still need to go look at the code for what happens during a rebalance to understand why the same Helix message can't be threaded through here.

jadami10 avatar Sep 08 '23 19:09 jadami10

That's correct Johan!

jugomezv avatar Sep 08 '23 20:09 jugomezv

The current solution should be able to eventually remove the lag, but could take up to 15 minutes (10 minutes timeout + 5 minutes scheduling delay). There are 2 potential enhancements:

  1. Make timeout and scheduling delay configurable (both cluster level and table level)
  2. Add a mode to directly pull ideal state so that it doesn't need to do async verification

@priyen @jadami10 which solution do you think suits your use case better? Or maybe both of them are needed?
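As a rough illustration of where the 10 to 15 minute window comes from, the arithmetic can be sketched as below. The constant and function names are made up for this sketch, and it assumes the "scheduling delay" is the period of a background check; only the two durations come from the comment above.

```python
from datetime import timedelta

# Durations quoted in the comment above; the names are hypothetical.
VERIFICATION_TIMEOUT = timedelta(minutes=10)
SCHEDULING_DELAY = timedelta(minutes=5)


def stale_metric_window(timeout=VERIFICATION_TIMEOUT, period=SCHEDULING_DELAY):
    """Return (best case, worst case) time a moved partition keeps reporting lag."""
    # Best case: the periodic check runs right as the timeout expires.
    # Worst case: the check ran just before expiry, so one more full period passes.
    return timeout, timeout + period


best, worst = stale_metric_window()
print(f"stale lag reported for between {best} and {worst}")
```

Making both values configurable (option 1) would shrink this window at the cost of more frequent verification.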

Jackie-Jiang avatar Jun 05 '24 21:06 Jackie-Jiang

@Jackie-Jiang, I wasn't able to track this down, but do servers consistently parse changes between idealstate/external view? Is there a way to hook in when that happens and have it call a removePartition function on IngestionDelayTracker.java?

jadami10 avatar Jun 05 '24 22:06 jadami10

The server doesn't read the IS on its own. The controller periodically scans all IS/EV and sends messages to servers. To detect that the next segment isn't coming to this server, the server would need to read the IS for that table. Basically, that translates to one IS read per segment commit if we do it synchronously.

Jackie-Jiang avatar Jun 05 '24 22:06 Jackie-Jiang

I wish there was a smaller ZK node we could look at. But given that we (and likely others) have use cases that want to meet SLC thresholds < 30s, then that's roughly how quickly we'd want the ingestion lag metric to reflect that a partition has been rebalanced away.

  1. is that possible? Can we just do this in the rebalance case?
  2. should we be looking at IS or EV in case a rebalance fails and things are in an inconsistent state, for example?

jadami10 avatar Jun 05 '24 22:06 jadami10

  1. What is the typical freshness lag during segment commit? If we don't want to read IS for every segment commit, we need to configure a timeout longer than the common freshness lag so that it doesn't always time out.
  2. We should look at IS to know if server still owns the segment

On second thought, I feel I missed an important point. After rebalancing without consuming segments, when the current consuming segment commits, does it remain on the current server, or is it moved to the new server? If it moves to the new server, it should trigger the CONSUMING -> DROPPED state transition, which does not need to wait for the 10-minute timeout.

Jackie-Jiang avatar Jun 06 '24 00:06 Jackie-Jiang

The root issue might already be solved with #12351

Jackie-Jiang avatar Jun 06 '24 00:06 Jackie-Jiang

@Jackie-Jiang I just tested on 1.1, and we still see the issue @priyen-stripe reported. It's pretty easy to repro:

  • rebalance (do not include consuming)
  • force commit

you'll see pinot reports ingestion lag linearly going up

jadami10 avatar Jun 07 '24 14:06 jadami10

@jadami10 After the force commit, does the committed segment stay on the old servers, or does it move to the new servers directly?

Jackie-Jiang avatar Jun 11 '24 17:06 Jackie-Jiang

@Jackie-Jiang, after force commit the committed segment remains on the same server unless another rebalance is triggered. To reproduce this issue, you rebalance w/o consuming, and then force commit, and servers that just sealed their segments will now emit lag that is not real lag

priyen avatar Jun 11 '24 20:06 priyen

I see. I think this is the root cause. Ideally, when a segment is committed, it should be moved directly to the new servers. Let me check the history and see if we intentionally keep the segment in place after commit.

Jackie-Jiang avatar Jun 11 '24 23:06 Jackie-Jiang

I think the reason the controller does not directly move the committing segment is to avoid the segment being unavailable during the move. Segment moves should always be triggered by the table rebalancer to honor no-downtime segment movement.

I can think of one potential solution:

  • When the controller detects that the new consuming segment is assigned to a different set of servers than the committing segment
  • Assign ONLINE state to new servers without removing CONSUMING state from the old servers
  • After the EV converges, remove CONSUMING state from the old servers

With the above solution, we also need to revisit the realtime segment validator so that it can recover from failures, because segment commit is no longer done in one shot. It also might not correctly honor the table replication, because there will be more replicas during segment commit.
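The steps above can be sketched as two successive assignments for the committing segment. This is only a toy model in Python with a hypothetical function name; it is not how the Pinot controller builds the ideal state, and it ignores failure recovery.

```python
def plan_commit_assignment(old_servers, new_servers):
    """Return the two instance-state maps for the committing segment (illustrative only)."""
    # Step 1: new servers get ONLINE while old servers keep CONSUMING.
    # A server present in both sets simply goes to ONLINE.
    step1 = {server: "CONSUMING" for server in old_servers}
    step1.update({server: "ONLINE" for server in new_servers})
    # Step 2, applied only after the external view converges:
    # remove CONSUMING from the old servers, leaving just the new ones.
    step2 = {server: "ONLINE" for server in new_servers}
    return step1, step2


step1, step2 = plan_commit_assignment(["s1", "s2", "s3"], ["s3", "s4", "s5"])
print(len(step1), len(step2))
```

With 3 replicas and only one server shared between the old and new sets, step 1 temporarily holds 5 replicas, which is the replication concern raised above.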

Jackie-Jiang avatar Jun 12 '24 19:06 Jackie-Jiang