datafusion
datafusion copied to clipboard
Sketch for aggregation intermediate results blocked management
Which issue does this PR close?
Part of #11931 , part of https://github.com/apache/datafusion/issues/7065
Rationale for this change
Detail can see #11931
Are these changes tested?
By exist tests.
Are there any user-facing changes?
Two functions are added to GroupValues and GroupAccumulator trait.
/// Returns `true` if this group values supports blocked mode.
fn supports_blocked_mode(&self) -> bool;
/// Switch the group values/accumulators to flat or blocked mode.
///
/// After switching mode, all data in previous mode will be cleared.
fn switch_to_mode(&mut self, mode: GroupStatesMode) -> Result<()>;
Thank you @Rachelint -- I hope to look at this more carefully tomorrow
The benchmark after impl blocked version GroupValuesRows and, blocked version count, sum, avg accumulators.
It make about 20% queries in clickbench 1.10~1.24x faster.
And after we impl blocked version for all group values and accumulators, more queries can be faster!
Comparing main and sketch-blocked-aggr-state-management
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 0.72ms │ 0.69ms │ no change │
│ QQuery 1 │ 67.71ms │ 69.75ms │ no change │
│ QQuery 2 │ 169.92ms │ 170.43ms │ no change │
│ QQuery 3 │ 185.76ms │ 186.77ms │ no change │
│ QQuery 4 │ 1623.50ms │ 1649.09ms │ no change │
│ QQuery 5 │ 1603.76ms │ 1622.42ms │ no change │
│ QQuery 6 │ 61.56ms │ 62.45ms │ no change │
│ QQuery 7 │ 67.97ms │ 70.10ms │ no change │
│ QQuery 8 │ 2332.37ms │ 2407.51ms │ no change │
│ QQuery 9 │ 1952.18ms │ 2010.02ms │ no change │
│ QQuery 10 │ 551.12ms │ 549.55ms │ no change │
│ QQuery 11 │ 606.73ms │ 634.02ms │ no change │
│ QQuery 12 │ 1822.70ms │ 1779.43ms │ no change │
│ QQuery 13 │ 3414.56ms │ 3445.55ms │ no change │
│ QQuery 14 │ 2595.05ms │ 2309.24ms │ +1.12x faster │
│ QQuery 15 │ 1836.69ms │ 1845.98ms │ no change │
│ QQuery 16 │ 5056.23ms │ 4432.96ms │ +1.14x faster │
│ QQuery 17 │ 4903.18ms │ 4182.80ms │ +1.17x faster │
│ QQuery 18 │ 10383.16ms │ 8529.29ms │ +1.22x faster │
│ QQuery 19 │ 152.06ms │ 152.31ms │ no change │
│ QQuery 20 │ 3364.93ms │ 3370.44ms │ no change │
│ QQuery 21 │ 3972.92ms │ 3974.43ms │ no change │
│ QQuery 22 │ 9594.65ms │ 9596.55ms │ no change │
│ QQuery 23 │ 23807.10ms │ 23718.25ms │ no change │
│ QQuery 24 │ 1163.30ms │ 1187.58ms │ no change │
│ QQuery 25 │ 1055.71ms │ 1061.81ms │ no change │
│ QQuery 26 │ 1366.61ms │ 1363.28ms │ no change │
│ QQuery 27 │ 4821.72ms │ 4843.56ms │ no change │
│ QQuery 28 │ 23710.55ms │ 22313.88ms │ +1.06x faster │
│ QQuery 29 │ 910.31ms │ 932.51ms │ no change │
│ QQuery 30 │ 2092.15ms │ 2028.27ms │ no change │
│ QQuery 31 │ 2328.77ms │ 2189.39ms │ +1.06x faster │
│ QQuery 32 │ 8687.27ms │ 7065.45ms │ +1.23x faster │
│ QQuery 33 │ 9701.59ms │ 9577.62ms │ no change │
│ QQuery 34 │ 9628.66ms │ 9658.56ms │ no change │
│ QQuery 35 │ 3116.66ms │ 2809.27ms │ +1.11x faster │
│ QQuery 36 │ 261.19ms │ 266.92ms │ no change │
│ QQuery 37 │ 173.76ms │ 171.29ms │ no change │
│ QQuery 38 │ 162.87ms │ 161.34ms │ no change │
│ QQuery 39 │ 855.42ms │ 690.67ms │ +1.24x faster │
│ QQuery 40 │ 62.04ms │ 62.04ms │ no change │
│ QQuery 41 │ 57.62ms │ 54.97ms │ no change │
│ QQuery 42 │ 72.14ms │ 70.00ms │ no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 150354.87ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 143278.44ms │
│ Average Time (main) │ 3496.62ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 3332.06ms │
│ Queries Faster │ 9 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 34 │
└─────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 2.03ms │ 2.10ms │ no change │
│ QQuery 1 │ 57.64ms │ 57.78ms │ no change │
│ QQuery 2 │ 154.25ms │ 163.80ms │ 1.06x slower │
│ QQuery 3 │ 167.63ms │ 173.56ms │ no change │
│ QQuery 4 │ 1672.39ms │ 1649.33ms │ no change │
│ QQuery 5 │ 1510.56ms │ 1527.82ms │ no change │
│ QQuery 6 │ 50.93ms │ 48.64ms │ no change │
│ QQuery 7 │ 58.24ms │ 59.35ms │ no change │
│ QQuery 8 │ 2390.67ms │ 2432.79ms │ no change │
│ QQuery 9 │ 1931.85ms │ 1994.58ms │ no change │
│ QQuery 10 │ 533.01ms │ 532.21ms │ no change │
│ QQuery 11 │ 586.52ms │ 589.53ms │ no change │
│ QQuery 12 │ 1702.88ms │ 1716.73ms │ no change │
│ QQuery 13 │ 3282.15ms │ 3287.57ms │ no change │
│ QQuery 14 │ 2462.18ms │ 2199.73ms │ +1.12x faster │
│ QQuery 15 │ 1866.22ms │ 1876.52ms │ no change │
│ QQuery 16 │ 4925.49ms │ 4330.47ms │ +1.14x faster │
│ QQuery 17 │ 4805.47ms │ 4062.59ms │ +1.18x faster │
│ QQuery 18 │ 9854.37ms │ 8161.64ms │ +1.21x faster │
│ QQuery 19 │ 141.73ms │ 139.98ms │ no change │
│ QQuery 20 │ 3663.83ms │ 3654.61ms │ no change │
│ QQuery 21 │ 4188.63ms │ 4192.88ms │ no change │
│ QQuery 22 │ 9622.93ms │ 9588.40ms │ no change │
│ QQuery 23 │ 21982.07ms │ 22106.21ms │ no change │
│ QQuery 24 │ 1063.25ms │ 1063.79ms │ no change │
│ QQuery 25 │ 853.54ms │ 858.32ms │ no change │
│ QQuery 26 │ 1238.92ms │ 1249.95ms │ no change │
│ QQuery 27 │ 5148.86ms │ 5144.44ms │ no change │
│ QQuery 28 │ 22077.36ms │ 21721.22ms │ no change │
│ QQuery 29 │ 835.61ms │ 844.81ms │ no change │
│ QQuery 30 │ 2009.91ms │ 1917.08ms │ no change │
│ QQuery 31 │ 2223.99ms │ 2074.39ms │ +1.07x faster │
│ QQuery 32 │ 8526.46ms │ 7018.38ms │ +1.21x faster │
│ QQuery 33 │ 9774.11ms │ 9725.42ms │ no change │
│ QQuery 34 │ 9496.05ms │ 9650.09ms │ no change │
│ QQuery 35 │ 3134.31ms │ 2859.13ms │ +1.10x faster │
│ QQuery 36 │ 240.50ms │ 251.96ms │ no change │
│ QQuery 37 │ 111.59ms │ 111.42ms │ no change │
│ QQuery 38 │ 137.54ms │ 140.59ms │ no change │
│ QQuery 39 │ 807.76ms │ 664.45ms │ +1.22x faster │
│ QQuery 40 │ 52.18ms │ 52.43ms │ no change │
│ QQuery 41 │ 46.33ms │ 47.65ms │ no change │
│ QQuery 42 │ 61.40ms │ 62.31ms │ no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 145453.36ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 140006.65ms │
│ Average Time (main) │ 3382.64ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 3255.97ms │
│ Queries Faster │ 8 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 34 │
└─────────────────────────────────────────────────────┴─────────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/clickbench_partitioned.json.bak as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/clickbench_partitioned.json.bak does not exist
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 208.92ms │ 214.15ms │ no change │
│ QQuery 2 │ 31.25ms │ 31.41ms │ no change │
│ QQuery 3 │ 83.52ms │ 83.58ms │ no change │
│ QQuery 4 │ 57.93ms │ 60.68ms │ no change │
│ QQuery 5 │ 122.64ms │ 121.96ms │ no change │
│ QQuery 6 │ 12.89ms │ 12.74ms │ no change │
│ QQuery 7 │ 250.84ms │ 250.33ms │ no change │
│ QQuery 8 │ 26.35ms │ 25.97ms │ no change │
│ QQuery 9 │ 118.93ms │ 119.39ms │ no change │
│ QQuery 10 │ 117.13ms │ 116.33ms │ no change │
│ QQuery 11 │ 57.04ms │ 56.61ms │ no change │
│ QQuery 12 │ 34.96ms │ 35.79ms │ no change │
│ QQuery 13 │ 77.05ms │ 77.11ms │ no change │
│ QQuery 14 │ 15.60ms │ 14.86ms │ no change │
│ QQuery 15 │ 23.89ms │ 24.24ms │ no change │
│ QQuery 16 │ 36.37ms │ 37.05ms │ no change │
│ QQuery 17 │ 173.75ms │ 179.35ms │ no change │
│ QQuery 18 │ 491.96ms │ 490.21ms │ no change │
│ QQuery 19 │ 36.08ms │ 36.41ms │ no change │
│ QQuery 20 │ 79.92ms │ 69.56ms │ +1.15x faster │
│ QQuery 21 │ 284.69ms │ 285.27ms │ no change │
│ QQuery 22 │ 19.66ms │ 19.05ms │ no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 2361.37ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 2362.03ms │
│ Average Time (main) │ 107.34ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 107.37ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 21 │
└─────────────────────────────────────────────────────┴───────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/tpch_sf10.json as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/tpch_sf10.json does not exist
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 291.46ms │ 292.07ms │ no change │
│ QQuery 2 │ 44.78ms │ 46.78ms │ no change │
│ QQuery 3 │ 109.87ms │ 113.97ms │ no change │
│ QQuery 4 │ 60.18ms │ 60.21ms │ no change │
│ QQuery 5 │ 196.33ms │ 192.04ms │ no change │
│ QQuery 6 │ 57.52ms │ 55.79ms │ no change │
│ QQuery 7 │ 305.37ms │ 301.56ms │ no change │
│ QQuery 8 │ 124.96ms │ 125.75ms │ no change │
│ QQuery 9 │ 226.72ms │ 228.80ms │ no change │
│ QQuery 10 │ 196.97ms │ 193.90ms │ no change │
│ QQuery 11 │ 32.67ms │ 31.29ms │ no change │
│ QQuery 12 │ 85.23ms │ 83.33ms │ no change │
│ QQuery 13 │ 128.19ms │ 125.37ms │ no change │
│ QQuery 14 │ 78.30ms │ 79.75ms │ no change │
│ QQuery 15 │ 107.56ms │ 108.60ms │ no change │
│ QQuery 16 │ 42.01ms │ 43.48ms │ no change │
│ QQuery 17 │ 274.14ms │ 282.84ms │ no change │
│ QQuery 18 │ 453.89ms │ 451.47ms │ no change │
│ QQuery 19 │ 141.27ms │ 140.59ms │ no change │
│ QQuery 20 │ 134.95ms │ 123.06ms │ +1.10x faster │
│ QQuery 21 │ 295.07ms │ 297.03ms │ no change │
│ QQuery 22 │ 25.33ms │ 25.97ms │ no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 3412.78ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 3403.63ms │
│ Average Time (main) │ 155.13ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 154.71ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 21 │
└─────────────────────────────────────────────────────┴───────────┘
~Does this improve memory usage~ I forgot it is sketch
What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks?
@jayzhan211 Yes, still not any detailed blocked impls now, and just make benchmark for ensuring the sketch will not decrease the performance.
I guess the blocked apporach may be not related much to Emit::First. And their relationship may be that the blocked emission is just something can be used to impl the optimization mentioned in #9562 .
Actually the blocked approach's main traget is that it manages the data block by block in the GroupValues and GroupAccumulator(need the new blocked impl for reaching this), which can avoid the copying cost compared to the single vector approach similar as #7065 .
I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size).
We have emit_early_if_necessary that do First(n) emission when condition met.
fn emit_early_if_necessary(&mut self) -> Result<()> {
if self.group_values.len() >= self.batch_size
&& matches!(self.group_ordering, GroupOrdering::None)
&& matches!(self.mode, AggregateMode::Partial)
&& self.update_memory_reservation().is_err()
{
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
}
Ok(())
}
If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference?
Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?
I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size).
We have
emit_early_if_necessarythat do First(n) emission when condition met.fn emit_early_if_necessary(&mut self) -> Result<()> { if self.group_values.len() >= self.batch_size && matches!(self.group_ordering, GroupOrdering::None) && matches!(self.mode, AggregateMode::Partial) && self.update_memory_reservation().is_err() { let n = self.group_values.len() / self.batch_size * self.batch_size; let batch = self.emit(EmitTo::First(n), false)?; self.exec_state = ExecutionState::ProducingOutput(batch); } Ok(()) }If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference?
Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?
Ok, I think I got it now, if we constantly emit first n when the group len just equal to batch size, it is actually equal to blocked approach.
But emit first n will be just triggered in some special cases in my knowledge:
- Streaming aggr (the really constantly
emit first ncase, and I forced to disable blocked mode in this case) - In
Partialoperator if found the memory exceeded limit (emit_early_if_necessary)
And in others, we need to poll to end, then emit all and use slice method to split it and return now:
- For example, in the
FinalPartitionedoperator - In the
Partialoperator when memory under the memory limit - other cases...
And in such cases, blocked approach may be effecient for both memory and cpu as stated above?
~Does this improve memory usage~ I forgot it is sketch
What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks?
I think maybe we should keep them both, the blocked emission is just optimized version of Emit::All and Emit::First to call, when we found the blocked mode is enabled.
The flat mode impls should be kept, because some abilities are expensive to support in blocked mode (such as emit exact first n needed in streaming). But it may be easy to keep them both, because flat impl is just the special case of blocked impl which just always holds a single big block.
When will the blocked mode be enabled maybe can see in: https://github.com/apache/datafusion/issues/11931#issuecomment-2283176521
I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size). We have
emit_early_if_necessarythat do First(n) emission when condition met.fn emit_early_if_necessary(&mut self) -> Result<()> { if self.group_values.len() >= self.batch_size && matches!(self.group_ordering, GroupOrdering::None) && matches!(self.mode, AggregateMode::Partial) && self.update_memory_reservation().is_err() { let n = self.group_values.len() / self.batch_size * self.batch_size; let batch = self.emit(EmitTo::First(n), false)?; self.exec_state = ExecutionState::ProducingOutput(batch); } Ok(()) }If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference? Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ?
Ok, I think I got it now, if we constantly
emit first nwhen the group len just equal tobatch size, it is actually equal to blocked approach.But
emit first nwill be just triggered in some special cases in my knowledge:
- Streaming aggr (the really constantly
emit first ncase, and I forced to disable blocked mode in this case)- In
Partialoperator if found the memory exceeded limit (emit_early_if_necessary)And in others, we need to poll to end, then
emit alland useslicemethod to split it and return now:
- For example, in the
FinalPartitionedoperator- In the
Partialoperator when memory under the memory limit- other cases...
And in such cases, blocked approach may be effecient for both memory and cpu as stated above?
I still don't understand this FirstBlocks variant, if partial aggregate runs out of memory, output first N blocks and let final aggregation process them first looks like won't help much regarding total memory usage (since it's high cardinality aggregation)
Is it related to the spilling implementation? I will check it later.
Also thanks for pushing this forward, I think this approach is promising for performance
@2010YOUY01 make sense, it seems emit_early_if_necessary function is actually introduced in the spilling pr #7400.
I am checking the related codes about memory control, too.
But the FirstBlocks is just the blocked version First, because it is too expansive to emit exact first n groups in blocked impls.
For example:
block size = 4,blocks=2, and emitfirst 3 groups,- After emitting,
groups in first block = 1,groups in second block = 4 - And for keep the logic correct, we need to move groups to fill the first blocks to make:
groups in first block = 4,groups in second block = 1
Such a groups movement will obviously lead to much cpu cost...
Actually we can remove the FirstBlocks and AllBlocks mode, and we impl the related logics just out of GroupValues and GroupAccumultor.
For example, in emit_early_if_necessary, we check if the GroupValues and GroupAccumultor are in blocked modes first. If so, we get the block_size, do the aligned work, and finally pass the aligned results in First(aligned) to them
But I think if we impl like this, it will be so confused, and for making it clear, I introduce the two new blocked emission mode FirstBlocks and AllBlocks.
THank you @Rachelint -- I took a look at this PR and here is some feedback:
1. I think it is important to spend time actually showing this approach makes some queries faster (e.g. we should try and update one accumulator and one implementation of groups to show it makes a difference) 2. I think it is important to actually chunk saving the intermediate state (e.g. in a `Vec<...>` rather than `...` to realize the benefit of this chunked approach 3. Thank you for working on this. Very cool
Thanks, I have finished a blocked style GroupValuesRows impl today, working on impl blocked style GroupAccumulator.
@2010YOUY01 After checking the codes about memory contorl, I think I got it.
emit_early_if_necessaryis used inPartial- and
spill_previous_if_necessaryis used in the final phases
They all serve for the spilling. And the logic may be like this:
- After reaching the memory limit, force the
Partialto submit batches toFinalas soon as possible - And the
Finalwill spill them to disk for avoid oom - After all batches are submitted to
Final, theFinalmerged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).
@2010YOUY01 After checking the codes about memory contorl, I think I got it.
emit_early_if_necessaryis used inPartial- and
spill_previous_if_necessaryis used in the final phasesThey all serve for the spilling. And the logic may be like this:
- After reaching the memory limit, force the
Partialto submit batches toFinalas soon as possible- And the
Finalwill spill them to disk for avoid oom- After all batches are submitted to
Final, theFinalmerged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).
Thanks, now I figured out the high-level idea of spilling in aggregation and how emit works in its implementation.
However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits https://github.com/apache/datafusion/blob/482ef4551a4828825da8deb29d222fa82e1cfaa9/datafusion/physical-plan/src/aggregates/row_hash.rs#L605-L611
@2010YOUY01 After checking the codes about memory contorl, I think I got it.
emit_early_if_necessaryis used inPartial- and
spill_previous_if_necessaryis used in the final phasesThey all serve for the spilling. And the logic may be like this:
- After reaching the memory limit, force the
Partialto submit batches toFinalas soon as possible- And the
Finalwill spill them to disk for avoid oom- After all batches are submitted to
Final, theFinalmerged the spilled batches and in-memory batches to get the final results (in streaming agg way, batches will be sorted before spilling).Thanks, now I figured out the high-level idea of spilling in aggregation and how
emitworks in its implementation.However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits
https://github.com/apache/datafusion/blob/482ef4551a4828825da8deb29d222fa82e1cfaa9/datafusion/physical-plan/src/aggregates/row_hash.rs#L605-L611
Yes, you are right, there are two early emission cases, one is for spilling mentioned above, and another here is about streaming.
Hi @Rachelint -- please let me know if/when this PR is ready for another look. I think your plan as I understand it is to get this idea working enough to show performance improvements -- I was planning to take another look at that point
Hi @Rachelint -- please let me know if/when this PR is ready for another look. I think your plan as I understand it is to get this idea working enough to show performance improvements -- I was planning to take another look at that point
Thanks, I have finished the blocked version GroupValuesRows, and pass most of the tests(the spill test failed, but I check it and found it is not about logic correctness).
I am still working on impl the blocked GroupAccumulators for showing the performance improvement, maybe need another 1~2 days.
@alamb @jayzhan211 @2010YOUY01 I have finish the blocked version common accumulators(avg, count, prim_op) yesterday. Following is the benchmark result.
It can make some queries(especially in clickbench) faster 1.1x~1.2x.
However, some queries are slightly slower (like q1 in tpch_mem, q9 in clickbench), some reasons I can think are:
- in
tpch q1, the dataset is too small with only one block, and the indexed op inVecDequeis a bit more expansive than the originalVec. - in
clickbench q9, because the blocked version for relatedGroupValuesis not impl now, and lead to the similar problem liketpch q1. - the constantly computation about
block_idandblock offset(we can only overcome it through modify the group_index from &[usize] to &[GroupIndex]).
I am working on profile to make the problem more clear.
Upd
According to the flamegraph, the index op of VecDeque seems the bottleneck, and in the single block case (tpch q1 is due to the small input data), it is not as effecient as the Vec.
I am doing some optimzation works for the single block case, and I will check it again soon.
latest benchmark resutls:
Comparing main and sketch-blocked-aggr-state-management
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 0.72ms │ 0.69ms │ no change │
│ QQuery 1 │ 67.71ms │ 69.75ms │ no change │
│ QQuery 2 │ 169.92ms │ 170.43ms │ no change │
│ QQuery 3 │ 185.76ms │ 186.77ms │ no change │
│ QQuery 4 │ 1623.50ms │ 1649.09ms │ no change │
│ QQuery 5 │ 1603.76ms │ 1622.42ms │ no change │
│ QQuery 6 │ 61.56ms │ 62.45ms │ no change │
│ QQuery 7 │ 67.97ms │ 70.10ms │ no change │
│ QQuery 8 │ 2332.37ms │ 2407.51ms │ no change │
│ QQuery 9 │ 1952.18ms │ 2010.02ms │ no change │
│ QQuery 10 │ 551.12ms │ 549.55ms │ no change │
│ QQuery 11 │ 606.73ms │ 634.02ms │ no change │
│ QQuery 12 │ 1822.70ms │ 1779.43ms │ no change │
│ QQuery 13 │ 3414.56ms │ 3445.55ms │ no change │
│ QQuery 14 │ 2595.05ms │ 2309.24ms │ +1.12x faster │
│ QQuery 15 │ 1836.69ms │ 1845.98ms │ no change │
│ QQuery 16 │ 5056.23ms │ 4432.96ms │ +1.14x faster │
│ QQuery 17 │ 4903.18ms │ 4182.80ms │ +1.17x faster │
│ QQuery 18 │ 10383.16ms │ 8529.29ms │ +1.22x faster │
│ QQuery 19 │ 152.06ms │ 152.31ms │ no change │
│ QQuery 20 │ 3364.93ms │ 3370.44ms │ no change │
│ QQuery 21 │ 3972.92ms │ 3974.43ms │ no change │
│ QQuery 22 │ 9594.65ms │ 9596.55ms │ no change │
│ QQuery 23 │ 23807.10ms │ 23718.25ms │ no change │
│ QQuery 24 │ 1163.30ms │ 1187.58ms │ no change │
│ QQuery 25 │ 1055.71ms │ 1061.81ms │ no change │
│ QQuery 26 │ 1366.61ms │ 1363.28ms │ no change │
│ QQuery 27 │ 4821.72ms │ 4843.56ms │ no change │
│ QQuery 28 │ 23710.55ms │ 22313.88ms │ +1.06x faster │
│ QQuery 29 │ 910.31ms │ 932.51ms │ no change │
│ QQuery 30 │ 2092.15ms │ 2028.27ms │ no change │
│ QQuery 31 │ 2328.77ms │ 2189.39ms │ +1.06x faster │
│ QQuery 32 │ 8687.27ms │ 7065.45ms │ +1.23x faster │
│ QQuery 33 │ 9701.59ms │ 9577.62ms │ no change │
│ QQuery 34 │ 9628.66ms │ 9658.56ms │ no change │
│ QQuery 35 │ 3116.66ms │ 2809.27ms │ +1.11x faster │
│ QQuery 36 │ 261.19ms │ 266.92ms │ no change │
│ QQuery 37 │ 173.76ms │ 171.29ms │ no change │
│ QQuery 38 │ 162.87ms │ 161.34ms │ no change │
│ QQuery 39 │ 855.42ms │ 690.67ms │ +1.24x faster │
│ QQuery 40 │ 62.04ms │ 62.04ms │ no change │
│ QQuery 41 │ 57.62ms │ 54.97ms │ no change │
│ QQuery 42 │ 72.14ms │ 70.00ms │ no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 150354.87ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 143278.44ms │
│ Average Time (main) │ 3496.62ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 3332.06ms │
│ Queries Faster │ 9 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 34 │
└─────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 2.03ms │ 2.10ms │ no change │
│ QQuery 1 │ 57.64ms │ 57.78ms │ no change │
│ QQuery 2 │ 154.25ms │ 163.80ms │ 1.06x slower │
│ QQuery 3 │ 167.63ms │ 173.56ms │ no change │
│ QQuery 4 │ 1672.39ms │ 1649.33ms │ no change │
│ QQuery 5 │ 1510.56ms │ 1527.82ms │ no change │
│ QQuery 6 │ 50.93ms │ 48.64ms │ no change │
│ QQuery 7 │ 58.24ms │ 59.35ms │ no change │
│ QQuery 8 │ 2390.67ms │ 2432.79ms │ no change │
│ QQuery 9 │ 1931.85ms │ 1994.58ms │ no change │
│ QQuery 10 │ 533.01ms │ 532.21ms │ no change │
│ QQuery 11 │ 586.52ms │ 589.53ms │ no change │
│ QQuery 12 │ 1702.88ms │ 1716.73ms │ no change │
│ QQuery 13 │ 3282.15ms │ 3287.57ms │ no change │
│ QQuery 14 │ 2462.18ms │ 2199.73ms │ +1.12x faster │
│ QQuery 15 │ 1866.22ms │ 1876.52ms │ no change │
│ QQuery 16 │ 4925.49ms │ 4330.47ms │ +1.14x faster │
│ QQuery 17 │ 4805.47ms │ 4062.59ms │ +1.18x faster │
│ QQuery 18 │ 9854.37ms │ 8161.64ms │ +1.21x faster │
│ QQuery 19 │ 141.73ms │ 139.98ms │ no change │
│ QQuery 20 │ 3663.83ms │ 3654.61ms │ no change │
│ QQuery 21 │ 4188.63ms │ 4192.88ms │ no change │
│ QQuery 22 │ 9622.93ms │ 9588.40ms │ no change │
│ QQuery 23 │ 21982.07ms │ 22106.21ms │ no change │
│ QQuery 24 │ 1063.25ms │ 1063.79ms │ no change │
│ QQuery 25 │ 853.54ms │ 858.32ms │ no change │
│ QQuery 26 │ 1238.92ms │ 1249.95ms │ no change │
│ QQuery 27 │ 5148.86ms │ 5144.44ms │ no change │
│ QQuery 28 │ 22077.36ms │ 21721.22ms │ no change │
│ QQuery 29 │ 835.61ms │ 844.81ms │ no change │
│ QQuery 30 │ 2009.91ms │ 1917.08ms │ no change │
│ QQuery 31 │ 2223.99ms │ 2074.39ms │ +1.07x faster │
│ QQuery 32 │ 8526.46ms │ 7018.38ms │ +1.21x faster │
│ QQuery 33 │ 9774.11ms │ 9725.42ms │ no change │
│ QQuery 34 │ 9496.05ms │ 9650.09ms │ no change │
│ QQuery 35 │ 3134.31ms │ 2859.13ms │ +1.10x faster │
│ QQuery 36 │ 240.50ms │ 251.96ms │ no change │
│ QQuery 37 │ 111.59ms │ 111.42ms │ no change │
│ QQuery 38 │ 137.54ms │ 140.59ms │ no change │
│ QQuery 39 │ 807.76ms │ 664.45ms │ +1.22x faster │
│ QQuery 40 │ 52.18ms │ 52.43ms │ no change │
│ QQuery 41 │ 46.33ms │ 47.65ms │ no change │
│ QQuery 42 │ 61.40ms │ 62.31ms │ no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 145453.36ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 140006.65ms │
│ Average Time (main) │ 3382.64ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 3255.97ms │
│ Queries Faster │ 8 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 34 │
└─────────────────────────────────────────────────────┴─────────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/clickbench_partitioned.json.bak as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/clickbench_partitioned.json.bak does not exist
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 208.92ms │ 214.15ms │ no change │
│ QQuery 2 │ 31.25ms │ 31.41ms │ no change │
│ QQuery 3 │ 83.52ms │ 83.58ms │ no change │
│ QQuery 4 │ 57.93ms │ 60.68ms │ no change │
│ QQuery 5 │ 122.64ms │ 121.96ms │ no change │
│ QQuery 6 │ 12.89ms │ 12.74ms │ no change │
│ QQuery 7 │ 250.84ms │ 250.33ms │ no change │
│ QQuery 8 │ 26.35ms │ 25.97ms │ no change │
│ QQuery 9 │ 118.93ms │ 119.39ms │ no change │
│ QQuery 10 │ 117.13ms │ 116.33ms │ no change │
│ QQuery 11 │ 57.04ms │ 56.61ms │ no change │
│ QQuery 12 │ 34.96ms │ 35.79ms │ no change │
│ QQuery 13 │ 77.05ms │ 77.11ms │ no change │
│ QQuery 14 │ 15.60ms │ 14.86ms │ no change │
│ QQuery 15 │ 23.89ms │ 24.24ms │ no change │
│ QQuery 16 │ 36.37ms │ 37.05ms │ no change │
│ QQuery 17 │ 173.75ms │ 179.35ms │ no change │
│ QQuery 18 │ 491.96ms │ 490.21ms │ no change │
│ QQuery 19 │ 36.08ms │ 36.41ms │ no change │
│ QQuery 20 │ 79.92ms │ 69.56ms │ +1.15x faster │
│ QQuery 21 │ 284.69ms │ 285.27ms │ no change │
│ QQuery 22 │ 19.66ms │ 19.05ms │ no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 2361.37ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 2362.03ms │
│ Average Time (main) │ 107.34ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 107.37ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 21 │
└─────────────────────────────────────────────────────┴───────────┘
Note: Skipping /home/db/datafusion/benchmarks/results/main/tpch_sf10.json as /home/db/datafusion/benchmarks/results/sketch-blocked-aggr-state-management/tpch_sf10.json does not exist
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 291.46ms │ 292.07ms │ no change │
│ QQuery 2 │ 44.78ms │ 46.78ms │ no change │
│ QQuery 3 │ 109.87ms │ 113.97ms │ no change │
│ QQuery 4 │ 60.18ms │ 60.21ms │ no change │
│ QQuery 5 │ 196.33ms │ 192.04ms │ no change │
│ QQuery 6 │ 57.52ms │ 55.79ms │ no change │
│ QQuery 7 │ 305.37ms │ 301.56ms │ no change │
│ QQuery 8 │ 124.96ms │ 125.75ms │ no change │
│ QQuery 9 │ 226.72ms │ 228.80ms │ no change │
│ QQuery 10 │ 196.97ms │ 193.90ms │ no change │
│ QQuery 11 │ 32.67ms │ 31.29ms │ no change │
│ QQuery 12 │ 85.23ms │ 83.33ms │ no change │
│ QQuery 13 │ 128.19ms │ 125.37ms │ no change │
│ QQuery 14 │ 78.30ms │ 79.75ms │ no change │
│ QQuery 15 │ 107.56ms │ 108.60ms │ no change │
│ QQuery 16 │ 42.01ms │ 43.48ms │ no change │
│ QQuery 17 │ 274.14ms │ 282.84ms │ no change │
│ QQuery 18 │ 453.89ms │ 451.47ms │ no change │
│ QQuery 19 │ 141.27ms │ 140.59ms │ no change │
│ QQuery 20 │ 134.95ms │ 123.06ms │ +1.10x faster │
│ QQuery 21 │ 295.07ms │ 297.03ms │ no change │
│ QQuery 22 │ 25.33ms │ 25.97ms │ no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 3412.78ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 3403.63ms │
│ Average Time (main) │ 155.13ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 154.71ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 21 │
└─────────────────────────────────────────────────────┴───────────┘
Actually, I found aggregate_grouping_sets_with_yielding_with_spill and aggregate_grouping_sets_source_not_yielding_with_spill don't conver the spilling to disk logic in main...
And this pr make the memory usage a bit bigger in the test, so that trigger the spilling to disk logic and lead to fails...
Benchmark looks amazing 🚀
I have an idea to minimize this PR (and make progress easier):
Problem with existing code
Now aggregation execution code (the entry point in
row_hash.rs/poll_next()has many specialized execution paths, I think the basic one isGroupedHash, other execution paths include spilling/streaming aggregate/skip partial aggregation/with soft limit/...It's hard to understand each of them, if multiple specializations can be triggered in the same execution, it's even trickier (e.g. can spilling and skipping partial aggregation happen within a single execution?) I believe these details are currently under-documented.
This PR
I skimmed through this PR, it appears not to implement compatibility with streaming group-by, but tries to be compatible with spilling/skipping partial aggregation (at least reused some execution path)
Thoughts
I think making these specializations compatible requires significant design/doc/testing effort, which could be leave to another PR.
Given the major improvement of blocked aggregation is, avoding extra copy when groups/accumulators get resized, it seems possible to demonstrate the performance numbers without supporting other advanced execution logic.
I'm wondering is it possible not to support spilling (also skipping partial aggregation and maybe other optimizations) at first? So that this PR's execution path is not mixing with other specialized paths
Thanks! It is clever to just disable this blocked optimization when spilling is enabled.
Actually I found some other problems not introduced by this pr exists in spilling path have blocked this pr... And the current compatible impl for spilling may have the bad performance (we need to copy all blocks into single to do it...).
I plan to just leave them to another prs... I am cleaning up the unnecessary codes, and will switch it to ready soon.
@alamb @jayzhan211 @2010YOUY01 I think this pr is ready now, I made benchmarks in my local, and seems 20% queries in clickbench obviously faster (especially in high cardinality cases).
The latest benchmark results are placed above.
I plan to review this first thing tomorrow (when I am fresh and can focus)
@alamb 😄 Very glad that it can help, and thanks for help! I am fixing comments and adding the unit tests, and will help more things about merging soon.
I ran the benchmarks on my test machine and I see similar results
Overall things get significantly better but a few get slower (e.g. Q37) but I suspect we can figure out how to resolve those regressions
Nice work @Rachelint
Details
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main_base ┃ sketch-blocked-aggr-state-manag… ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 0.67ms │ 0.65ms │ no change │
│ QQuery 1 │ 68.94ms │ 69.74ms │ no change │
│ QQuery 2 │ 121.34ms │ 124.61ms │ no change │
│ QQuery 3 │ 129.89ms │ 129.66ms │ no change │
│ QQuery 4 │ 936.36ms │ 942.38ms │ no change │
│ QQuery 5 │ 1026.98ms │ 1043.44ms │ no change │
│ QQuery 6 │ 64.75ms │ 64.06ms │ no change │
│ QQuery 7 │ 70.95ms │ 71.93ms │ no change │
│ QQuery 8 │ 1420.64ms │ 1449.10ms │ no change │
│ QQuery 9 │ 1326.29ms │ 1312.99ms │ no change │
│ QQuery 10 │ 450.67ms │ 452.53ms │ no change │
│ QQuery 11 │ 489.22ms │ 490.92ms │ no change │
│ QQuery 12 │ 1098.01ms │ 1150.82ms │ no change │
│ QQuery 13 │ 2093.09ms │ 2132.07ms │ no change │
│ QQuery 14 │ 1509.44ms │ 1407.02ms │ +1.07x faster │
│ QQuery 15 │ 1059.78ms │ 1083.76ms │ no change │
│ QQuery 16 │ 2791.68ms │ 2514.43ms │ +1.11x faster │
│ QQuery 17 │ 2723.41ms │ 2334.14ms │ +1.17x faster │
│ QQuery 18 │ 5524.82ms │ 4618.96ms │ +1.20x faster │
│ QQuery 19 │ 120.94ms │ 120.93ms │ no change │
│ QQuery 20 │ 1666.50ms │ 1646.72ms │ no change │
│ QQuery 21 │ 1980.75ms │ 2004.30ms │ no change │
│ QQuery 22 │ 4447.17ms │ 4814.76ms │ 1.08x slower │
│ QQuery 23 │ 10832.04ms │ 11183.51ms │ no change │
│ QQuery 24 │ 724.92ms │ 770.28ms │ 1.06x slower │
│ QQuery 25 │ 628.52ms │ 676.36ms │ 1.08x slower │
│ QQuery 26 │ 790.67ms │ 810.57ms │ no change │
│ QQuery 27 │ 2450.10ms │ 2456.62ms │ no change │
│ QQuery 28 │ 15010.69ms │ 15157.30ms │ no change │
│ QQuery 29 │ 555.38ms │ 567.09ms │ no change │
│ QQuery 30 │ 1235.77ms │ 1269.06ms │ no change │
│ QQuery 31 │ 1329.18ms │ 1259.29ms │ +1.06x faster │
│ QQuery 32 │ 4442.15ms │ 3562.04ms │ +1.25x faster │
│ QQuery 33 │ 4857.95ms │ 4926.93ms │ no change │
│ QQuery 34 │ 4855.86ms │ 4844.55ms │ no change │
│ QQuery 35 │ 1777.21ms │ 1676.27ms │ +1.06x faster │
│ QQuery 36 │ 329.33ms │ 325.24ms │ no change │
│ QQuery 37 │ 206.27ms │ 222.18ms │ 1.08x slower │
│ QQuery 38 │ 187.72ms │ 193.16ms │ no change │
│ QQuery 39 │ 953.90ms │ 805.04ms │ +1.18x faster │
│ QQuery 40 │ 85.84ms │ 85.80ms │ no change │
│ QQuery 41 │ 75.64ms │ 77.68ms │ no change │
│ QQuery 42 │ 93.59ms │ 94.78ms │ no change │
└──────────────┴────────────┴──────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main_base) │ 82545.02ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 80943.69ms │
│ Average Time (main_base) │ 1919.65ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 1882.41ms │
│ Queries Faster │ 8 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 31 │
└─────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ main_base ┃ sketch-blocked-aggr-state-management ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 0 │ 2560.03ms │ 2572.44ms │ no change │
│ QQuery 1 │ 815.54ms │ 787.44ms │ no change │
│ QQuery 2 │ 1608.52ms │ 1564.49ms │ no change │
└──────────────┴───────────┴──────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main_base) │ 4984.08ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 4924.38ms │
│ Average Time (main_base) │ 1661.36ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 1641.46ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 3 │
└─────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main_base ┃ sketch-blocked-aggr-state-manag… ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 2.24ms │ 2.20ms │ no change │
│ QQuery 1 │ 36.56ms │ 36.84ms │ no change │
│ QQuery 2 │ 94.43ms │ 92.39ms │ no change │
│ QQuery 3 │ 100.36ms │ 97.89ms │ no change │
│ QQuery 4 │ 900.11ms │ 888.40ms │ no change │
│ QQuery 5 │ 927.01ms │ 949.77ms │ no change │
│ QQuery 6 │ 32.76ms │ 33.28ms │ no change │
│ QQuery 7 │ 38.37ms │ 38.96ms │ no change │
│ QQuery 8 │ 1357.93ms │ 1383.54ms │ no change │
│ QQuery 9 │ 1275.61ms │ 1304.53ms │ no change │
│ QQuery 10 │ 355.18ms │ 344.31ms │ no change │
│ QQuery 11 │ 381.17ms │ 389.12ms │ no change │
│ QQuery 12 │ 1023.05ms │ 1023.38ms │ no change │
│ QQuery 13 │ 1777.28ms │ 1802.44ms │ no change │
│ QQuery 14 │ 1402.28ms │ 1300.19ms │ +1.08x faster │
│ QQuery 15 │ 1017.98ms │ 1013.41ms │ no change │
│ QQuery 16 │ 2720.14ms │ 2402.69ms │ +1.13x faster │
│ QQuery 17 │ 2666.07ms │ 2270.11ms │ +1.17x faster │
│ QQuery 18 │ 5556.13ms │ 4609.12ms │ +1.21x faster │
│ QQuery 19 │ 91.77ms │ 90.43ms │ no change │
│ QQuery 20 │ 1701.59ms │ 1733.05ms │ no change │
│ QQuery 21 │ 1923.91ms │ 1950.76ms │ no change │
│ QQuery 22 │ 4527.11ms │ 4901.07ms │ 1.08x slower │
│ QQuery 23 │ 9657.16ms │ 9727.87ms │ no change │
│ QQuery 24 │ 560.04ms │ 560.20ms │ no change │
│ QQuery 25 │ 480.05ms │ 476.33ms │ no change │
│ QQuery 26 │ 632.81ms │ 622.17ms │ no change │
│ QQuery 27 │ 2387.49ms │ 2439.88ms │ no change │
│ QQuery 28 │ 14453.61ms │ 14570.84ms │ no change │
│ QQuery 29 │ 518.62ms │ 518.83ms │ no change │
│ QQuery 30 │ 1102.69ms │ 1082.36ms │ no change │
│ QQuery 31 │ 1164.73ms │ 1044.84ms │ +1.11x faster │
│ QQuery 32 │ 4423.73ms │ 3532.24ms │ +1.25x faster │
│ QQuery 33 │ 4733.47ms │ 4781.66ms │ no change │
│ QQuery 34 │ 4773.42ms │ 4767.46ms │ no change │
│ QQuery 35 │ 1692.69ms │ 1587.80ms │ +1.07x faster │
│ QQuery 36 │ 265.73ms │ 280.72ms │ 1.06x slower │
│ QQuery 37 │ 123.95ms │ 130.43ms │ 1.05x slower │
│ QQuery 38 │ 143.98ms │ 141.72ms │ no change │
│ QQuery 39 │ 931.49ms │ 747.43ms │ +1.25x faster │
│ QQuery 40 │ 58.42ms │ 56.86ms │ no change │
│ QQuery 41 │ 49.07ms │ 48.09ms │ no change │
│ QQuery 42 │ 64.88ms │ 63.64ms │ no change │
└──────────────┴────────────┴──────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main_base) │ 78127.05ms │
│ Total Time (sketch-blocked-aggr-state-management) │ 75839.22ms │
│ Average Time (main_base) │ 1816.91ms │
│ Average Time (sketch-blocked-aggr-state-management) │ 1763.70ms │
│ Queries Faster │ 8 │
│ Queries Slower │ 3 │
│ Queries with No Change │ 32 │
└─────────────────────────────────────────────────────┴────────────┘
I ran the benchmarks on my test machine and I see similar results
Overall things get significantly better but a few get slower (e.g.
Q37) but I suspect we can figure out how to resolve those regressionsNice work @Rachelint Details
@Dandandan pointed out that many unnecessary mode check and self.counts.current_mut().unwrap() exist.
https://github.com/apache/datafusion/pull/11943#discussion_r1723425840
And I found the all of q22, q36, q37 will go this path, I think it may be the reason why they get slower.
Upd. The possible reason make above queries slower maybe have been fixed.
Have refactored the codes to eliminate the unnecessary operations which may be the reason why some queries slower following the suggestion https://github.com/apache/datafusion/pull/11943#discussion_r1723425840
I did a small benchmark about q9 which is slightly slower in my local before refactoring, it seems faster.
I guess maybe the refactor can help make q22, q36, q37 faster, too.
Q9: SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;
Query 9 iteration 0 took 1996.6 ms and returned 10 rows
Query 9 iteration 1 took 1993.4 ms and returned 10 rows
Query 9 iteration 2 took 2017.3 ms and returned 10 rows
Query 9 iteration 3 took 1986.7 ms and returned 10 rows
Query 9 iteration 4 took 2007.4 ms and returned 10 rows
It's late at night now, will continue remaining works after wake up and can focus.
Things to do:
- [x] fix https://github.com/apache/datafusion/pull/11943#discussion_r1723425840
- [x] Move codes to suitable crate
- Add more unit tests (For
BlockedNullState,Emit,Blocks, and some other tool functions)- [x] BlockedNullState
- [x] Emit
- [x] Blocks
- [x]
ensure_enough_room_for_xxxx
- [ ] Help to fuzzy tests/documents/further tickets
Progress logs:
- 0821, fixed https://github.com/apache/datafusion/pull/11943#discussion_r1723425840
- 0821, finished codes moving.
- 0822, finished unit tests for BlockedNullState.
- 0823, finisehd adding unit tests.
- 0824, working on eliminating the
flat mode, and we only useblocked mode(flat mode = blocked mode with big single block). - 0825, tried best to merge two modes, and after poc, found totally merge them will lead to some performance problem, so that left some braches now
- 0825, add
enable_aggregation_group_states_blocked_approachto controll on/off of this optimization. - 0826, add related comments and docs.
I plan to take another look at this tomorrow morning (again with fresh eyes -- lol)
Current progress:
1. Tried best to merge the accumulator loigc in two modes(flat and blocked)
Mentioned in: https://github.com/apache/datafusion/pull/11943#discussion_r1727161296
But for using static dispatch to improve the performance, still keep some specific branches for two modes now, detail can see: https://github.com/apache/datafusion/pull/11943#discussion_r1728363433
2. Introduce a conifg option to control on/off for blocked optimization
Have added a config option enable_aggregation_group_states_blocked_approach to control on/off for blocked optimization.
3. Introduce alter_block_size and remove switch_to_mdoe
Memtioned in https://github.com/apache/datafusion/pull/11943#discussion_r1727162759
4. Fix some implementation problems
Like https://github.com/apache/datafusion/pull/11943#discussion_r1728589712
5. Some points I am still not sure...
Should we modify to emit Vec<RecordBatch> memtioned in https://github.com/apache/datafusion/pull/11943#discussion_r1727161296
cc @alamb @jayzhan211 @2010YOUY01 @Dandandan
Hi @alamb main comments https://github.com/apache/datafusion/pull/11943#pullrequestreview-2254643945 for this pr have been fixed, minding have a quick look? It would be appreciated.
The detail about main progress:
-
Added a phase for blocked approach in GroupAggregator document https://github.com/Rachelint/arrow-datafusion/blob/b7a443a16420a5fb400297d05ee59c2122c9de6f/datafusion/physical-plan/src/aggregates/row_hash.rs#L346
-
Added physical plan level unit test and fuzzy test for blocked approach, added unit tests for new introduced structs and functions. https://github.com/Rachelint/arrow-datafusion/blob/b7a443a16420a5fb400297d05ee59c2122c9de6f/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs#L88
-
Added an option
enable_aggregation_intermediate_states_blocked_approachfor this optimization https://github.com/Rachelint/arrow-datafusion/blob/a2d81a59cac598dbf0d7a237d5d7b71b7149a82a/datafusion/common/src/config.rs#L356C13-L356C68 -
Remove the prevoius two mode (flat and blocked), like https://github.com/Rachelint/arrow-datafusion/blob/a2d81a59cac598dbf0d7a237d5d7b71b7149a82a/datafusion/functions-aggregate/src/count.rs#L379
Things planned in next prs:
- Improve the impl, now the
BlockedGroupIndexwill be computed multiple times... I want to avoid it, but it will lead to api change, and not a trivial ting... The change may be like:
// current
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;
// new
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[BlockedGroupIndex],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;
-
More complete fuzz tests, now have extracted some common codes, and use to impl the new blocked approach fuzz test. https://github.com/Rachelint/arrow-datafusion/blob/b7a443a16420a5fb400297d05ee59c2122c9de6f/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs#L88
-
Support blocked apporach in more
GroupValuess andGroupsAccumulators.
Hi @alamb main comments https://github.com/apache/datafusion/pull/11943#pullrequestreview-2254643945 for this pr have been fixed, minding have a quick look? It would be appreciated.
Sorry for the delay -- I am back now full time and will review this PR over the next few days
Marking as draft as I don't think this is waiting on review and I am trying to keep the review backlog under control
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.