FiloDB
fix(query): correct partition-split query pushdowns
Pull Request checklist
- [x] The commit message(s) follow the contribution guidelines
- [x] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
Currently, `MultiPartitionPlanner` materialization of split-partition `Aggregate`s (i.e. those with child plans that individually span multiple partitions) gives an `ExecPlan` that is structurally correct, but its `execute` gives an incorrect result: when a query spans a single partition split, data from the newer partition is dropped.
This PR does not address the root cause directly (still unknown as of this update), but it materializes `Aggregate`s with a simpler structure that gives the correct (i.e. complete) data on `execute`.
This PR also makes two additional changes:
- Split-partition `BinaryJoin`s are now materialized such that result row timestamps always align to the steps defined by a query's `start` and `step`.
- Split-partition `PeriodicSeriesWithWindowing`, `SubqueryWithWindowing`, `ApplyInstantFunction`, `ScalarVaryingDoublePlan`, and `ApplyAbsentFunction` plans are now also pushed down when possible. Additionally, they now fail explicitly if they (a) contain multiple unique non-metric shard keys, or (b) contain a `BinaryJoin` with an offset.
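
To illustrate what "align to the steps defined by a query's `start` and `step`" means, here is a minimal sketch of snapping an arbitrary timestamp up to the query's step grid. The object and method names (`StepAlign`, `snapUpToStep`) are hypothetical and are not taken from the FiloDB code base; the planner's actual implementation may differ.

```scala
// Hypothetical helper, not part of FiloDB: illustrates step alignment only.
object StepAlign {
  /**
   * Returns the smallest timestamp >= ts that lies on the grid
   * start, start + step, start + 2 * step, ...
   * All values are assumed to share one time unit (e.g. seconds).
   */
  def snapUpToStep(ts: Long, start: Long, step: Long): Long = {
    require(step > 0, "step must be positive")
    if (ts <= start) start
    else {
      // Ceiling division: number of whole steps needed to reach or pass ts.
      val stepsNeeded = (ts - start + step - 1) / step
      start + stepsNeeded * step
    }
  }
}
```

For example, `StepAlign.snapUpToStep(5300L, 0L, 3L)` returns `5301`, the first multiple of 3 at or after 5300. A timestamp that is already on the grid is returned unchanged.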
Old `Aggregate`:
T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(123,456,789))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(sum(test{job="app"}),123,456,306,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(sum(test{job="app"}),789,456,789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
New `Aggregate`:
E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,123,45,3306,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,3633,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
Old `SubqueryWithWindowing`:
T~PeriodicSamplesMapper(start=123000, step=45000, end=6789000, window=Some(900000), functionId=Some(Rate), rawSource=false, offsetMs=None)
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s]),123,45,3456,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s]),3768,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
New `SubqueryWithWindowing`:
E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s] offset 10m),123,45,4056,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s] offset 10m),5298,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
Old `ApplyInstantFunction` (applies function twice):
T~InstantVectorFunctionMapper(function=Sgn)
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
New `ApplyInstantFunction`:
E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
Old `ScalarVaryingDoublePlan` (applies scalar function twice):
T~ScalarFunctionMapper(function=Scalar, funcParams=List())
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
New `ScalarVaryingDoublePlan`:
E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
Old `ApplyAbsentFunction` (applies absent twice):
T~AbsentFunctionMapper(columnFilter=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))) rangeParams=RangeParams(0,3,9999) metricColumn=__name__)
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
New `ApplyAbsentFunction`:
E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
(`PeriodicSeriesWithWindowing` is structurally unchanged since pushdowns already occurred, but window size is now correctly accounted for at the partition split.)
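
As a rough illustration of accounting for window size at a partition split, the sketch below computes the first query step that the newer partition can answer on its own: the step's full lookback range (window, offset, and a stale-data lookback) must start no earlier than the partition boundary. Everything here is an assumption made for illustration: the names `SplitStart` and `firstStepInNewPartition` are hypothetical, the 300-unit stale lookback is assumed rather than stated in this description, and the planner's real logic may differ.

```scala
// Hypothetical sketch, not FiloDB code: illustrates lookback-aware split points.
object SplitStart {
  /** Smallest timestamp >= ts on the grid start, start + step, ... */
  def snapUpToStep(ts: Long, start: Long, step: Long): Long = {
    require(step > 0, "step must be positive")
    if (ts <= start) start
    else start + ((ts - start + step - 1) / step) * step
  }

  /**
   * First step whose lookback range [t - offset - window - staleLookback, t - offset]
   * begins at or after partitionStart. All arguments share one time unit.
   */
  def firstStepInNewPartition(
      partitionStart: Long, // assumed boundary of the newer partition
      window: Long,         // range-selector or subquery window
      offset: Long,         // `offset` modifier; 0 if absent
      staleLookback: Long,  // assumed default lookback for instant selectors
      queryStart: Long,
      step: Long): Long = {
    val earliestAnswerable = partitionStart + window + offset + staleLookback
    snapUpToStep(earliestAnswerable, queryStart, step)
  }
}
```

With an assumed boundary of 3456, a 900-unit window, a 600-unit offset, the assumed 300-unit stale lookback, and a query `start`/`step` of 123/45, `SplitStart.firstStepInNewPartition(3456L, 900L, 600L, 300L, 123L, 45L)` returns `5298`. That is the same start time shown for the second `PromQlRemoteExec` in the new `SubqueryWithWindowing` plan above, though the match depends on the assumed inputs.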