
Hive partitioning tracking issue

Open stinodego opened this issue 11 months ago • 24 comments

  • [x] Allow the user to specify the schema of the Hive partitions manually (see the sketch after this list)
  • [x] Support directory input: https://github.com/pola-rs/polars/issues/14342
  • [x] Improve logic for inferring schema from paths, add parameter for number of paths to use for inference: https://github.com/pola-rs/polars/issues/13892
  • [x] Re-use CSV parsing logic for parsing the values in Hive partition paths
  • [x] Support loading from datasets where the hive columns are also stored in the data files: https://github.com/pola-rs/polars/issues/12041
  • [x] Support partitioning on temporal data (e.g. date): https://github.com/pola-rs/polars/issues/12894
    • Maybe add a try_parse_hive_dates parameter
  • [ ] Support Hive partitioning logic for writers: https://github.com/pola-rs/polars/issues/11500
    • Batched group-by materialization
  • [ ] Support Hive partitioning logic in other readers besides Parquet:
    • [ ] IPC: https://github.com/pola-rs/polars/issues/14885
    • [ ] CSV
    • [ ] NDJSON
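
A minimal sketch of the first item, assuming the hive_schema parameter on pl.scan_parquet (the path and column names are placeholders):

import polars as pl

# Hypothetical layout: my_table/year=2024/month=5/data.parquet
lf = pl.scan_parquet(
    "my_table/**/*.parquet",
    hive_partitioning=True,
    # State the partition column types explicitly instead of inferring them from the paths.
    hive_schema={"year": pl.Int32, "month": pl.Int8},
)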

stinodego avatar Apr 02 '24 13:04 stinodego

If I may, here's another one https://github.com/pola-rs/polars/issues/14936

deanm0000 avatar Apr 02 '24 15:04 deanm0000

I see you just merged the ability to specify a Hive partition schema manually. Does it allow for partial inference? I.e. if you have multiple partition keys but specify only a subset of them, will it infer the rest?

kszlim avatar Apr 03 '24 06:04 kszlim

@stinodego why not extend the schema to the full table instead of just the partition columns?

ion-elgreco avatar Apr 03 '24 07:04 ion-elgreco

I see you just merged the ability to specify a Hive partition schema manually. Does it allow for partial inference?

At this point it does not: you have to specify the full schema of the Hive partitions, similar to other schema arguments in the API. I can see how a schema_overrides-type parameter would be useful, though. I'm not sure if they should be combined; I will have to think about it.

@stinodego why not extend the schema to the full table instead of just the partition columns?

At least in the case of Parquet, that part of the schema is already available from the data. Not sure a full schema/schema_overrides would provide much benefit over simply casting after scanning.
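
As a rough illustration of the "casting after scanning" alternative (the path and column names here are made up):

import polars as pl

# Cast data columns to the desired types after the scan; the cast is lazy,
# so it is folded into the same query plan.
lf = (
    pl.scan_parquet("my_table/**/*.parquet", hive_partitioning=True)
    .with_columns(pl.col("foo").cast(pl.Int32), pl.col("bar").cast(pl.Utf8))
)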

stinodego avatar Apr 03 '24 08:04 stinodego

@stinodego it is part of the Parquet metadata, but in situations with schema evolution, Polars would not be able to handle those situations. Also, if I know the schema ahead of time, you can essentially skip reading the Parquet metadata.

ion-elgreco avatar Apr 03 '24 09:04 ion-elgreco

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?

you can esentially skip reading the parquet metadata

I don't know, there's other stuff in the metadata besides the schema. Not sure yet exactly what we're actually using.

stinodego avatar Apr 03 '24 09:04 stinodego

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?

Sure, take these two Parquet files that we have written:

import polars as pl
import os

os.makedirs("polars_parquet", exist_ok=True)

pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test2.parquet")

When you read this with Polars, it incorrectly assumes that the first Parquet file's schema is the schema for all Parquet files in the table, so you only get foo and bar:

pl.read_parquet("polars_parquet/*.parquet")
shape: (2, 2)
┌─────┬─────┐
│ foo ┆ bar │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
│ 2   ┆ 3   │
└─────┴─────┘

Now let's write in the other order, and Polars will panic because it cannot handle a column being missing from a Parquet file. See this issue I made a while ago: https://github.com/pola-rs/polars/issues/14980:

pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test2.parquet")

pl.read_parquet("polars_parquet/*.parquet")

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

It's a common use case to evolve Parquet tables without having to rewrite all the older files to conform to the new schema.
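
Until this is supported natively, a possible workaround (a sketch, assuming the files from the example above can be scanned individually) is to let a diagonal concat union the column sets:

import polars as pl

files = ["polars_parquet/test1.parquet", "polars_parquet/test2.parquet"]
# how="diagonal" unions the column sets and fills missing columns with nulls.
lf = pl.concat([pl.scan_parquet(f) for f in files], how="diagonal")
print(lf.collect())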

ion-elgreco avatar Apr 03 '24 10:04 ion-elgreco

Having something akin to PyArrow datasets (https://github.com/pola-rs/polars/issues/13086) would make a lot of sense.

ion-elgreco avatar Apr 03 '24 10:04 ion-elgreco

Ok, I see what you mean. We should support this.

stinodego avatar Apr 03 '24 11:04 stinodego

This might be interesting inspiration/source of ideas for a dataset abstraction in polars: https://padawan.readthedocs.io/en/latest/

kszlim avatar Apr 06 '24 23:04 kszlim

Any chance you would reconsider this as part of the reworking of hive partition handling? https://github.com/pola-rs/polars/issues/12041

jrothbaum avatar Apr 09 '24 18:04 jrothbaum

Here's another one https://github.com/pola-rs/polars/issues/15586. It's to change the default for write_statistics to True, nothing complicated.
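
For reference, until that default changes you can opt in explicitly; in the Python API the corresponding write_parquet parameter is statistics (a minimal example, output path arbitrary):

import polars as pl

pl.DataFrame({"a": [1, 2, 3]}).write_parquet("out.parquet", statistics=True)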

deanm0000 avatar Apr 10 '24 19:04 deanm0000

Here are a couple more.

Can't forget to document at the end.

This one might be a bit of a tangent, but it's to incorporate the page index spec of Parquet files: https://github.com/pola-rs/polars/issues/12752

deanm0000 avatar Apr 12 '24 10:04 deanm0000

As I understand it, adding the partitioned fields to the schema is supposed to enable Hive partition support. However, in my case it shows an error instead:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/*";

let mut schema = Schema::new();
schema.with_column("year".into(), DataType::Int8);
schema.with_column("month".into(), DataType::Int8);

let schema = Arc::new(schema);

let cloud_options = cloud::CloudOptions::default().with_aws([
    (Key::AccessKeyId, &cred.access_key.unwrap()),
    (Key::SecretAccessKey, &cred.secret_key.unwrap()),
    (Key::Region, &"eu-west-1".into()),
]);

let mut args = ScanArgsParquet::default();
args.hive_options.enabled = true;
args.hive_options.schema = Some(schema);
args.cloud_options = Some(cloud_options);

// Check time required to read the data.
let start = std::time::Instant::now();

let df = LazyFrame::scan_parquet(TEST_S3, args)?
    .with_streaming(true)
    .collect()?;

The result is:

Error: Context { error: ComputeError(ErrString("Object at location data_lake/some_dir/partitioned_table_root_dir not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

Smotrov avatar May 01 '24 10:05 Smotrov

Enhancement request for "Support directory input": https://github.com/pola-rs/polars/issues/14342

lmocsi avatar May 04 '24 20:05 lmocsi

Enhancement request for "Support directory input": #14342

Thank you. To be honest, I'm quite surprised. How can anyone use this tool for any serious work without the ability to load data from a directory? All tables are partitioned and multi-file. 👀

Smotrov avatar May 07 '24 08:05 Smotrov

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Directory support will function slightly differently, as it will do some additional validation, but it's mostly the same.
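
For example (the bucket and path are placeholders):

import polars as pl

# Reads every .parquet file under the directory; hive_partitioning adds the
# key=value path segments as columns.
lf = pl.scan_parquet(
    "s3://my_bucket/my_table/**/*.parquet",
    hive_partitioning=True,
)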

stinodego avatar May 07 '24 11:05 stinodego

Yes, it is described in the referenced enhancement request (the /**/*.parquet part).

lmocsi avatar May 07 '24 18:05 lmocsi

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Thank you, but my Parquet files do not have any extensions, and adding /**/* does not help. It shows the following error:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/**/*";

Error: Context { error: ComputeError(ErrString("Object at location partitioned_table_root_dir/year=2024/month=5 not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

Meanwhile, if I manually set a specific combination of my partition values, it works:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/year=2024/month=5/*";

But I believe manually adding values is not how Hive partitioning is supposed to work, or am I doing something wrong?

If I add extensions to all files, the **/*.parquet trick works well.

Smotrov avatar May 08 '24 09:05 Smotrov

It would be great to extend "Support Hive partitioning logic in other readers besides Parquet" to JSON.

Smotrov avatar May 08 '24 16:05 Smotrov

It would be great to extend "Support Hive partitioning logic in other readers besides Parquet" to JSON.

It's on the list!

stinodego avatar May 09 '24 17:05 stinodego

Tossing in a suggestion to also support reading/writing PyArrow/Spark-compatible Parquet _metadata files. See #7707.

talawahtech avatar May 18 '24 19:05 talawahtech

https://github.com/pola-rs/polars/issues/15823 probably belongs here.

deanm0000 avatar Jun 06 '24 22:06 deanm0000

@stinodego, regarding this comment:

Ok, I see what you mean. We should support this.

Is there a GitHub issue tracking this? It's not noted in the issue checklist here, and as far as I can see, the trail goes cold after that comment and #15508.

For us, the inability to explicitly set a schema for the table has prevented us from using scan_parquet. We are forced to go via scan_pyarrow_dataset instead, which is suboptimal and results in messy code.
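
For context, the scan_pyarrow_dataset route mentioned above looks roughly like this (the path and schema are illustrative):

import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds

# Declare the full table schema up front; pyarrow handles the hive partition columns.
schema = pa.schema([("foo", pa.int64()), ("bar", pa.large_string()), ("year", pa.int32())])
dataset = ds.dataset("path/to/table", format="parquet", partitioning="hive", schema=schema)
lf = pl.scan_pyarrow_dataset(dataset)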

couling avatar Jun 29 '24 06:06 couling

Others to track:

  • https://github.com/pola-rs/polars/issues/14838
  • https://github.com/pola-rs/polars/issues/14712
  • https://github.com/pola-rs/polars/issues/17045
  • https://github.com/pola-rs/polars/issues/14936
  • https://github.com/pola-rs/polars/issues/14885

ritchie46 avatar Jul 03 '24 07:07 ritchie46

Not sure if this is the correct place to write this, but...

For the native partitioned Parquet reader, would it be possible to support loading unions of columns from different partitions when they contain different sets of columns? This would correspond to "diagonal" concat.

For example, when working with limit order book data, daily partitions of order book levels have a varying number of columns.

The PyArrow reader silently drops columns that are not present in all partitions at the same time.

I wonder if it would be possible to surface a concatenation option in the top-level API of the native Polars reader?
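
For reference, this is roughly the behaviour being asked for, shown with eager frames and pl.concat (toy data):

import polars as pl

a = pl.DataFrame({"price": [1.0], "size": [10]})
b = pl.DataFrame({"price": [2.0], "size": [20], "level_2_price": [2.1]})
# how="diagonal" takes the union of the columns; values missing from a frame become null.
print(pl.concat([a, b], how="diagonal"))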

danielgafni avatar Jul 08 '24 18:07 danielgafni

Some addition to ion-elgreco's comment on schema evolution: let's not forget about hive-partitioned Parquet files. It seems that Polars behaves differently for hive-partitioned files: it does not chop all files down to the schema of the first file, but throws an error instead:

import polars as pl
import os

# create directories:
os.makedirs('./a/month_code=M202406', exist_ok=True)
os.makedirs('./a/month_code=M202407', exist_ok=True)

# create different partitions:
df = pl.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})
df.write_parquet('./a/month_code=M202406/part_0.parquet')

df2 = pl.DataFrame({'a': [1, 2, 3], 'b': ['a', 'a', 'b'], 'c': [22, 33, 44]})
df2.write_parquet('./a/month_code=M202407/part_0.parquet')

# try to read the data:
df3 = pl.scan_parquet('./a', hive_partitioning=True)
df3.collect()

And I get the error: SchemaError: schemas contained differing number of columns: 2 != 3

This should be handled as well (ideally with an option to fill with null values those columns that do not exist in the current partition but do exist in some other partitions).
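
Until that exists, a possible workaround (a sketch; paths match the example above, and whether the hive column is picked up this way may depend on the Polars version) is to scan each partition separately and diagonal-concat the results:

import polars as pl

parts = [
    pl.scan_parquet(p, hive_partitioning=True)
    for p in ("./a/month_code=M202406/*.parquet", "./a/month_code=M202407/*.parquet")
]
# Columns missing from a partition (here "c") are filled with nulls instead of raising a SchemaError.
df3 = pl.concat(parts, how="diagonal").collect()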

lmocsi avatar Sep 03 '24 12:09 lmocsi

https://github.com/pola-rs/polars/issues/12041#issuecomment-2330399357

Veiasai avatar Sep 07 '24 08:09 Veiasai