
Panic when reading from parquet s3 without a select statement

Open · universalmind303 opened this issue on Jan 12, 2023 · 2 comments

Polars version checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of Polars.

Issue description

Following the example in examples/read_parquet_cloud, I get a panic:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 2, kind: NotFound, message: "No such file or directory" }',

Reproducible example

use awscreds::Credentials;
use polars::prelude::cloud::{self, AmazonS3ConfigKey as Key};
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let s3_path = "s3://my/dataset.parquet";
    let region = "us-west-2".to_string();
    let cred = Credentials::default().unwrap();

    // Forward the AWS credentials and region to the scan via CloudOptions.
    let mut args = ScanArgsParquet::default();
    let cloud_options = cloud::CloudOptions::default().with_aws([
        (Key::AccessKeyId, &cred.access_key.unwrap()),
        (Key::SecretAccessKey, &cred.secret_key.unwrap()),
        (Key::Region, &region),
    ]);
    args.cloud_options = Some(cloud_options);

    // Panics with the NotFound error above instead of reading from S3.
    let df = LazyFrame::scan_parquet(s3_path, args)?
        .with_streaming(true)
        .collect()?;

    dbg!(df);
    Ok(())
}

However, it works fine if I add a select:

    let df = LazyFrame::scan_parquet(s3_path, args)?
        .with_streaming(true)
        .select([
            // select all columns
            all(),
        ])
        .collect()?;

Expected behavior

The code does not panic and reads the parquet file.

Installed versions

Cargo.toml

[dependencies]
aws-creds = "0.34.0"
polars = {git = "https://github.com/pola-rs/polars.git", rev = "a7d38dda7342eb706703b993abed40187e0633f2", features = [
  "lazy",
  "parquet",
  "aws",
  "async"
]}

universalmind303 · Jan 12, 2023

Could you fix this one @universalmind303? I don't have any cloud access.

ritchie46 · Jan 18, 2023

It looks like what is happening: when you use scan_parquet().with_streaming(true).select([all()]), the logical plan is converted to a UdfExec physical plan during the conversion in physical_plan/streaming/convert.rs, and that path uses the streaming BatchedParquetReader. But when you don't specify with_streaming and a projection, the scan is pushed down to a ParquetExec, which calls prepare_scan_args and reads from the local file system. Non-streaming projections also have this issue: scan_parquet().select([col("foo")]) panics as well.
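
To make the two code paths concrete, here is a minimal self-contained sketch of the dispatch as I understand it. Reader and lower are stand-in names made up for illustration, not polars types; the real conversion lives in physical_plan/streaming/convert.rs and is much more involved.

// Which physical reader a parquet scan ends up on. These names are
// stand-ins for illustration only; they are not the actual polars types.
enum Reader {
    // UdfExec driving the streaming BatchedParquetReader: cloud-capable.
    StreamingBatched,
    // ParquetExec + prepare_scan_args: opens the path as a local file,
    // which is what produces the NotFound panic for an s3:// URI.
    Local,
}

// The dispatch as described above: only a streaming scan *with* a
// projection lands on the batched (cloud-capable) reader.
fn lower(streaming: bool, has_projection: bool) -> Reader {
    if streaming && has_projection {
        Reader::StreamingBatched
    } else {
        Reader::Local
    }
}

fn main() {
    // scan_parquet(..).with_streaming(true).collect() -> panics
    assert!(matches!(lower(true, false), Reader::Local));
    // scan_parquet(..).with_streaming(true).select([all()]).collect() -> works
    assert!(matches!(lower(true, true), Reader::StreamingBatched));
    // scan_parquet(..).select([col("foo")]).collect() -> also panics
    assert!(matches!(lower(false, true), Reader::Local));
}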

So it seems like either there needs to be a non-batched parquet cloud reader, or some additional logic that forces a parquet scan with CloudOptions to use the streaming reader.
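
For concreteness, the second option could look roughly like this, reusing the stub Reader type from the sketch above. The is_cloud flag (standing in for "the scan carries CloudOptions or an object-store URI") is a hypothetical heuristic, not existing polars logic.

// Sketch of option two: if the scan carries CloudOptions (or an
// object-store URI), force the streaming reader regardless of whether
// a projection is present. is_cloud is a hypothetical flag.
fn lower_cloud_aware(streaming: bool, has_projection: bool, is_cloud: bool) -> Reader {
    if is_cloud || (streaming && has_projection) {
        Reader::StreamingBatched
    } else {
        Reader::Local
    }
}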

@ritchie46 any suggestions on which approach should be taken? Or maybe there are some other options I'm not considering?

I don't like the idea of forcing it to use the streaming engine, but at the same time, implementing a high-performance non-batched parquet cloud reader is not an easy task.

universalmind303 · Jan 18, 2023