fix(query): remove unnecessary plans from materialized aggregations/joins

Open alextheimer opened this issue 10 months ago • 0 comments

Pull Request checklist

  • [x] The commit message(s) follow the contribution guidelines
  • [x] Tests for the changes have been added (for bug fixes / features)
  • [ ] Docs have been added / updated (for bug fixes / features)

Currently, unnecessary ReduceAggregateExecs and DistConcatExecs are included with materialized joins and aggregations. For example, consider:

sum (
  sum by (tschemaLabel) (
    foo{_ws_="demo", _ns_=~"both.*"}
  )
)

Currently, the plan is materialized as:

T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634777230,10,1634777330))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-------T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1804467972],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-------T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----E~PromQlRemoteExec(PromQlQueryParams(sum(foo{_ws_="demo",_ns_="remote"}) by (tschemaLabel),1634777230,10,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))

The plan includes unnecessary aggregation and concatenation plans, which can artificially inflate a query's QueryStats values (e.g. resultBytes).
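To make the redundancy concrete, the exec nodes of the plan above can be tallied by type. This is a minimal sketch and not part of FiloDB: `tally_exec_plans` and `LINE_RE` are hypothetical helper names, and the plan text is the one above with constructor arguments and dispatchers omitted for brevity. It assumes the printed format shown here, where leading dashes give the depth and `E~` / `T~` mark exec plans and transformers.

```python
import re
from collections import Counter

# One printed plan line: leading dashes (depth), "E" or "T", "~", class name.
# Assumption: this matches the tree format shown in this description.
LINE_RE = re.compile(r"^(-*)([ET])~(\w+)")


def tally_exec_plans(plan_text: str) -> Counter:
    """Count exec-plan (E~) nodes by class name in a printed plan tree."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = LINE_RE.match(line.strip())
        if match and match.group(2) == "E":
            counts[match.group(3)] += 1
    return counts


# The pre-fix plan from above, arguments and dispatchers omitted.
BEFORE = """\
T~AggregatePresenter
-E~LocalPartitionReduceAggregateExec
--T~AggregateMapReduce
---E~MultiPartitionDistConcatExec
----E~LocalPartitionDistConcatExec
-----E~LocalPartitionReduceAggregateExec
------T~AggregateMapReduce
-------T~PeriodicSamplesMapper
--------E~MultiSchemaPartitionsExec
-----E~LocalPartitionReduceAggregateExec
------T~AggregateMapReduce
-------T~PeriodicSamplesMapper
--------E~MultiSchemaPartitionsExec
----E~PromQlRemoteExec
"""

counts = tally_exec_plans(BEFORE)
for name, n in sorted(counts.items()):
    print(f"{name}: {n}")
```

Only the two MultiSchemaPartitionsExec leaves and the PromQlRemoteExec read data; the remaining exec nodes are the reduce and concat wrappers that this PR trims.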

This PR applies two fixes:

  • In ShardKeyRegexPlanner and MultiPartitionPlanner, materialize the inner plans of joins and aggregations with walkLogicalPlanTree instead of materialize. The inner plan's materialization can then return more than one plan, so a concatenation plan is no longer required.
  • In DefaultPlanner::addAggregator, perform one additional check: if the list of plans to be aggregated contains only one plan, skip the LocalPartitionReduceAggregateExec wrapper.
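The effect of the two fixes can be sketched with a toy model. This is an illustration under stated assumptions, not FiloDB's implementation: `Plan`, `aggregate_old`, `aggregate_new`, and `count_nodes` are hypothetical names, plans are reduced to a name plus children, and the per-child map step (AggregateMapReduce) is left out of the model.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Plan:
    """Toy plan node: a class name and its child plans (hypothetical model)."""
    name: str
    children: List["Plan"] = field(default_factory=list)


def aggregate_old(parts: List[Plan]) -> Plan:
    # Old behaviour as described: the inner plan must materialize to a single
    # plan, so the parts are concatenated first, and the reduce wrapper is
    # always added on top.
    inner = Plan("MultiPartitionDistConcatExec", parts)
    return Plan("LocalPartitionReduceAggregateExec", [inner])


def aggregate_new(parts: List[Plan]) -> Plan:
    # Fix 1: the inner materialization may hand back several plans, so no
    # concatenation node is needed.
    # Fix 2: with a single plan there is nothing to reduce across, so the
    # wrapper is skipped.
    if len(parts) == 1:
        return parts[0]
    return Plan("LocalPartitionReduceAggregateExec", parts)


def count_nodes(plan: Plan) -> int:
    """Number of nodes in the plan tree rooted at `plan`."""
    return 1 + sum(count_nodes(child) for child in plan.children)


local = Plan("LocalPartitionDistConcatExec")
remote = Plan("PromQlRemoteExec")

old_two = count_nodes(aggregate_old([local, remote]))
new_two = count_nodes(aggregate_new([local, remote]))
old_one = count_nodes(aggregate_old([remote]))
new_one = count_nodes(aggregate_new([remote]))
print(old_two, new_two, old_one, new_one)
```

In this model the concatenation node disappears whenever the inner plan yields several parts, and the reduce wrapper disappears when there is only one part to aggregate.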

After this PR, the plan will instead be materialized as:

T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634777230,10,1634777330))
-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~LocalPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-----T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaLabel))
-----T~PeriodicSamplesMapper(start=1634777230000, step=10000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634776930000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(local))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
---E~PromQlRemoteExec(PromQlQueryParams(sum(foo{_ws_="demo",_ns_="remote"}) by (tschemaLabel),1634777230,10,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,

Additional Fixes

  • Correctly apply RangeVectorTransformers (RVTs) to RemoteExec plans.

alextheimer, Mar 30 '24 07:03