datafusion
Exclude input plan's execution time from `elapsed_compute` statistic
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Baseline metrics have an `elapsed_compute` timer that counts the CPU time spent in a single physical plan node. It should not include the time spent polling that node's inputs.
Describe the solution you'd like
I find these plans have the issue:
- CoalesceBatchesExec
- SortMergeJoinExec
I'll move the timer so that it starts only after the input record batch is ready.
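To illustrate the idea, here is a minimal sketch in plain std Rust. It does not use DataFusion's actual stream or metrics types; `SlowInput`, `compute_including_input` and `compute_excluding_input` are hypothetical names made up for this example, and the real operators are async streams rather than iterators. The only point is where the timer starts relative to the poll of the input.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a child plan: every poll is slow,
/// mimicking an expensive scan or filter below the operator.
struct SlowInput {
    remaining: usize,
    delay: Duration,
}

impl Iterator for SlowInput {
    type Item = Vec<i64>;

    fn next(&mut self) -> Option<Vec<i64>> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        sleep(self.delay); // time spent "inside the child"
        Some(vec![1, 2, 3, 4])
    }
}

/// Timer starts before polling the input, so the child's time
/// leaks into this operator's elapsed time.
fn compute_including_input(mut input: SlowInput) -> (i64, Duration) {
    let mut elapsed = Duration::ZERO;
    let mut total = 0;
    loop {
        let start = Instant::now();
        let batch = match input.next() {
            Some(batch) => batch,
            None => break,
        };
        total += batch.iter().sum::<i64>();
        elapsed += start.elapsed();
    }
    (total, elapsed)
}

/// Timer starts only once the batch is ready, so only this
/// operator's own work is measured.
fn compute_excluding_input(input: SlowInput) -> (i64, Duration) {
    let mut elapsed = Duration::ZERO;
    let mut total = 0;
    for batch in input {
        let start = Instant::now();
        total += batch.iter().sum::<i64>();
        elapsed += start.elapsed();
    }
    (total, elapsed)
}
```

With an input of three batches and a 5 ms delay per poll, the first function reports at least 15 ms, while the second reports only the time needed to sum the values.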
Additional context
An example I got:
Executor executed plan, request_id:7, plan_and_metrics: SortExec: [minute@0 ASC NULLS LAST], metrics=[output_rows=60, elapsed_compute=47.392µs, spill_count=0, spilled_bytes=0]
CoalescePartitionsExec, metrics=[output_rows=60, elapsed_compute=10.018µs, spill_count=0, spilled_bytes=0, mem_used=0]
ProjectionExec: expr=[time_bucket(cpu.timestamp,Utf8("PT60S"))@0 as minute, MAX(cpu.usage_user)@1 as max_usage_user, MAX(cpu.usage_system)@2 as max_usage_system, MAX(cpu.usage_idle)@3 as max_usage_idle, MAX(cpu.usage_nice)@4 as max_usage_nice, MAX(cpu.usage_iowait)@5 as max_usage_iowait], metrics=[output_rows=60, elapsed_compute=3.225µs, spill_count=0, spilled_bytes=0, mem_used=0]
AggregateExec: mode=FinalPartitioned, gby=[time_bucket(cpu.timestamp,Utf8("PT60S"))@0 as time_bucket(cpu.timestamp,Utf8("PT60S"))], aggr=[MAX(cpu.usage_user), MAX(cpu.usage_system), MAX(cpu.usage_idle), MAX(cpu.usage_nice), MAX(cpu.usage_iowait)], metrics=[output_rows=60, elapsed_compute=172.435µs, spill_count=0, spilled_bytes=0, mem_used=0]
CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=60, elapsed_compute=91.765µs, spill_count=0, spilled_bytes=0, mem_used=0]
RepartitionExec: partitioning=Hash([Column { name: "time_bucket(cpu.timestamp,Utf8(\"PT60S\"))", index: 0 }], 6), metrics=[repart_time{inputPartition=5}=33.112µs, send_time{inputPartition=5}=26.289µs, fetch_time{inputPartition=5}=147.221616ms]
AggregateExec: mode=Partial, gby=[time_bucket(timestamp@0, PT60S) as time_bucket(cpu.timestamp,Utf8("PT60S"))], aggr=[MAX(cpu.usage_user), MAX(cpu.usage_system), MAX(cpu.usage_idle), MAX(cpu.usage_nice), MAX(cpu.usage_iowait)], metrics=[output_rows=60, elapsed_compute=334.918µs, spill_count=0, spilled_bytes=0, mem_used=0]
CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=480, elapsed_compute=73.715112ms, spill_count=0, spilled_bytes=0, mem_used=0]
FilterExec: hostname@1 = host_804 OR hostname@1 = host_3829 OR hostname@1 = host_2786 OR hostname@1 = host_3242 OR hostname@1 = host_2509 OR hostname@1 = host_648 OR hostname@1 = host_204 OR hostname@1 = host_1769 AND timestamp@0 >= 1662349096000 AND timestamp@0 < 1662352696000, metrics=[output_rows=480, elapsed_compute=70.708739ms, spill_count=0, spilled_bytes=0, mem_used=0]
ScanTable: table=cpu, parallelism=8, order=None, , metrics=[]
Here the lower CoalesceBatchesExec reports elapsed_compute=73.715112ms. That figure includes the execution time of its child node, FilterExec (elapsed_compute=70.708739ms).