FiloDB
fix(query): remove unnecessary plans from materialized aggregations/joins
Pull Request checklist
- [x] The commit message(s) follow the contribution guidelines
- [x] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
Currently, unnecessary `ReduceAggregateExec`s and `DistConcatExec`s are included with materialized joins and aggregations. For example, consider:
sum (
sum by (tschemaLabel) (
foo{_ws_="demo", _ns_=~"both.*"}
)
)
Currently, the plan is materialized as:
T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634777230,10,1634777330))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-------T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-------T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----E~PromQlRemoteExec(PromQlQueryParams(sum(foo{_ws_="demo",_ns_="remote"}) by (tschemaLabel),1634777230,10,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
The plan includes unnecessary aggregation and concatenation plans, which can artificially inflate a query's `QueryStats` values (e.g. `resultBytes`).
This PR applies two fixes:
- In `ShardKeyRegexPlanner` and `MultiPartitionPlanner`, materialize join/aggregation inner plans with `walkLogicalPlanTree` instead of `materialize`. This allows more than one plan to be returned from the inner plan's materialization; a concatenation plan is no longer required.
- In `DefaultPlanner::addAggregator`, perform one additional check: if the list of plans to be aggregated contains only one plan, skip the `LocalPartitionReduceAggregateExec` wrapper.
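
As a rough illustration of the two fixes, here is a minimal sketch on a toy plan tree. Everything in it is hypothetical: `Plan`, `Leaf`, `Concat`, `Reduce`, and the `PlanSketch` helpers are stand-ins invented for this example, not FiloDB's actual classes or method signatures. It only shows the shape of the change: inner plans are handed back as a list rather than pre-wrapped in a concatenation, and a reduce node is skipped when it would wrap a single child.

```scala
// Toy model only: these types are hypothetical stand-ins, not FiloDB classes.
sealed trait Plan
final case class Leaf(name: String) extends Plan
final case class Concat(children: Seq[Plan]) extends Plan
final case class Reduce(children: Seq[Plan]) extends Plan

object PlanSketch {
  // Fix 1 (sketch): return every inner plan to the caller instead of
  // first forcing them under one extra concatenation node.
  def innerPlans(localPlans: Seq[Plan], remotePlans: Seq[Plan]): Seq[Plan] =
    localPlans ++ remotePlans

  // Fix 2 (sketch): a reduce step over exactly one child adds nothing,
  // so return that child unwrapped. Any other count gets a Reduce node.
  def addAggregator(children: Seq[Plan]): Plan = children match {
    case Seq(only) => only
    case many      => Reduce(many)
  }

  // Counts the nodes in a plan tree so two shapes can be compared.
  def size(plan: Plan): Int = plan match {
    case Leaf(_)          => 1
    case Concat(children) => 1 + children.map(size).sum
    case Reduce(children) => 1 + children.map(size).sum
  }
}
```

In this toy model, `PlanSketch.addAggregator(Seq(Leaf("shard0")))` returns the leaf itself, while passing the result of `innerPlans` with several entries yields a single `Reduce` directly over those plans, with no intermediate `Concat`.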
After this PR, the plan will instead be materialized as:
T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634777230,10,1634777330))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~LocalPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-----T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-----T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~PromQlRemoteExec(PromQlQueryParams(sum(foo{_ws_="demo",_ns_="remote"}) by (tschemaLabel),1634777230,10,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,
Additional Fixes
- Correctly apply RVTs to `RemoteExec` plans.