Inconsistent Row Counts from Upsert Tables
Summary
We recently found that querying Pinot upsert-enabled tables can return different numbers of rows at different times. The returned count can be higher than, equal to, or lower than the true count, whereas we expect it to always match the true count.
Setup
Versions
Pinot Version: 0.8.0
Trino Version: 362
Kafka Topic
Topic topic1 has a single partition, and the publisher sends over 1,000 messages per second.
Table Config
The table has about 200 columns: 100 metric fields and 100 dimension fields. Column A serves as the primary key with ~20,000 unique values. The segment flush threshold is set to 10,000, which means a new segment is produced roughly every 10 seconds.
{
  "tableName":"table1",
  "schema":{
    "metricFieldSpecs":[
      {
        "name":"B",
        "dataType":"DOUBLE"
      }
    ],
    "dimensionFieldSpecs":[
      {
        "name":"A",
        "dataType":"STRING"
      }
    ],
    "dateTimeFieldSpecs":[
      {
        "name":"EPOCH",
        "dataType":"INT",
        "format":"1:SECONDS:EPOCH",
        "granularity":"1:SECONDS"
      }
    ],
    "primaryKeyColumns":[
      "A"
    ],
    "schemaName":"schema1"
  },
  "realtime":{
    "tableName":"table1",
    "tableType":"REALTIME",
    "segmentsConfig":{
      "schemaName":"schema1",
      "timeColumnName":"EPOCH",
      "replicasPerPartition":"1",
      "retentionTimeUnit":"DAYS",
      "retentionTimeValue":"4",
      "segmentPushType":"APPEND",
      "completionConfig":{
        "completionMode":"DOWNLOAD"
      }
    },
    "tableIndexConfig":{
      "invertedIndexColumns":[
        "A"
      ],
      "loadMode":"MMAP",
      "nullHandlingEnabled":false,
      "streamConfigs":{
        "realtime.segment.flush.threshold.rows":"10000",
        "realtime.segment.flush.threshold.time":"96h",
        "streamType":"kafka",
        "stream.kafka.consumer.type":"lowLevel",
        "stream.kafka.topic.name":"topic1",
        "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list":"kafka:9092",
        "stream.kafka.consumer.prop.auto.offset.reset":"largest"
      }
    },
    "tenants":{},
    "metadata":{},
    "routing":{
      "instanceSelectorType":"strictReplicaGroup"
    },
    "upsertConfig":{
      "mode":"FULL"
    }
  }
}
Issues
Case 1: one or two rows are missing
Steps to reproduce
- Bump up the segment flush threshold to a large enough number like 1,000,000 to make sure there is no flush.
- Run select count(*) from table1 in PQL repeatedly; you will periodically see numbers like 19,998 or 19,997 while we expect 20,000.
Root cause
The process of updating an upsert table segment is not atomic; it is done in two separate steps, remove and add. #7844 and #7860 by @Jackie-Jiang are trying to address it.
Case 2: hundreds or thousands of rows are missing
Steps to reproduce
- Set the segment flush threshold to a small enough number like 1,000 so that segment flushes happen frequently.
- Run select count(*) from table1 in PQL repeatedly; when a segment is flushed, you will see numbers like 16,703 or 18,234 while we expect 20,000.
Root cause
Unclear; possibly because upserts across segments are not atomic?
If this is truly due to the two-step update and there is no way to achieve atomicity, I would argue we should do add before remove, since it is easier to de-duplicate than to identify and backfill the missing row(s).
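To illustrate why duplicates are the lesser evil: they can at least be detected and worked around at query time by counting distinct primary keys, whereas missing rows cannot be recovered by the reader. A minimal sketch in Pinot SQL, using the table and primary key from the config above (the counts are illustrative, not from the source):

-- Raw row count: can overcount if a doc is valid in more than one segment.
SELECT COUNT(*) FROM table1;

-- Distinct primary keys: unaffected by duplicates, but cannot bring back rows
-- that were removed before the replacement was added.
SELECT DISTINCTCOUNT(A) FROM table1;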
Case 3: duplicates are returned via Trino
Steps to reproduce
- Continue with the setup in Case 2.
- Run query1 select count(*) from table1 where from_unixtime(EPOCH) > current_timestamp - interval '15' minute via Trino repeatedly and you will see numbers like 20,023 when you expect to see 18,000, i.e. over 2,000 duplicates returned.
Root cause
We happened to notice that an equivalent Trino query2 select count(*) from table1 where EPOCH > to_unixtime(current_timestamp - interval '15' minute) doesn't yield duplicates. The difference is that query2 utilizes predicate pushdown while query1 doesn't. We suspect that when query1 is executed, it examines segments one by one with no locking in place. For instance, it may first pull all the valid records from segment1, after which those records' valid locations are updated to segment2. When it then reaches segment2, it again retrieves all the valid records there. In the end, it returns a union of the records from both segment1 and segment2, which contains duplicates.
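One way to see the difference is to compare the Trino plans for the two queries; a minimal sketch, assuming the Trino Pinot connector (the exact plan output varies by Trino version):

EXPLAIN
SELECT count(*)
FROM table1
WHERE EPOCH > to_unixtime(current_timestamp - interval '15' minute);

-- If the predicate is pushed down, the filter appears inside the Pinot table
-- scan; otherwise Trino applies it in its own filter node above the scan.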
CC: @mayankshriv @Jackie-Jiang @yupeng9 @elonazoulay
Thanks a lot for the detailed report and investigation!
Regarding 3, I think it's possible if the Trino execution takes long and the segment fetches happen some time apart. There's no snapshot concept, so multiple calls to the Pinot table cannot guarantee consistency.
@yupeng9 As discussed in Slack, after patching #7844 and #7860, the off-by-one rate in case 1 dropped from 50% to 2% in a test of 3,000 repeated select count(1) from <table> queries run before and after the patch. However, the bad news is that it's not down to zero yet. Any idea on what else we should look at?
Regarding case 3, Trino perhaps needs to perform deduplication when querying multiple validDocsIndex.
One idea for debugging is to print the virtual columns $docId and $segmentName of the duplicated records. This will give some insight into the internal state, along with some logs about the ingestion activity at that time.
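A minimal sketch of such a debugging query, assuming one of the duplicated primary-key values has already been identified (the key literal below is a placeholder):

-- $docId and $segmentName are built-in Pinot virtual columns; if the same key
-- comes back from two different segments, it is valid in both segments'
-- validDocIds at query time.
SELECT A, EPOCH, $segmentName, $docId
FROM table1
WHERE A = 'duplicated-key-value'
LIMIT 10;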
@yupeng9 The duplicated records are for case 3. Any idea on debugging case 1 further? Also, what logs should we look for? There are tons of logs with debugging turned on.
This is a very interesting thread and great analysis. I can see issue 1 is being addressed, and issue 3 seems like a limitation of the Trino query pattern of hitting segments. Has anybody investigated issue 2 and possible reasons?
The issue of rows missing during segment flush seems to come from the fact that there is a period with two consuming segments (the old one being rolled into an immutable segment, and the new one). In that period queries only hit the first consuming segment, which means any records that have been updated and have gone into the new segment are invisible.
I have seen success running with a branch that changes RealtimeSegmentSelector (https://github.com/tuor713/pinot/commit/c2bc4f5afdd29d37c10944e57817c09d22d833be) - but that may well not be the right or best fix :)
@tuor713 Within a single streaming partition, there will be at most one consuming segment at any time. The small inconsistency is caused by the consuming segment replacing docs from the completed segments while the validDocIds of the segments are not read at the same time, e.g.:
- The query engine reads validDocIds from the completed segment (a copy is created).
- The consuming segment invalidates one doc from the completed segment (not visible to the query engine because a copy/snapshot has already been made) and marks the doc as valid in its own validDocIds.
- The query engine reads validDocIds from the consuming segment.
- The same doc gets double counted.
In order to solve this problem, we need a global sync: take a snapshot of all queried segments while blocking the ingestion (as shown in the fix above). The solution works, and we can avoid creating the extra IndexSegment snapshot objects by snapshotting the validDocIds within the FilterPlanNode, but it can cause starvation between query and ingestion. For high-QPS use cases, queries can block each other as well as the ingestion.
We can make it configurable for use cases that require 100% consistency, but 100% consistency is usually not necessary for analytical purposes. Essentially it is a trade-off between consistency and performance.
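One hedged way to observe this double counting from the outside, without any internal snapshots, is to group by the primary key and look for keys that return more than one row at a given moment; a sketch in Pinot SQL:

-- In a fully consistent upsert view every key should return exactly one row;
-- keys with a higher count were read as valid from more than one segment.
SELECT A, COUNT(*) AS copies
FROM table1
GROUP BY A
ORDER BY COUNT(*) DESC
LIMIT 10;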
Hm, that is not what I saw (at least on a mid-Dec snapshot of 0.10): on the broker side, during query routing, there were multiple consuming segments. I presume that is because the new segment gets created in the ideal state at the same time the old one gets moved to online, and the change in the external view happens in parallel.
Agreed the replacing doc causes issues too (I have tried a patch where the upsert hash map tracks both the old consuming validDocIds and the new online validDocIds and replaces in both, which appears to work). However, that is an issue of duplication. The loss of records, with the count dropping below the expected population, in my tests has come from the temporary existence of two consuming segments (as seen by the broker), only the first of which is queried.
Also agreed that after fixing these two issues there are still potential inconsistencies between consuming and online segments due to the lack of a global lock; fortunately this is an order of magnitude smaller an issue, at least in the tests I have tried, than the aforementioned ones.
@tuor713 I see the routing issue now. On a single server, there can be at most one consuming segment per partition; but multiple servers serving the same segment might not finish sealing it at the same time, so on the broker side it might show as multiple consuming segments. The routing fix in your commit can solve the inconsistency, but it might also create a hotspot because all queries will be routed to the servers that finish sealing the segment first. To address this, one solution would be to also enhance the instance selector so that it queries the consuming segment on a server only when that server is picked to serve the last completed segment.
@Jackie-Jiang Is this PR still valid, or is it already covered by #8392 and #7958? If the latter, could we close it?
@mayankshriv The routing issue is not solved yet, so let's keep the issue open
There is more to the current design on segment replicas. Like I mentioned in https://github.com/apache/pinot/issues/7850, not only is it problematic during a segment flush, it also leads to split query views when one replica goes down.
Hi, just wondering if there are any updates on Case 2? For our use case, it would be ideal if rows didn't go missing while a segment is being flushed.
Hi, it looks like a recent PR (#16511) for Trino fixed query1; can you verify that a recent version of Trino returns correct results?
Thanks for the follow-up. I have already left the previous company, so I won't be able to verify this myself. I can try reaching out to my ex-colleagues to see if they can do it.
The routing issue should be fixed with #11978.