Upsert table intermittently misses out on records during concurrent ingestion updates

Open tibrewalpratik17 opened this issue 2 years ago • 11 comments

We have a partial-upsert table where the number of updates per key is pretty high in a short period of time (within an hour we get 1000s of updates for a key).

In between these updates, when we query for that particular primary key, we intermittently get no rows back from Pinot. I saw that this coincides with an update received for that primary key (the query is received within 1 second of a new record for that key).

After a few seconds, the record comes up again in the query response and everything works fine until there is another overlap of query time and ingestion time.

I suspect it might be happening because we update DocID by removing it first and then adding it again. https://github.com/apache/pinot/blob/168408aa8d0f94de3004abe6e49b6263cef24186/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java#L303-L308

And it might be a race condition where the query is received between these 2 actions.

Is this expected behaviour? Is there a way we can guarantee at least the older record (if not the newer one) during this time?

One possible solution is to take a read lock on validDocIds before updating and use the same lock in FilterPlanNode - https://github.com/apache/pinot/blob/168408aa8d0f94de3004abe6e49b6263cef24186/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java#L88-L99 But this can become a major issue in high-qps / high-ingestion scenarios, where the other thread gets starved.

One more approach is to clone validDocIds before updating, apply the update (add / remove) to the clone, and then copy it back to the original validDocIds. This can work for updates within the current consuming segment but might not work for updates where the previous record was in an old immutable segment. It also has memory implications, as we would have to maintain a clone of validDocIds for every message. What if we do this in batches? That reduces the memory footprint, but the implementation can become quite intrusive.
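
To make the suspected window concrete, here is a minimal single-threaded sketch that replays the interleaving by hand. It uses `java.util.BitSet` as a stand-in for the validDocIds bitmap, and the class and method names (`RemoveThenAddWindow`, `removeOld`, `addNew`) are hypothetical, not Pinot's actual code.

```java
import java.util.BitSet;

// Hypothetical, simplified stand-in for the validDocIds handling; not Pinot's actual code.
final class RemoveThenAddWindow {
  // Valid docIds of a single segment (assumption: old and new doc live in the same segment).
  private final BitSet validDocIds = new BitSet();

  void markValid(int docId) {
    validDocIds.set(docId);
  }

  // Step 1 of an update: invalidate the old doc for the key.
  void removeOld(int oldDocId) {
    validDocIds.clear(oldDocId);
  }

  // Step 2 of an update: validate the new doc for the key.
  void addNew(int newDocId) {
    validDocIds.set(newDocId);
  }

  // What a query does in this sketch: copy the bitmap at one instant.
  BitSet snapshot() {
    return (BitSet) validDocIds.clone();
  }

  // Number of docs a query sees for the key if its snapshot lands between step 1 and step 2.
  static int docsVisibleMidUpdate() {
    RemoveThenAddWindow segment = new RemoveThenAddWindow();
    segment.markValid(0);                // existing record for the key
    segment.removeOld(0);                // update starts: old doc invalidated
    BitSet seen = segment.snapshot();    // query snapshots here
    segment.addNew(1);                   // update finishes: new doc validated
    return seen.cardinality();
  }
}
```

In this replay the query's snapshot contains neither the old nor the new doc, which would match the "no rows" symptom rather than a stale row.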
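
A rough sketch of the clone-then-swap idea for a single consuming segment, assuming a single writer thread and using `java.util.BitSet` in place of the real bitmap type. `CopyOnWriteValidDocIds` and `replaceDoc` are made-up names for illustration only.

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical copy-on-write holder for one segment's valid docIds; assumes a single writer.
final class CopyOnWriteValidDocIds {
  private final AtomicReference<BitSet> published = new AtomicReference<>(new BitSet());

  // Applies remove + add on a private clone, then publishes the clone in one step.
  // Pass oldDocId < 0 when the key has no previous doc in this segment.
  void replaceDoc(int oldDocId, int newDocId) {
    BitSet copy = (BitSet) published.get().clone();
    if (oldDocId >= 0) {
      copy.clear(oldDocId);
    }
    copy.set(newDocId);
    published.set(copy);
  }

  // Readers get an immutable-by-convention view: either fully before or fully after an update.
  BitSet view() {
    return published.get();
  }
}
```

A reader that grabbed `view()` before an update keeps seeing the old doc, and one that grabs it afterwards sees the new doc. The clone per message is the memory cost mentioned above, and this does not cover the case where the old doc sits in a different segment.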

tibrewalpratik17 avatar Mar 18 '24 13:03 tibrewalpratik17

I don't think we can do any of the suggested approaches since it won't scale. The gap between add/remove should be on the order of microseconds afaict. Let's confirm if this is indeed the reason for result mismatch

ankitsultana avatar Mar 18 '24 14:03 ankitsultana

I don't think we can do any of the suggested approaches since it won't scale.

Agreed! Any other solution you can think of? Ideally, we should return the older record if not the latest. Returning nothing is what makes this issue more problematic.

The gap between add/remove should be on the order of microseconds afaict.

Yeah! But these can take more time as the updates are "synchronized". If a thread of another partition is updating the bitmap at the same time, the gap can get longer, and FilterPlanNode can take a snapshot within that gap.

Let's confirm if this is indeed the reason for result mismatch

Note this is not result mismatch but MISSING results to be more precise. We are getting records for the primary key usually but intermittently there are NO records instead of stale records / mismatched records. And according to my preliminary investigation it is happening during concurrent updates.

tibrewalpratik17 avatar Mar 18 '24 14:03 tibrewalpratik17

can you check if it aligned with segment commit time (replace consuming segment with sealed segment)? it can be addressed by #11964

deemoliu avatar Mar 18 '24 16:03 deemoliu

can you check if it aligned with segment commit time (replace consuming segment with sealed segment)? it can be addressed by https://github.com/apache/pinot/pull/11964

No, the segment commit time doesn't align with this. We are observing this throughout the segment consumption cycle itself. Note: I didn't see it happening at all during segment commit time.

tibrewalpratik17 avatar Mar 18 '24 16:03 tibrewalpratik17

cc @klsince

Jackie-Jiang avatar Mar 19 '24 19:03 Jackie-Jiang

Linking this up with a related issue: https://github.com/apache/pinot/issues/11948. We used to miss data while starting a new consuming segment, but after fixing that, we found under-count or over-count issues like the one reported here.

There is another cause of this issue, where queries might miss some data/PKs while data is being ingested. The query processes segments (the mutable one and many immutable ones) in a non-deterministic order:

  1. if the mutable segment is processed first, the query might see fewer valid docs (under-count), as newly ingested data can invalidate docs in immutable segments before those segments get processed
  2. if the mutable segment is processed last, the query might see more valid docs (over-count), as the PKs of some newly ingested data may already have been counted while the query was processing immutable segments earlier.

To solve this, we're thinking about providing a consistent table view to queries on upsert tables. In general, we can take a copy of the bitmaps to form the consistent view for queries to use, while ingestion continues. This is similar to what's proposed above.

We can take the copy on the read path or the write path:

  1. query threads lock out the consuming thread (and the helix threads that are adding/replacing segments), take a copy of the validDocId bitmaps, then release the lock and process the segments with the bitmap copies. Pros: fresh view; Cons: costly, and blocks ingestion as the query rate becomes higher.

  2. the consuming thread (and also the helix threads) takes a copy of the validDocId bitmaps, then atomically replaces the old set of bitmaps with the new set. Queries use the bitmap copy to process segments. We can take the copy periodically to amortize the cost but sacrifice some data freshness, or take a copy when handling each individual record if we can do the copy very efficiently (only swap the updated 1 or 2 bitmaps into the consistent view atomically).

Those options are not exclusive. We may add all of them and use them as needed (query heavy or ingest heavy or strict freshness etc.)
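
The two orderings above can be replayed deterministically. This is only an illustrative sketch with one immutable and one mutable segment and a single key, using `java.util.BitSet` for the valid-doc bitmaps; `SegmentOrderSkew` is a hypothetical name.

```java
import java.util.BitSet;

// Hypothetical replay of a query reading two segments while one upsert lands in between.
final class SegmentOrderSkew {
  // Returns how many valid docs the query counts for a key that should always match exactly 1.
  static int countWithUpdateBetweenReads(boolean mutableFirst) {
    BitSet immutableValid = new BitSet();
    BitSet mutableValid = new BitSet();
    immutableValid.set(0); // current record for the key lives in the immutable segment

    BitSet first = mutableFirst ? mutableValid : immutableValid;
    BitSet second = mutableFirst ? immutableValid : mutableValid;

    // Query processes the first segment.
    int count = first.cardinality();

    // An update for the same key is ingested: old doc invalidated, new doc added to mutable.
    immutableValid.clear(0);
    mutableValid.set(0);

    // Query processes the second segment and sees the post-update state.
    count += second.cardinality();
    return count;
  }
}
```

With the mutable segment read first, the query counts 0 docs for the key (under-count); with it read last, the query counts 2 (over-count).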

klsince avatar Mar 19 '24 20:03 klsince

To solve this, we're thinking about to provide a consistent table view for queries for upsert tables. In general, we can take copy of bitmaps to form the consistent view for queries to use, while keeping ingesting data. Similar with what's proposed above. We can take the copy on read path or write path: query threads to lock the consuming thread (and helix threads that are adding/replacing segments) and take copy of validDocId bitmaps, then release the lock and process the segments with bitmap copies. Pros: fresh view; Cons: costly and blocks ingestion as the query rate becomes higher. consuming thread (and also the helix threads) take copy of validDocId bitmaps, then atomically replace the old set of bitmaps with the new set. Queries use the bitmap copy to process segments. We can take copy periodically to amortize cost but sacrifice some data freshness, or take copy at handling individual record if we can do the copy very efficiently (only swap the updated 1 or 2 bitmaps into the consistent view atomically). Those options are not exclusive. We may add all of them and use per need (query heavy or ingest heavy or strict freshness etc.)

@klsince do you think we have any other solutions? As called out above, both these solutions might not scale at high-qps / high-ingestion-throughput :')

tibrewalpratik17 avatar Mar 19 '24 21:03 tibrewalpratik17

for high-qps and high-ingest, we could refresh the copy of bitmaps periodically e.g. at seconds level vs. refreshing the bitmap copy at individual record or individual query, to amortize the cost of copying bitmaps, but with drawback of loss of data freshness at seconds level. I assume this is like what you meant by 'in batches' in your issue description.

could you help elaborate the concern on scaling in this way? cc @ankitsultana as I see you have raised the concern earlier on. thanks!

klsince avatar Mar 19 '24 22:03 klsince

for high-qps and high-ingest, we could refresh the copy of bitmaps periodically e.g. at seconds level vs. refreshing the bitmap copy at individual record or individual query, to amortize the cost of copying bitmaps, but with drawback of loss of data freshness at seconds level. I assume this is like what you meant by 'in batches' in your issue description. could you help elaborate the concern on scaling in this way?

Freshness of a few seconds should still be okay, I guess. Yes, "in batches" is very similar to this. But we should ensure we are refreshing the copy before the addRecord workflow so that we don't take a snapshot in an intermediate state. Maybe a read lock might help, but I'm not sure if it might lead to huge scalability challenges.

tibrewalpratik17 avatar Mar 19 '24 22:03 tibrewalpratik17

If we let the consuming thread take a copy of the bitmaps periodically, then we don't need locks between query threads and consuming threads. The consuming thread pauses periodically to take the copy of the bitmaps, and since it's the only one modifying them it doesn't need to take any locks: it just creates a new map of bitmaps and replaces the old map atomically (using a volatile variable).

But a challenge is that some helix threads can also modify bitmaps when replacing/adding segments. So we need to coordinate those 'writer' threads with a R/W lock, so that only one writer thread can take W lock to take the copy of bitmaps, blocking other writer threads from modifying the bitmaps; but all writer threads can proceed in parallel if no copy is being taken. Most of the time, it'd just be the consuming thread to refresh the copy of bitmaps.
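
A minimal sketch of that coordination, under the assumption that bitmaps are `java.util.BitSet` keyed by segment name. Writer threads share the R lock while modifying bitmaps, and whoever refreshes the view takes the W lock; `ConsistentViewSketch` and its methods are hypothetical names, not the merged implementation.

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: writers share the read lock, the view refresher takes the write lock.
final class ConsistentViewSketch {
  private final Map<String, BitSet> liveBitmaps = new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Queries read this reference without locking; it is swapped atomically on refresh.
  private volatile Map<String, BitSet> publishedView = Collections.emptyMap();

  // Called by writer threads (consuming thread, segment add/replace threads).
  // Holding the read lock across remove + add keeps a refresh from landing in between.
  void replaceDoc(String oldSegment, int oldDocId, String newSegment, int newDocId) {
    lock.readLock().lock();
    try {
      if (oldSegment != null) {
        BitSet old = liveBitmaps.get(oldSegment);
        if (old != null) {
          synchronized (old) { // BitSet is not thread-safe; writers may run in parallel
            old.clear(oldDocId);
          }
        }
      }
      BitSet target = liveBitmaps.computeIfAbsent(newSegment, s -> new BitSet());
      synchronized (target) {
        target.set(newDocId);
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  // Copies every bitmap while no writer is mid-update, then publishes the copy.
  void refreshView() {
    lock.writeLock().lock();
    try {
      Map<String, BitSet> copy = new HashMap<>();
      for (Map.Entry<String, BitSet> e : liveBitmaps.entrySet()) {
        copy.put(e.getKey(), (BitSet) e.getValue().clone());
      }
      publishedView = copy;
    } finally {
      lock.writeLock().unlock();
    }
  }

  Map<String, BitSet> view() {
    return publishedView;
  }

  static int totalValid(Map<String, BitSet> view) {
    int total = 0;
    for (BitSet b : view.values()) {
      total += b.cardinality();
    }
    return total;
  }
}
```

A query that grabs `view()` once and uses it for all segments sees each key exactly once, at the cost of being as stale as the last `refreshView()` call.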

We can also make both options work together, i.e. taking copy at query time for better data freshness and taking copy on writer threads periodically for efficiency. We may add a query option for a query to specify the freshness SLA (e.g. 0 for no tolerance, 1sec for some tolerance etc.). If the copy of bitmaps is not fresh enough, the query thread can take the W lock to refresh the copy of bitmaps immediately, which blocks the ingestion a bit (as tested, copying a map of 1000 entries took about 60us/op). Upon high qps, queries could share the copy taken by other query, to amortize the cost a bit.
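
The freshness SLA idea could look roughly like the sketch below. It assumes the bitmap copy is produced by some `copier` callback and that time comes from an injectable millisecond clock; `FreshnessBoundedView` and `maxStalenessMs` are hypothetical names, not an existing Pinot query option.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical holder that reuses a copied view unless it is older than the query tolerates.
final class FreshnessBoundedView<V> {
  private static final class Stamped<V> {
    final V view;
    final long takenAtMs;

    Stamped(V view, long takenAtMs) {
      this.view = view;
      this.takenAtMs = takenAtMs;
    }
  }

  private final Supplier<V> copier;       // takes a new copy (would hold the W lock in practice)
  private final LongSupplier clockMs;     // injectable clock so behaviour can be tested
  private final Object refreshLock = new Object();
  private volatile Stamped<V> current;

  FreshnessBoundedView(Supplier<V> copier, LongSupplier clockMs) {
    this.copier = copier;
    this.clockMs = clockMs;
    this.current = new Stamped<>(copier.get(), clockMs.getAsLong());
  }

  // maxStalenessMs = 0 always forces a new copy; larger values let queries share a copy.
  V get(long maxStalenessMs) {
    Stamped<V> s = current;
    if (clockMs.getAsLong() - s.takenAtMs < maxStalenessMs) {
      return s.view;
    }
    synchronized (refreshLock) {
      s = current; // another query may have refreshed while we waited
      if (clockMs.getAsLong() - s.takenAtMs >= maxStalenessMs) {
        s = new Stamped<>(copier.get(), clockMs.getAsLong());
        current = s;
      }
      return s.view;
    }
  }
}
```

Queries with a nonzero tolerance that arrive close together end up sharing one copy, which is the amortization described above.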

klsince avatar Mar 19 '24 22:03 klsince

The approach of providing a consistent view to queries looks good to me. Though the implementation would require a bit of redesign of the Upsert handling code right? @klsince

ankitsultana avatar Mar 21 '24 20:03 ankitsultana

Closing this for now as the fix #12976 has been merged. Will test in our clusters and reopen if needed.

tibrewalpratik17 avatar May 27 '24 13:05 tibrewalpratik17