
[Python] group_by functionality directly on large dataset, instead of on a table?

Open nosterlu opened this issue 2 years ago • 7 comments

I have a large dataset that I would like to use group_by on without having to read the entire table into memory first.

After reading the documentation, it seems dataset.to_batches is the best way of doing this? But it gets really complex when using aggregation methods other than, for example, count and sum.

I implemented it as below for count and sum, but for other, more complex aggregations I am still forced to read the entire table.

import pyarrow

# ds is a pyarrow.dataset.Dataset; columns, filters, group_by and agg are defined elsewhere
table = []
for batch in ds.to_batches(columns=columns, filter=filters, batch_size=1e6):
    t = pyarrow.Table.from_batches([batch])
    table.append(t.group_by(group_by).aggregate(agg))
table = pyarrow.concat_tables(table)

# then after this I use group_by again on the concatenated table with `sum` as aggregation method
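A mean, for instance, can still be rebuilt from partial sums and counts per batch, but this kind of decomposition gets fiddly fast and does not work for things like medians. A rough sketch of what I mean (same ds, columns, filters and group_by as above; "value" stands in for a real column name):

import pyarrow
import pyarrow.compute as pc

# first pass: per-batch partial sums and counts
partial = []
for batch in ds.to_batches(columns=columns, filter=filters, batch_size=1_000_000):
    t = pyarrow.Table.from_batches([batch])
    partial.append(
        t.group_by(group_by).aggregate([("value", "sum"), ("value", "count")])
    )

# second pass: merge the partial aggregates (pyarrow names outputs "<column>_<function>")
combined = (
    pyarrow.concat_tables(partial)
    .group_by(group_by)
    .aggregate([("value_sum", "sum"), ("value_count", "sum")])
)

# rebuild the mean from the merged sum and count
mean = pc.divide(
    pc.cast(combined["value_sum_sum"], "float64"), combined["value_count_sum"]
)
combined = combined.append_column("value_mean", mean)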

Thankful for any pointers or comments!

nosterlu avatar Jul 29 '22 13:07 nosterlu

There have been some discussions about adding basic analysis capabilities to Datasets. At the moment work is underway to support Dataset.filter, and I guess group_by might be added in the future too. Adding group_by is a bit more complex, as the current Table.group_by implementation doesn't immediately support Datasets, even though it's perfectly possible.

amol- avatar Aug 09 '22 09:08 amol-

Hope this is not off-topic, but you can leverage duckdb or polars for this.

import duckdb
import pyarrow.dataset as ds
import polars as pl

dset = ds.dataset('path/to/data')

# duckdb
con = duckdb.connect()

table = con.query("SELECT col2, sum(col1), count(col1) FROM dset WHERE col1 > 10 GROUP BY col2").arrow()

# polars
table = (
    pl.scan_ds(dset)
    .filter(pl.col("col1") > 10)
    .groupby("col2")
    .agg([pl.col("col1").sum(), pl.col("col1").count()])
    .collect().to_arrow()
)

legout avatar Oct 06 '22 15:10 legout

Thank you @legout. Duckdb works really well, but polars is struggling. Maybe I am doing something wrong.

But anyway here is how it worked for me

# pyarrow 8.0.0
# duckdb 0.5.1
# polars 0.14.18
import duckdb
import polars as pl
import pyarrow
from pyarrow.dataset import dataset, field

# fs is an adlfs/fsspec filesystem pointing at the storage account
ib = dataset("install-base-from-vdw-standard/", filesystem=fs, partitioning="hive")

ib.count_rows()
# 1415259797
ib.schema
"""
bev: bool
market: int16
function_group: int32
part: int32
kdp: bool
kdp_accessory: bool
yearweek: int32
qty_part: int32
vehicle_type: int32
model_year: int32
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 1081
"""

def do_duckdb():
    sql = """
        SELECT i.part,
               i.bev,
               i.market,
               kdp_accessory,
               yearweek,
               SUM(i.qty_part) as qty_part_sum
        FROM ib i
        WHERE vehicle_type=536
        GROUP BY
           i.part,
           i.bev,
           i.market,
           i.kdp_accessory,
           yearweek
    """
    conn = duckdb.connect(":memory:")
    result = conn.execute(sql)
    table = result.fetch_arrow_table()
    return table


def do_polar():
    table = (
        pl.scan_ds(ib)
        .filter(pl.col("vehicle_type") == 536)
        .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .agg(pl.col("qty_part").sum())
        .collect()
        .to_arrow()
    )
    return table

%time table = do_duckdb()
# memory consumption increased temporarily by ~2 GB; 18.8 s

%time table = do_polar()
# memory consumption slowly grew to fill almost all memory (32 GB)
# before normalizing; 4 min 54 s

Note: duckdb was 50% faster than my pyarrow implementation operating on a full table, and used a little more RAM, but not much. Processing the pyarrow table in batches uses less RAM but is a little slower.

def do_pyarrow_batches():
    table = []
    columns = ["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"]
    filters = field("vehicle_type") == 536
    agg = [("qty_part", "sum")]
    group_by = ["part", "bev", "market", "kdp_accessory", "yearweek"]
    for batch in ib.to_batches(columns=columns, filter=filters, batch_size=1e6):
        t = pyarrow.Table.from_batches([batch])
        table.append(t.group_by(group_by).aggregate(agg))
    table = pyarrow.concat_tables(table)
    # need to aggregate again
    new_agg = []
    for a in agg:
        how = a[1].replace("hash_", "")
        new_agg.append((a[0] + "_" + how, "sum"))
    table = table.group_by(group_by).aggregate(new_agg)
    return table

def do_pyarrow_table():
    table = (
        ib.to_table(
            columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
            filter=field("vehicle_type") == 536,
        )
        .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .aggregate([("qty_part", "sum")])
    )
    return table

nosterlu avatar Oct 07 '22 08:10 nosterlu

Polars is not able to push down the filter into a pyarrow dataset, only into readers implemented directly by polars (scan_parquet, scan_csv, etc.). So that means the dataset is first read completely into memory. We could see if we can translate our predicate into filters pyarrow understands.
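In the meantime, a workaround could be to do the filtering and projection on the pyarrow side and hand the already-filtered table to polars. A rough sketch using the ib dataset from the comment above (this still materializes the filtered selection, just not the full dataset):

import polars as pl
from pyarrow.dataset import field

# let pyarrow apply the filter and column projection while scanning
filtered = ib.to_table(
    columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
    filter=field("vehicle_type") == 536,
)

# aggregate the already-filtered table in polars
result = (
    pl.from_arrow(filtered)
    .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg(pl.col("qty_part").sum())
    .to_arrow()
)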

ritchie46 avatar Oct 07 '22 12:10 ritchie46

What is the size of the dataset and where is it stored? In a s3 bucket? If so, this could be interesting for you:

https://github.com/apache/arrow/issues/14336

legout avatar Oct 07 '22 13:10 legout

For me, a pyarrow dataset is the entry point for running queries/analytics on remote (s3 bucket) parquet datasets without loading all data into memory. Is this possible with scan_parquet, too?

legout avatar Oct 07 '22 13:10 legout

What is the size of the dataset and where is it stored? In a s3 bucket? If so, this could be interesting for you:

#14336

I use adlfs against an Azure storage account. The size of the entire (gzip parquet) dataset is maybe 5 GB.

nosterlu avatar Oct 07 '22 14:10 nosterlu

@ritchie46, FYI, I changed the definition of the dataset to help polars with filtering the partitions like so

ibpolar = dataset("install-base-from-vdw-standard/vehicle_type=536/", filesystem=fs, partitioning="hive")

polars now used half of the memory compared to duckdb, and spent 7 seconds vs 12 on duckdb.

So being able to push down filters on partition columns would help a lot.
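(Roughly, the query is then do_polar from above without the filter step, since the partition path already restricts vehicle_type:)

table = (
    pl.scan_ds(ibpolar)
    .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg(pl.col("qty_part").sum())
    .collect()
    .to_arrow()
)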

nosterlu avatar Nov 04 '22 08:11 nosterlu

Hi, all-- Have there been any developments on this front? @amol-

MatthewRGonzalez avatar Aug 12 '23 14:08 MatthewRGonzalez

@MatthewRGonzalez I think this can now be achieved using the new Acero Declaration API. See https://github.com/apache/arrow/blob/main/python/pyarrow/acero.py#L302-L308 and probably replace Declaration("table_source", TableSourceNodeOptions(table)) with a _dataset_to_decl call like sorting does ( https://github.com/apache/arrow/blob/main/python/pyarrow/acero.py#L280-L287 )

amol- avatar Aug 16 '23 16:08 amol-

Thanks, @amol! I'm still experimenting but the following worked:

import pyarrow as pa
import pyarrow.acero as ac
import pyarrow.dataset as ds

def _group_by(table_or_dataset, aggregates, keys):
    # use a dataset scan as the Acero source when possible, otherwise a table source
    if isinstance(table_or_dataset, ds.Dataset):
        data_source = ac._dataset_to_decl(table_or_dataset, use_threads=True)
    else:
        data_source = ac.Declaration(
            "table_source", ac.TableSourceNodeOptions(table_or_dataset)
        )
    decl = ac.Declaration.from_sequence(
        [
            data_source,
            ac.Declaration("aggregate", ac.AggregateNodeOptions(aggregates, keys=keys)),
        ]
    )
    return decl.to_table(use_threads=True)


# dataset written from the table below
# table = pa.table({'a': [1, 1, 2, 2, 3], 'b': [4, 5, 6, 7, 8]})
dataset = ds.dataset("my_data")

_group_by(dataset, [("b", "hash_sum", None, "b_sum")], keys="a")
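Since Acero's hash aggregation supports more functions than sum and count, the more complex aggregations from the original question should work the same way, e.g. a grouped mean (column names here refer to the commented example table, so adjust for a real dataset):

# grouped mean via Acero's hash_mean aggregate function
_group_by(dataset, [("b", "hash_mean", None, "b_mean")], keys="a")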

MatthewRGonzalez avatar Aug 20 '23 00:08 MatthewRGonzalez

Maybe a dumb question from me, @MatthewRGonzalez, but how do you install pyarrow acero? I am currently on pyarrow version 10 because of a limitation in snowflake-api; has acero been added since then?

nosterlu avatar Aug 23 '23 15:08 nosterlu

@nosterlu I believe it was added around version 12.
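If it helps, Acero ships as part of the pyarrow wheel itself, so upgrading pyarrow should be enough; a quick check:

import pyarrow

print(pyarrow.__version__)

try:
    # the pyarrow.acero bindings appeared around version 12
    import pyarrow.acero
    print("acero available")
except ImportError:
    print("pyarrow too old, upgrade with: pip install -U 'pyarrow>=12'")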

It's quite fast (maybe faster than similar polars aggregations). I hope this functionality continues to be developed!

MatthewRGonzalez avatar Aug 23 '23 18:08 MatthewRGonzalez

Polars scan_parquet can now read hive partitions from AWS S3, GCS, and HTTP without pyarrow.
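So the aggregation from this thread could presumably be written directly against the remote files, without going through a pyarrow dataset. A sketch with a hypothetical bucket path and placeholder credentials:

import polars as pl

# hypothetical S3 path and credentials; hive partition columns such as
# vehicle_type are parsed from the directory names
lazy = pl.scan_parquet(
    "s3://my-bucket/install-base/**/*.parquet",
    hive_partitioning=True,
    storage_options={"aws_access_key_id": "...", "aws_secret_access_key": "..."},
)

table = (
    lazy.filter(pl.col("vehicle_type") == 536)
    .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg(pl.col("qty_part").sum())
    .collect()
    .to_arrow()
)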

deanm0000 avatar Dec 13 '23 15:12 deanm0000

Thanks for the reminder on this one @deanm0000

ritchie46 avatar Dec 21 '23 19:12 ritchie46