polars
Hive partitioning tracking issue
- [x] Allow the user to specify the schema of the Hive partitions manually
- [x] Support directory input: https://github.com/pola-rs/polars/issues/14342
- [x] Improve logic for inferring schema from paths, add parameter for number of paths to use for inference: https://github.com/pola-rs/polars/issues/13892
- [x] Re-use CSV parsing logic for parsing the values in Hive partition paths
- [x] Support loading from datasets where the hive columns are also stored in the data files: https://github.com/pola-rs/polars/issues/12041
- [x] Support partitioning on temporal data (e.g. date): https://github.com/pola-rs/polars/issues/12894
- Maybe add a try_parse_hive_dates parameter
- [ ] Support Hive partitioning logic for writers: https://github.com/pola-rs/polars/issues/11500
- Batched group-by materialization
- [ ] Support Hive partitioning logic in other readers besides Parquet:
- [ ] IPC: https://github.com/pola-rs/polars/issues/14885
- [ ] CSV
- [ ] NDJSON
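For readers new to the layout, here is a minimal sketch of what "inferring from paths" starts with: a Hive-partitioned path encodes column values as key=value directory segments. The helper name parse_hive_segments is hypothetical, and this is plain Python for illustration rather than how Polars implements it. The percent-decoding step assumes values may have been escaped by the writer.

```python
from urllib.parse import unquote


def parse_hive_segments(path: str) -> dict[str, str]:
    """Collect key=value directory segments from a path (hypothetical helper)."""
    parts = {}
    # The last segment is the file name, so only directories are inspected.
    for segment in path.split("/")[:-1]:
        key, sep, value = segment.partition("=")
        if sep and key:
            # Assumption: values may be percent-encoded, so decode them.
            parts[key] = unquote(value)
    return parts


print(parse_hive_segments("data/year=2024/month=5/part-0.parquet"))
```

All values come back as strings at this stage; deciding on a data type per key is a separate step.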
If I may, here's another one https://github.com/pola-rs/polars/issues/14936
I see you just merged the ability to specify a hive partition schema manually, does it allow for partial inference? I.e., if you have multiple partition keys but specify only a subset of them, will it infer the rest?
@stinodego why not extend the schema to the full table instead of just the partition columns?
I see you just merged the ability to specify a hive partition schema manually, does it allow for partial inference?
At this point it does not. You have to specify the full schema of the Hive partitions, similar to other schema arguments in the API. I can see how a schema_overrides type of parameter would be useful though. Not sure if they should be combined, will have to think about it.
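To make the trade-off concrete, here is a rough sketch of how partial inference combined with a schema_overrides-style argument could behave. Everything in it is hypothetical: the function names, the Python types standing in for data types, and the inference order (integer, then ISO date, then string). It is not an existing Polars API.

```python
from datetime import date


def infer_value_type(value: str) -> type:
    """Guess a Python type for one partition value: int, then date, else str."""
    try:
        int(value)
        return int
    except ValueError:
        pass
    try:
        date.fromisoformat(value)
        return date
    except ValueError:
        return str


def resolve_hive_schema(
    partitions: dict[str, str], overrides: dict[str, type]
) -> dict[str, type]:
    """Infer a type per partition key, letting explicit overrides win."""
    schema = {key: infer_value_type(value) for key, value in partitions.items()}
    # Only keys that actually occur in the paths can be overridden.
    schema.update({k: v for k, v in overrides.items() if k in schema})
    return schema


schema = resolve_hive_schema(
    {"year": "2024", "day": "2024-05-01", "region": "eu"},
    overrides={"year": str},
)
print(schema)
```

With this shape, a full manual schema is just the case where the overrides cover every key.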
@stinodego why not extend the schema to the full table instead of just the partition columns?
At least in the case of Parquet, that part of the schema is already available from the data. Not sure a full schema/schema_overrides would provide much benefit over simply casting after scanning.
@stinodego it is part of the parquet, but in situations with schema evolution, Polars would not be able to handle those situations. Also, if I know the schema ahead of time, you can essentially skip reading the parquet metadata.
in situations with schema evolution, Polars would not be able to handle those situations
Can you give an example?
you can essentially skip reading the parquet metadata
I don't know, there's other stuff in the metadata besides the schema. Not sure yet exactly what we're actually using.
in situations with schema evolution, Polars would not be able to handle those situations
Can you give an example?
Sure, take these two parquet files that we have written:
import polars as pl

# Assumes the polars_parquet/ directory already exists.
pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test2.parquet")
When you read with Polars, it incorrectly assumes that the schema of the first parquet file applies to all parquet files in the table. So when you read, you get only foo and bar:
pl.read_parquet("polars_parquet/*.parquet")
shape: (2, 2)
┌─────┬─────┐
│ foo ┆ bar │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1 ┆ 2 │
│ 2 ┆ 3 │
└─────┴─────┘
Now let's write in the other order, and Polars will panic because it cannot handle a column that is missing from a parquet file (see this issue I made a while ago: https://github.com/pola-rs/polars/issues/14980):
# Same two frames, written in the opposite order.
pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test2.parquet")
pl.read_parquet("polars_parquet/*.parquet")
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
It's a common use case to evolve parquet tables without having to rewrite all the older files to conform to the new schema.
Having something akin to Pyarrow datasets (https://github.com/pola-rs/polars/issues/13086) would make a lot of sense.
Ok, I see what you mean. We should support this.
This might be interesting inspiration/source of ideas for a dataset abstraction in polars: https://padawan.readthedocs.io/en/latest/
Any chance you would reconsider this as part of the reworking of hive partition handling? https://github.com/pola-rs/polars/issues/12041
Here's another one https://github.com/pola-rs/polars/issues/15586. It's to change the default for write_statistics to True, nothing complicated.
Here are a couple more.
Can't forget to document at the end.
This one might be a bit of a tangent but it's to incorporate the pageindex spec of parquet files https://github.com/pola-rs/polars/issues/12752
As I understand it, adding the partition fields to the schema is supposed to enable Hive partitioning support. However, in my case it shows an error instead:
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/*";
let mut schema = Schema::new();
schema.with_column("year".into(), DataType::Int8);
schema.with_column("month".into(), DataType::Int8);
let schema = Arc::new(schema);
let cloud_options = cloud::CloudOptions::default().with_aws([
    (Key::AccessKeyId, &cred.access_key.unwrap()),
    (Key::SecretAccessKey, &cred.secret_key.unwrap()),
    (Key::Region, &"eu-west-1".into()),
]);
let mut args = ScanArgsParquet::default();
args.hive_options.enabled = true;
args.hive_options.schema = Some(schema);
args.cloud_options = Some(cloud_options);
// Check time required to read the data.
let start = std::time::Instant::now();
let df = LazyFrame::scan_parquet(TEST_S3, args)?
    .with_streaming(true)
    .collect()?;
The result is: Error: Context { error: ComputeError(ErrString("Object at location data_lake/some_dir/partitioned_table_root_dir not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }
Enhancement request for "Support directory input": https://github.com/pola-rs/polars/issues/14342
Enhancement request for "Support directory input": #14342
Thank you. To be honest, I'm quite surprised. How can anyone use this tool in any serious work without the ability to load data from a directory? All tables are partitioned across multiple files. 👀
You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.
Directory support will function slightly differently, as it will do some additional validation, but it's mostly the same.
Yes, it is described in the referenced enhancement request (the /**/*.parquet part).
You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.
Thank you, but my parquet files do not have any extensions, and adding /**/* does not help. It shows the following error:
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/**/*";
Error: Context { error: ComputeError(ErrString("Object at location partitioned_table_root_dir/year=2024/month=5 not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }
Meanwhile, if I manually set a specific combination of my partition values, it works:
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/year=2024/month=5/*";
But I believe manually adding values is not how Hive partitioning is supposed to work? Or am I doing something wrong?
If I add extensions to all files, the **/*.parquet trick works well.
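A possible explanation for the difference, sketched on a local directory with Python's pathlib: a bare **/* pattern matches the partition directories as well as the files inside them, while **/*.parquet matches only entries with that suffix. Whether the object-store listing in Polars behaves the same way is an assumption here, and the layout and file name are invented for the demo.

```python
import tempfile
from pathlib import Path

# Build a tiny Hive-style layout with one extensionless data file.
root = Path(tempfile.mkdtemp())
leaf = root / "year=2024" / "month=5"
leaf.mkdir(parents=True)
(leaf / "part-0").touch()

everything = sorted(p.relative_to(root).as_posix() for p in root.glob("**/*"))
with_suffix = sorted(
    p.relative_to(root).as_posix() for p in root.glob("**/*.parquet")
)
files_only = sorted(
    p.relative_to(root).as_posix() for p in root.glob("**/*") if p.is_file()
)

print(everything)   # the two directories and the data file
print(with_suffix)  # empty: the file has no .parquet suffix
print(files_only)   # only the data file
```

If directory entries are passed on as if they were data files, a "not found" error on a path such as year=2024/month=5 would be the expected symptom.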
Would be great to add JSON to "Support Hive partitioning logic in other readers besides Parquet"
Would be great to add JSON to "Support Hive partitioning logic in other readers besides Parquet"
It's on the list!
Tossing in a suggestion to also support reading/writing Pyarrow/Spark compatible parquet _metadata files. See #7707
https://github.com/pola-rs/polars/issues/15823 probably belongs here.
@stinodego, Regarding this comment
Ok, I see what you mean. We should support this.
Is there a GitHub issue tracking this? It's not noted in the issue checklist here and, as far as I can see, the trail goes cold with the comment and #15508.
For us, the inability to explicitly set schemas for the table has prevented us from using scan_parquet. We are forced to go via scan_pyarrow_dataset instead, which is suboptimal and makes for messy code.
Others to track:
- https://github.com/pola-rs/polars/issues/14838
- https://github.com/pola-rs/polars/issues/14712
- https://github.com/pola-rs/polars/issues/17045
- https://github.com/pola-rs/polars/issues/14936
- https://github.com/pola-rs/polars/issues/14885
Not sure if this is the correct place to write this, but...
For the native partitioned Parquet reader, would it be possible to support loading unions of columns from different partitions when they contain different sets of columns? This would correspond to "diagonal" concat.
For example, when working with limit order book data, daily partitions of orderbook levels have a varying number of columns.
The pyarrow reader silently drops columns that are not present in all partitions.
I wonder if it would be possible to surface a concatenation option in the top-level API of the native Polars reader?
An addition to ion-elgreco's comment on schema evolution: let's not forget about hive-partitioned parquet files. It seems that Polars behaves differently for hive-partitioned files. It does not truncate all files to the schema of the first file, but throws an error:
import os

import polars as pl

# create directories:
path1 = './a/month_code=M202406'
os.makedirs(path1, exist_ok=True)
path2 = './a/month_code=M202407'
os.makedirs(path2, exist_ok=True)

# create different partitions:
df = pl.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})
df.write_parquet('./a/month_code=M202406/part_0.parquet')
df2 = pl.DataFrame({'a': [1, 2, 3], 'b': ['a', 'a', 'b'], 'c': [22, 33, 44]})
df2.write_parquet('./a/month_code=M202407/part_0.parquet')

# try to read data:
df3 = pl.scan_parquet('./a', hive_partitioning=True)
df3.collect()
And I get the error:
SchemaError: schemas contained differing number of columns: 2 != 3
This should be handled as well, ideally with an option to fill columns that are missing from the current partition, but exist in other partitions, with null values.
https://github.com/pola-rs/polars/issues/12041#issuecomment-2330399357