datafusion
Support Aggregating by `RunArray`s
Is your feature request related to a problem or challenge?
It's currently not possible to aggregate by `RunArray`s.
Example code grouping by a `RunArray`:

```rust
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // First, let's create our data
    // We'll have temperature readings where multiple consecutive readings come from the same sensor

    // Temperature values (not run-length encoded)
    // This represents all temperature readings in sequence
    let temperatures = Int32Array::from(vec![
        22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
    ]);

    // Create the string values for sensor IDs
    let sensor_id_values = StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);

    // Create the run ends array (positions where each run ends)
    let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);

    // Create RunArray for sensor IDs with Int32Type as run end type
    let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
        .expect("Failed to create sensor ID RunArray");

    // Get the exact data type of the RunArray for the schema
    let sensor_id_type = sensor_id_ree.data_type().clone();

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("sensor_id", sensor_id_type, false),
        Field::new("temperature", DataType::Int32, false),
    ]));

    // Create record batch
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
    )?;

    // Register as a table
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("sensor_readings", Arc::new(provider))?;

    // Run aggregation query
    // Group by sensor ID and calculate statistics
    let sql = "
        SELECT
            sensor_id,
            AVG(temperature) AS avg_temp,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp,
            COUNT(temperature) AS reading_count
        FROM sensor_readings
        GROUP BY sensor_id
        ORDER BY sensor_id
    ";

    let results = ctx.sql(sql).await?.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}
```
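To make the intended result of the query above concrete, here is a std-only Rust sketch that evaluates the same `GROUP BY sensor_id` directly over the run-end-encoded representation, without Arrow or DataFusion types. `group_by_runs` and `Acc` are hypothetical names. It does one group lookup per run rather than per row, and still merges runs that repeat the same value. It illustrates why grouping on REE data can be cheap; it is not how DataFusion implements grouping.

```rust
use std::collections::BTreeMap;

// Hypothetical running aggregate for one group.
#[derive(Debug, Clone, Copy)]
struct Acc {
    sum: i64,
    min: i32,
    max: i32,
    count: usize,
}

/// Returns (sensor_id, avg, min, max, count) rows ordered by sensor_id.
/// `run_ends` are exclusive end offsets, one per entry in `values`.
fn group_by_runs(
    run_ends: &[usize],
    values: &[&str],
    temps: &[i32],
) -> Vec<(String, f64, i32, i32, usize)> {
    // BTreeMap keeps keys sorted, mirroring ORDER BY sensor_id.
    let mut groups: BTreeMap<&str, Acc> = BTreeMap::new();
    let mut start = 0;
    for (&end, &key) in run_ends.iter().zip(values) {
        // Every row in start..end shares `key`: one map lookup per run.
        let acc = groups.entry(key).or_insert(Acc {
            sum: 0,
            min: i32::MAX,
            max: i32::MIN,
            count: 0,
        });
        for &t in &temps[start..end] {
            acc.sum += t as i64;
            acc.min = acc.min.min(t);
            acc.max = acc.max.max(t);
            acc.count += 1;
        }
        start = end;
    }
    groups
        .into_iter()
        .map(|(k, a)| (k.to_string(), a.sum as f64 / a.count as f64, a.min, a.max, a.count))
        .collect()
}

fn main() {
    // Same data as the DataFusion example above.
    let temps: [i32; 15] = [22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28];
    let run_ends: [usize; 4] = [4, 7, 12, 15];
    let values = ["sensor_A", "sensor_B", "sensor_C", "sensor_D"];
    for row in group_by_runs(&run_ends, &values, &temps) {
        println!("{:?}", row);
    }
}
```

For the example data this gives `sensor_A` (avg 23.5, min 22, max 25, 4 readings), `sensor_B` (21.0, 20, 22, 3), `sensor_C` (23.0, 21, 25, 5) and `sensor_D` (27.0, 26, 28, 3).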
Describe the solution you'd like
Making this work requires changes in several places:
- [x] Support for `RunArray`s in `arrow-select`'s `concat`: https://github.com/apache/arrow-rs/pull/7487
- [ ] Support for `RunArray`s in `arrow-row`: https://github.com/apache/arrow-rs/pull/7649
- [ ] Support for `RunArray`s in `arrow-data`'s `build_extend_nulls` and `build_extend`: https://github.com/apache/arrow-rs/pull/7671
- [ ] Support for grouping by `RunArray`s in DataFusion, mainly in `datafusion/common/src/cast.rs` and `datafusion/physical-plan/src/aggregates/group_values/row.rs` (to turn groups back into `RunArray`s after aggregating) and in `datafusion/common/src/hash_utils.rs` (to implement the actual hashing)
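The hashing part of the last item can be pictured with the std-only Rust sketch below. It assumes one plausible strategy (hash each physical value once, then repeat that hash for every row in its run) and uses `std`'s `DefaultHasher` in place of whatever hasher `hash_utils.rs` actually uses; `hash_one`, `hash_run_encoded` and `hash_plain` are hypothetical names. The property worth checking is that a REE column and its expanded equivalent produce identical per-row hashes, so equal values end up in the same group regardless of encoding.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: hash a single value with a deterministic hasher.
fn hash_one<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hash a run-end-encoded column: one hash per physical value,
/// broadcast to every logical row of its run.
fn hash_run_encoded<T: Hash>(run_ends: &[usize], values: &[T]) -> Vec<u64> {
    assert_eq!(run_ends.len(), values.len(), "one run end per value");
    let len = run_ends.last().copied().unwrap_or(0);
    let mut hashes = Vec::with_capacity(len);
    let mut start = 0;
    for (&end, value) in run_ends.iter().zip(values) {
        let h = hash_one(value);
        // Rows start..end all share this value, hence this hash.
        hashes.extend(std::iter::repeat(h).take(end - start));
        start = end;
    }
    hashes
}

/// Baseline: hash an already-expanded (plain) column row by row.
fn hash_plain<T: Hash>(rows: &[T]) -> Vec<u64> {
    rows.iter().map(hash_one).collect()
}

fn main() {
    let run_ends: [usize; 3] = [2, 3, 6];
    let values = ["x", "y", "x"];
    let expanded = ["x", "x", "y", "x", "x", "x"];
    // Both encodings must agree row by row.
    let same = hash_run_encoded(&run_ends, &values) == hash_plain(&expanded);
    println!("REE and plain hashes match: {}", same);
}
```

Only three hash computations happen for six rows here; the savings grow with run length.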
Describe alternatives you've considered
We currently expand REE arrays into plain arrays before pushing them through DataFusion query plans; being able to group on them zero-copy, without materializing every row, would be much better for performance.
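For readers less familiar with the layout, this std-only Rust sketch contrasts the current workaround (expanding every logical row) with reading a row in place. `expand` and `physical_index` are hypothetical helpers, not arrow-rs APIs. The lookup assumes run ends are strictly increasing exclusive offsets, so a binary search (`slice::partition_point`) finds the run containing a given row.

```rust
/// Materialize every logical row: what "expanding" a REE column means.
fn expand<T: Clone>(run_ends: &[usize], values: &[T]) -> Vec<T> {
    let mut out = Vec::with_capacity(run_ends.last().copied().unwrap_or(0));
    let mut start = 0;
    for (&end, v) in run_ends.iter().zip(values) {
        // Copy the run's value once per row it covers.
        out.extend(std::iter::repeat(v.clone()).take(end - start));
        start = end;
    }
    out
}

/// Map logical row `i` to its physical value index without expanding:
/// the containing run is the first one whose exclusive end exceeds `i`.
fn physical_index(run_ends: &[usize], i: usize) -> usize {
    run_ends.partition_point(|&end| end <= i)
}

fn main() {
    // Run ends and values from the sensor example.
    let run_ends: [usize; 4] = [4, 7, 12, 15];
    let values = ["sensor_A", "sensor_B", "sensor_C", "sensor_D"];

    // Expanding copies 4 stored values out to 15 rows.
    let rows = expand(&run_ends, &values);
    println!("{} values expand to {} rows", values.len(), rows.len());

    // Direct lookup: row 9 falls in run 2 (rows 7..12).
    let i = 9;
    println!("row {} -> {}", i, values[physical_index(&run_ends, i)]);
}
```

The expanded form costs memory and copying proportional to the number of rows, while the lookup costs `O(log runs)` per row and touches only the stored values.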
Additional context
I've already implemented all the pieces, but I'm opening this issue for context and easier tracking.
@alamb
I think a specialization for RunEndArrays will likely be quite powerful
Opened the PR for the last missing piece: https://github.com/apache/datafusion/pull/18981