Support Aggregating by `RunArray`s

Open • brancz opened this issue 6 months ago • 1 comment

Is your feature request related to a problem or challenge?

It's currently not possible to aggregate by `RunArray`s, i.e. to use a run-end encoded (REE) column as a `GROUP BY` key.

Example code grouping by a `RunArray`:

```rust
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, Int32Type};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), datafusion::error::DataFusionError> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // First, let's create our data
    // We'll have temperature readings where multiple consecutive readings come from the same sensor

    // Temperature values (not run-length encoded)
    // This represents all temperature readings in sequence
    let temperatures = Int32Array::from(vec![
        22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28
    ]);

    // Create the string values for sensor IDs
    let sensor_id_values = StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);

    // Create the run ends array (positions where each run ends)
    let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);

    // Create RunArray for sensor IDs with Int32Type as run end type
    let sensor_id_ree = RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
        .expect("Failed to create sensor ID RunArray");

    // Get the exact data type of the RunArray for the schema
    let sensor_id_type = sensor_id_ree.data_type().clone();

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("sensor_id", sensor_id_type, false),
        Field::new("temperature", DataType::Int32, false),
    ]));

    // Create record batch
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
    )?;

    // Register as a table
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("sensor_readings", Arc::new(provider))?;

    // Run aggregation query
    // Group by sensor ID and calculate statistics
    let sql = "
        SELECT
            sensor_id,
            AVG(temperature) AS avg_temp,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp,
            COUNT(temperature) AS reading_count
        FROM sensor_readings
        GROUP BY sensor_id
        ORDER BY sensor_id
    ";

    let results = ctx.sql(sql).await?.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}
```
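In the reproducer, `sensor_id` has 15 logical rows but only four physical values: the run ends `[4, 7, 12, 15]` mean rows 0-3 are `sensor_A`, rows 4-6 `sensor_B`, rows 7-11 `sensor_C` and rows 12-14 `sensor_D` (each run end is exclusive). Finding the value of a logical row is therefore a binary search over the run ends. The following is a minimal plain-Rust model of that lookup, assuming an unsliced array with `i32` run ends; `physical_index` and `value_at` are hypothetical helpers for illustration, not arrow-rs APIs.

```rust
/// Returns the position (in the values array) of the run containing
/// `logical_index`, or `None` if the index lies past the last run.
/// Hypothetical helper: run ends are assumed exclusive and strictly increasing.
fn physical_index(run_ends: &[i32], logical_index: usize) -> Option<usize> {
    // The containing run is the first one whose end lies beyond the index,
    // found by binary search over the sorted run ends.
    let physical = run_ends.partition_point(|&end| end as usize <= logical_index);
    (physical < run_ends.len()).then_some(physical)
}

/// Resolves a logical row to the value of its run. Hypothetical helper.
fn value_at<'a>(run_ends: &[i32], values: &[&'a str], logical_index: usize) -> Option<&'a str> {
    physical_index(run_ends, logical_index).and_then(|p| values.get(p).copied())
}
```

For example, `value_at(&[4, 7, 12, 15], &["sensor_A", "sensor_B", "sensor_C", "sensor_D"], 5)` resolves row 5 to `sensor_B`.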

Describe the solution you'd like

Several things need to happen to make this work:

  • [x] Support for `RunArray`s in `arrow-select`'s `concat`. https://github.com/apache/arrow-rs/pull/7487
  • [ ] Support for `RunArray`s in `arrow-row`. https://github.com/apache/arrow-rs/pull/7649
  • [ ] Support for `RunArray`s in `arrow-data`'s `build_extend_nulls` and `build_extend`. https://github.com/apache/arrow-rs/pull/7671
  • [ ] Support for grouping by `RunArray`s in DataFusion, mainly in `datafusion/common/src/cast.rs` and `datafusion/physical-plan/src/aggregates/group_values/row.rs` (to turn groups into `RunArray`s after aggregating) and in `datafusion/common/src/hash_utils.rs` (to implement the hashing itself).
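The hashing piece can be pictured like this: a hash-based `GROUP BY` needs one hash per logical row, but for an REE column each run's value only has to be hashed once and the result copied across the rows it covers. This is a minimal sketch under stated assumptions: `hash_run_end_encoded` and `hash_value` are hypothetical names, `DefaultHasher` stands in for whatever hasher DataFusion actually uses, and combining hashes across multiple key columns is left out.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes one group-key value. `DefaultHasher` is only a stand-in for the
/// engine's real hasher (an assumption for this sketch).
fn hash_value(v: &str) -> u64 {
    let mut h = DefaultHasher::new();
    v.hash(&mut h);
    h.finish()
}

/// Hypothetical sketch: fills one hash per *logical* row of a run-end encoded
/// column while hashing each *physical* value only once.
/// Assumes exclusive, strictly increasing run ends, one value per run, and
/// `hashes.len()` equal to the last run end.
fn hash_run_end_encoded(run_ends: &[i32], values: &[&str], hashes: &mut [u64]) {
    debug_assert_eq!(hashes.len(), run_ends.last().map_or(0, |&e| e as usize));
    let mut start = 0usize;
    for (&value, &end) in values.iter().zip(run_ends) {
        let end = end as usize;
        // One hash computation per run, copied to every row the run covers.
        hashes[start..end].fill(hash_value(value));
        start = end;
    }
}
```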

Describe alternatives you've considered

We currently expand (decode) REE arrays into plain arrays before pushing them through DataFusion query plans; being able to process them directly, without that copy, would be much better for performance.
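For comparison, the expansion workaround amounts to materializing one copy of a run's value per logical row, so its cost grows with the number of rows rather than the number of runs. A minimal sketch, assuming plain slices stand in for the Arrow buffers; `expand_runs` is a hypothetical helper, not an arrow-rs function.

```rust
/// Hypothetical sketch of "expanding" a run-end encoded column: every logical
/// row receives its own copy of its run's value. Assumes exclusive, strictly
/// increasing run ends and one value per run.
fn expand_runs<T: Clone>(run_ends: &[i32], values: &[T]) -> Vec<T> {
    let total = run_ends.last().map_or(0, |&end| end as usize);
    let mut out = Vec::with_capacity(total);
    let mut start = 0usize;
    for (value, &end) in values.iter().zip(run_ends) {
        let end = end as usize;
        // Repeat the run's value once for each row the run covers.
        out.extend(std::iter::repeat(value.clone()).take(end - start));
        start = end;
    }
    out
}
```

With the reproducer's four sensor IDs and run ends, this produces 15 values, which is the work a native REE grouping path would avoid.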

Additional context

I've already implemented all the pieces, but I'm opening this issue for context and easier tracking.

@alamb

brancz • May 09 '25 18:05

I think a specialization for RunEndArrays will likely be quite powerful

alamb • May 09 '25 20:05
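One way such a specialization could pay off: since all rows in a run share the same key, the group lookup can happen once per run instead of once per row, although a map is still needed because the same key may reappear in a later run. The sketch below computes the reproducer's `COUNT`/`MIN`/`MAX`/`AVG` this way. It is hypothetical (`aggregate_by_runs` and `Stats` are illustrative names) and much simpler than DataFusion's grouping path, which assigns group indices and updates accumulators in separate steps.

```rust
use std::collections::HashMap;

/// Running aggregate state for one group (illustrative only).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Stats {
    count: u64,
    sum: i64,
    min: i32,
    max: i32,
}

impl Stats {
    fn avg(&self) -> f64 {
        self.sum as f64 / self.count as f64
    }
}

/// Hypothetical specialization: groups `temps` by a run-end encoded key,
/// consulting the group map once per run rather than once per row.
/// Assumes exclusive, strictly increasing run ends and one key per run.
fn aggregate_by_runs<'a>(run_ends: &[i32], keys: &[&'a str], temps: &[i32]) -> HashMap<&'a str, Stats> {
    let mut groups: HashMap<&'a str, Stats> = HashMap::new();
    let mut start = 0usize;
    for (&key, &end) in keys.iter().zip(run_ends) {
        let end = end as usize;
        // Single map lookup for the whole run; repeated keys merge here.
        let state = groups.entry(key).or_insert(Stats { count: 0, sum: 0, min: i32::MAX, max: i32::MIN });
        // Fold every row of the run into that group's state.
        for &t in &temps[start..end] {
            state.count += 1;
            state.sum += t as i64;
            state.min = state.min.min(t);
            state.max = state.max.max(t);
        }
        start = end;
    }
    groups
}
```

For the reproducer's data this yields `sensor_A`: count 4, min 22, max 25, avg 23.5; `sensor_B`: 3, 20, 22, 21.0; `sensor_C`: 5, 21, 25, 23.0; `sensor_D`: 3, 26, 28, 27.0.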

Opened the PR for the last missing piece: https://github.com/apache/datafusion/pull/18981

brancz • Nov 28 '25 10:11