
fix(query): correct partition-split query pushdowns

Open alextheimer opened this issue 2 years ago • 0 comments

Pull Request checklist

  • [x] The commit(s) message(s) follows the contribution guidelines ?
  • [x] Tests for the changes have been added (for bug fixes / features) ?
  • [ ] Docs have been added / updated (for bug fixes / features) ?

Currently, MultiPartitionPlanner materializes split-partition Aggregates (i.e. those with child plans that individually span multiple partitions) into an ExecPlan that is structurally correct but returns an incorrect result when executed: when a query spans a single partition split, data from the newer partition is dropped.

This PR does not address the root cause directly (still unknown as of this update), but it materializes Aggregates with a simpler structure that returns the correct (i.e. complete) data when executed.

This PR also makes two additional changes:

  • Split-partition BinaryJoins are now materialized such that result row timestamps always align to the steps defined by a query's start and step.
  • Split-partition PeriodicSeriesWithWindowing, SubqueryWithWindowing, ApplyInstantFunction, ScalarVaryingDoublePlan, and ApplyAbsentFunction plans are now also pushed down when possible. Additionally, materialization now fails explicitly if such a plan (a) contains multiple unique non-metric shard keys, or (b) contains a BinaryJoin with an offset.
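
To make "align to the steps defined by a query's start and step" concrete, here is a minimal Scala sketch of the step grid. It is illustrative only and is not FiloDB code; `StepGrid`, `steps`, and `isAligned` are hypothetical names, and timestamps are plain Longs in whatever unit the query uses.

```scala
// Hypothetical helper, not part of FiloDB.
object StepGrid {
  // All timestamps a range query is expected to report:
  // start, start + step, start + 2 * step, ... up to and including end.
  def steps(start: Long, step: Long, end: Long): Seq[Long] = {
    require(step > 0, "step must be positive")
    start to end by step
  }

  // True when ts lies on the grid defined by start and step.
  def isAligned(ts: Long, start: Long, step: Long): Boolean = {
    require(step > 0, "step must be positive")
    ts >= start && (ts - start) % step == 0
  }
}
```

With start=123 and step=45 (the values used in several of the plans below), `StepGrid.isAligned(3768L, 123L, 45L)` is true, while `StepGrid.isAligned(3456L, 123L, 45L)` is false.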

Old Aggregate:

T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(123,456,789))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(sum(test{job="app"}),123,456,306,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(sum(test{job="app"}),789,456,789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

New Aggregate:

E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,123,45,3306,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,3633,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

Old SubqueryWithWindowing:

T~PeriodicSamplesMapper(start=123000, step=45000, end=6789000, window=Some(900000), functionId=Some(Rate), rawSource=false, offsetMs=None)
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s]),123,45,3456,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s]),3768,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

New SubqueryWithWindowing:

E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s] offset 10m),123,45,4056,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(rate(foo{job="app1"}[15m:30s] offset 10m),5298,45,6789,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

Old ApplyInstantFunction (applies function twice):

T~InstantVectorFunctionMapper(function=Sgn)
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

New ApplyInstantFunction:

E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

Old ScalarVaryingDoublePlan (applies scalar function twice):

T~ScalarFunctionMapper(function=Scalar, funcParams=List())
-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

New ScalarVaryingDoublePlan:

E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(scalar(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

Old ApplyAbsentFunction (applies absent twice):

T~AbsentFunctionMapper(columnFilter=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))) rangeParams=RangeParams(0,3,9999) metricColumn=__name__)
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
----E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),5301,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

New ApplyAbsentFunction:

E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),0,3,5600,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
-E~PromQlRemoteExec(PromQlQueryParams(absent(test{job="app"} offset 10m),5901,3,9999,None,false), PlannerParams(filodb,None,None,None,None,30000,1000000,100000,100000,18000000,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))

(PeriodicSeriesWithWindowing is structurally unchanged since pushdowns already occurred, but window size is now correctly accounted for at the partition split.)
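
One way to read "accounted for at the partition split": the sub-query sent to the newer partition can only start at the first step whose whole lookback (window, plus any offset and stale-sample lookback) falls after the split. The Scala sketch below shows that arithmetic. It is not the planner's code; `SplitStart`, `firstStepAfterSplit`, and its parameter names are hypothetical. The split time of 3456 s is inferred from the Old SubqueryWithWindowing plan above, and the 300 s lookback is an assumption (read off the 300000 in the QueryConfig dumps).

```scala
// Hypothetical helper, not part of FiloDB.
object SplitStart {
  // First timestamp on the grid start, start + step, ... that is
  // >= splitEnd + lookbackTotal, where lookbackTotal is the sum of every
  // lookback the query needs (e.g. window + offset + stale-sample lookback).
  def firstStepAfterSplit(start: Long, step: Long, splitEnd: Long, lookbackTotal: Long): Long = {
    require(step > 0, "step must be positive")
    val earliest = splitEnd + lookbackTotal
    if (earliest <= start) start
    else start + ((earliest - start + step - 1) / step) * step // round up to the grid
  }
}
```

Under those assumptions, `SplitStart.firstStepAfterSplit(123L, 45L, 3456L, 300L)` gives 3768, the start of the second remote query in the Old SubqueryWithWindowing plan. Adding the 15m window and the 10m offset, `SplitStart.firstStepAfterSplit(123L, 45L, 3456L, 300L + 900L + 600L)` gives 5298, the start in the New plan.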

alextheimer · Aug 19 '22 06:08