polars icon indicating copy to clipboard operation
polars copied to clipboard

Implement streaming for scan_pyarrow_dataset

Open lmocsi opened this issue 2 years ago • 14 comments
trafficstars

Description

As per the streaming API documentation (https://pola-rs.github.io/polars/user-guide/concepts/streaming/#when-is-streaming-available) streaming now supports: scan_csv, scan_parquet, scan_ipc. Would be nice, if it supported scan_pyarrow_dataset, as well. (Or if scan_parquet supported partitioning='hive')

lmocsi avatar Sep 29 '23 18:09 lmocsi

Yes, I would welcome this. I would like a function that accepts polars expression that represent predicates, column names that are projected and slice information and then could produce a python generator.

ritchie46 avatar Sep 29 '23 18:09 ritchie46

Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.

ldacey avatar Sep 30 '23 18:09 ldacey

Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.

You can do this now without polars see https://arrow.apache.org/docs/python/dataset.html go all the way to the section Writing large amounts of data

deanm0000 avatar Oct 03 '23 12:10 deanm0000

Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.

You can do this now without polars see https://arrow.apache.org/docs/python/dataset.html go all the way to the section Writing large amounts of data

Yeah - I do it without polars today. All of my other tasks (Airflow) are using Polars to read the data into a LazyFrame to transform it, filter it, drop duplicates, etc. I was just saying it would be cool to be able to read a dataset and then stream the output with polars, similar to how a scanner works in pyarrow.

For example, this worked for a 450mil row dataset and allowed me to repartition the data monthly instead of daily. Is there a way to do this with Polars currently to replace projection with df.with_columns(..) or do other LazyFrame transformations/filters?

projection = {col: ds.field(col) for col in dataset.schema.names}
projection.update({"month_id": pc.strftime(ds.field("timestamp_utc"), format="%Y%m").cast(pa.int32())}) 
scanner = dataset.scanner(columns=projection)
ds.write_dataset(
    data=scanner,
    base_dir="...",
    filesystem=gcs,
    format="parquet",
    partitioning=["month_id"],
    partitioning_flavor="hive",
)

ldacey avatar Oct 03 '23 13:10 ldacey

pyarrow's dataset writer will/can open all (or if too many, a big subset of) the destination files simultaneously and write to them as it traverses the source. polars doesn't have that capability yet so you'd have to sink_parquet to each destination of your hive manually. In other words, even if/when polars streams from a pyarrow dataset, it'd be a separate feature to have it sink to multiple parquet files as a purpose-build dataset writer. It seems the direction polars is going is to have native dataset readers and writers rather than to rely on pyarrow at all.

deanm0000 avatar Oct 03 '23 15:10 deanm0000

pyarrow's dataset writer will/can open all (or if too many, a big subset of) the destination files simultaneously and write to them as it traverses the source. polars doesn't have that capability yet so you'd have to sink_parquet to each destination of your hive manually. In other words, even if/when polars streams from a pyarrow dataset, it'd be a separate feature to have it sink to multiple parquet files as a purpose-build dataset writer. It seems the direction polars is going is to have native dataset readers and writers rather than to rely on pyarrow at all.

Is this capability on the todo list at polars, or should I request it (writing multiple files simultaneously)?

lmocsi avatar Oct 03 '23 19:10 lmocsi

I don't think there's a to-do list apart from the issues list. I suppose it can't hurt to create the issue but I'd frame it more along the lines of writing datasets including hive partitioning natively rather than focusing on the underlying parallelism that would accompany it. Just my 2 cents. I don't really speak for the main devs of the project.

deanm0000 avatar Oct 03 '23 22:10 deanm0000

@lmocsi did you have the opportunity to create the issue related with the topic? I would also be interested of having streaming mode for scan_pyarrow_dataset as well.

As for now, I use a workaround described in this article : getting all the table partitions on S3 in a list and then call the scan_parquet function to be able to use the streaming mode.

One issue I had is if one of the sources tables is updated during the pipeline run, it will crash it.

arnaud-vennin avatar Nov 09 '23 11:11 arnaud-vennin

@arnaud-vennin did you manage to run your code on private S3 cloud, rather than AWS? currently I can't achieve streaming.

barak1412 avatar Dec 12 '23 17:12 barak1412

Not related to the issue itself, but wondering why you went with arrow dataset instead of parquet dataset.

jmakov avatar Dec 15 '23 02:12 jmakov

@jmakov look at my ticket here.

barak1412 avatar Dec 15 '23 08:12 barak1412

Not related to the issue itself, but wondering why you went with arrow dataset instead of parquet dataset.

Cause at the time of submitting the issue, scan_parquet did not support hive partitioning I guess.

lmocsi avatar Feb 21 '24 09:02 lmocsi

@barak1412 I'm sorry I missed the notification related to your question. Currently we are using scan_parquet() which now accepts partitioning,

arnaud-vennin avatar May 17 '24 12:05 arnaud-vennin

@arnaud-vennin Thanks. Unfortunately still no streaming option for scan_pyarrow_dataset, so for example, working with HDFS metastore is still very limited (I hope it will be prioritized soon, after the streaming engine redesign).

barak1412 avatar May 17 '24 15:05 barak1412