DataFusion to run SQL queries on Parquet files with error No suitable object store found for file

Open helxsz opened this issue 1 year ago • 11 comments

I have several Parquet files in a directory called resources/user inside the project directory.

--project root
  --resources
    --user
      --user1.parquet
      --user2.parquet
      --user3.parquet
  --main.rs

I use this directory as the data source and run SQL queries over the Parquet files with Apache DataFusion:

  let ctx = SessionContext::new();

  // Configure listing options
  let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
  let listing_options = ListingOptions::new(Arc::new(file_format))
  .with_table_partition_cols(vec![])
  .with_file_extension(".parquet")
  .with_collect_stat(true);

  ctx.register_listing_table(
      "my_table",
      &format!("file://{}","./resources/user"),
      listing_options,
      None,
      None,
  )
  .await
  .unwrap();

  // execute the query
  let df = ctx.sql("SELECT * FROM my_table LIMIT 1",)
      .await?;

  // print the results
  df.show().await?;

Running the code gives me this error:

called Result::unwrap() on an Err value: Internal("No suitable object store found for file://./resources/user")

helxsz avatar Feb 19 '24 20:02 helxsz

I think you may need to register an object store... something like (untested):

use object_store::local::LocalFileSystem;

let local = LocalFileSystem::default();
let u = url::Url::parse("file://...")?;

ctx.runtime_env()
        .register_object_store(&u, Arc::new(local));

That is, use register_object_store to register a local file system as the object store. The examples folder has a few more complete examples that register other object stores (S3 and HTTP).

tshauck avatar Feb 19 '24 22:02 tshauck
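One plausible reading of the original error is that the lookup for an object store depends on the scheme and authority of the URL, and that `file://./resources/user` puts `.` in the authority position. The following is a simplified, std-only model of that idea, not DataFusion's actual implementation; `store_key` and `ToyRegistry` are hypothetical names, and the assumption that a store is keyed by `scheme://authority` is exactly that, an assumption.

```rust
use std::collections::HashMap;

/// Derive a lookup key of the form `scheme://authority` from a URL string.
/// Returns `None` when the string has no `://` separator.
fn store_key(url: &str) -> Option<String> {
    let (scheme, rest) = url.split_once("://")?;
    // The authority is everything up to the first `/` (or the whole remainder).
    let authority = rest.split('/').next().unwrap_or("");
    Some(format!("{scheme}://{authority}"))
}

/// Toy registry: maps a key to the name of a store (a stand-in for a real object store).
struct ToyRegistry {
    stores: HashMap<String, &'static str>,
}

impl ToyRegistry {
    fn new() -> Self {
        let mut stores = HashMap::new();
        // Assumption: a local file system store is registered under `file://`
        // (empty authority).
        stores.insert("file://".to_string(), "local file system");
        Self { stores }
    }

    /// Look up the store for a URL, or return an error similar to the one in this issue.
    fn lookup(&self, url: &str) -> Result<&'static str, String> {
        let key = store_key(url).ok_or_else(|| format!("not a URL: {url}"))?;
        self.stores
            .get(&key)
            .copied()
            .ok_or_else(|| format!("No suitable object store found for {url}"))
    }
}
```

Under this model, `file://./resources/user` yields the key `file://.`, which nothing is registered for, while an absolute `file:///...` URL yields `file://` and resolves.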

Thanks @tshauck -- I hit this recently too. If this turns out to be the case, we can probably improve the documentation / make this easier to find / possibly improve the UX. Please let us know @helxsz

alamb avatar Feb 20 '24 06:02 alamb

I used object_store with S3, which works well.

object_store = { version = "0.9.0", features =["aws"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", branch = "main", features = ["backtrace"]}
datafusion-common = { version = "35.0.0" }

async fn get_data_multiple_parquet_s3() -> Result<(), Box<dyn std::error::Error>> {

  // create local execution context
  let ctx = SessionContext::new();

  let s3 = AmazonS3Builder::new()
      .with_bucket_name(DEFAULT_S3_BUCKET)
      //.with_region(DEFAULT_S3_REGION)
      .with_access_key_id(DEFAULT_S3_ACCESS_KEY)
      .with_secret_access_key(DEFAULT_S3_SECRET_KEY)
      .with_endpoint(DEFAULT_S3_URL)
      .with_allow_http(true)
      .build()?;

  const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

  let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS);

  let bucket_name = "test";
  let path = format!("s3://{bucket_name}");
  let s3_url = Url::parse(&path).unwrap();

  ctx.runtime_env().register_object_store( &s3_url , Arc::new(s3));
  
  // Configure listing options
  let out_path = format!("s3://{bucket_name}/test_write/");

  let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
  let listing_options = ListingOptions::new(Arc::new(file_format))
  .with_file_extension(".parquet");

  // Register a listing table - this will use all files in the directory as data sources for the query
  ctx.register_listing_table(
      "my_table",
      &out_path,
      listing_options,
      None,
      None,
  )
  .await
  .unwrap();

  println!("get_data_multiple_parquet_s3 ...");

  // execute the query
  let df = ctx
      .sql("SELECT * \
      FROM my_table \
      ",
      )
      .await?;

  // print the results
  df.show().await?;

  Ok(())
}

However, using the local file system with an absolute path does not work. It fails with:

called Result::unwrap() on an Err value: ObjectStore(NotFound { path: "/Users/Desktop/rust/rustapi-master/resources/user", source: Custom { kind: NotFound, error: "is directory" } })

on this call:

  ctx.register_listing_table(
      "my_table",
      &format!("file://{}","/Users/Desktop/rust/rustapi-master/resources/user"),
      listing_options,
      None,
      None,
  )
  .await
  .unwrap();

Full code for the local file system case:

async fn get_listing_file() -> Result<(), Box<dyn std::error::Error>> {

  // create local execution context
  let ctx = SessionContext::new();

  use object_store::local::LocalFileSystem;

  let local = LocalFileSystem::default();
  let url = url::Url::parse(&format!("file://{}","/Users/Desktop/rust/rustapi-master/resources/user"))?;
  
  ctx.runtime_env().register_object_store(&url, Arc::new(local));
  
  // Configure listing options
   
  let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
  let listing_options = ListingOptions::new(Arc::new(file_format))
  //.with_table_partition_cols(vec![])
  .with_file_extension(".parquet");
  //.with_collect_stat(true);

  // Register a listing table - this will use all files in the directory as data sources for the query
  ctx.register_listing_table(
      "my_table",
      &format!("file://{}","/Users/Desktop/rust/rustapi-master/resources/user"),
      listing_options,
      None,
      None,
  )
  .await
  .unwrap();
/**/

  // execute the query
  let df = ctx
      .sql("SELECT * \
      FROM my_table \
      LIMIT 1",
      )
      .await?;

  // print the results
  df.show().await?;

  Ok(())
}

helxsz avatar Feb 22 '24 10:02 helxsz

I think you'll need a trailing slash: file:///Users/Desktop/rust/rustapi-master/resources/user/

Based on this issue I opened up a PR to improve the parquet listing example.

Check out the relative and absolute path examples here: https://github.com/apache/arrow-datafusion/blob/7f266717f277c8afefbefc674b59a9a60164d001/datafusion-examples/examples/parquet_sql_multiple_files.rs#L44-L52

tshauck avatar Feb 22 '24 16:02 tshauck
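The two points above (use an absolute path, and end a directory URL with a slash) can be sketched with the standard library alone. This is a minimal illustration, assuming Unix-style paths; `dir_to_file_url` is a hypothetical helper, not part of DataFusion, and it does no percent-encoding, so paths with spaces or other special characters would need a real URL library.

```rust
use std::io;
use std::path::{Path, PathBuf};

/// Build a `file://` URL string for a directory.
/// Relative paths are resolved against the current working directory,
/// and the result always ends with a trailing slash.
fn dir_to_file_url(dir: &Path) -> io::Result<String> {
    let absolute: PathBuf = if dir.is_absolute() {
        dir.to_path_buf()
    } else {
        // Re-collecting the components drops interior `.` segments
        // such as the one in "./resources/user".
        std::env::current_dir()?.join(dir).components().collect()
    };
    let mut url = format!("file://{}", absolute.display());
    if !url.ends_with('/') {
        url.push('/');
    }
    Ok(url)
}
```

The resulting string could then be passed as the table path in place of the hand-built `format!("file://{}", ...)` value.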

Related PR: https://github.com/apache/arrow-datafusion/pull/9310

alamb avatar Feb 26 '24 18:02 alamb

I was recently trying to query the NYC dataset from ballista. Path looks something like https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet

What would be the correct way to register this path with DF?

Trying this:

create external table nyc_trip_data
stored as parquet
location 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';

Results in: Error building plan: Execution error: No object store available for: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet

My educated guess is that this is S3 configured for distribution through CloudFront. DuckDB can handle this via the httpfs extension: https://duckdb.org/docs/extensions/httpfs/https

Thanks!

aditanase avatar May 20 '24 13:05 aditanase

@aditanase how are you running the external statement?

It seems to work well from datafusion-cli

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion$ datafusion-cli
DataFusion CLI v38.0.0
> create external table nyc_trip_data
stored as parquet
location 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';
0 row(s) fetched.
Elapsed 0.819 seconds.

> describe nyc_trip_data;
+-----------------------+------------------------------+-------------+
| column_name           | data_type                    | is_nullable |
+-----------------------+------------------------------+-------------+
| VendorID              | Int32                        | YES         |
| tpep_pickup_datetime  | Timestamp(Microsecond, None) | YES         |
| tpep_dropoff_datetime | Timestamp(Microsecond, None) | YES         |
| passenger_count       | Int64                        | YES         |
| trip_distance         | Float64                      | YES         |
| RatecodeID            | Int64                        | YES         |
| store_and_fwd_flag    | LargeUtf8                    | YES         |
| PULocationID          | Int32                        | YES         |
| DOLocationID          | Int32                        | YES         |
| payment_type          | Int64                        | YES         |
| fare_amount           | Float64                      | YES         |
| extra                 | Float64                      | YES         |
| mta_tax               | Float64                      | YES         |
| tip_amount            | Float64                      | YES         |
| tolls_amount          | Float64                      | YES         |
| improvement_surcharge | Float64                      | YES         |
| total_amount          | Float64                      | YES         |
| congestion_surcharge  | Float64                      | YES         |
| Airport_fee           | Float64                      | YES         |
+-----------------------+------------------------------+-------------+
19 row(s) fetched.
Elapsed 0.002 seconds.

If you are just using a DataFrame, you need to register the HTTP object store yourself.

Here is an example of how to register a file URL https://github.com/apache/datafusion/pull/10549/files#diff-bc239df2db94469eee52dbc0dba96dd7e5b8dde36dd1d63292fde71577efc3f5R96-R105

    // register file:// object store provider
    //  Get this error if not there:
    // Error: Internal("No suitable object store found for file://")
    // TODO: should make the error more helpful (and add an example of how to register local file object store)
    // todo add example of how to register local file object store
    let url = Url::try_from("file://")
        .map_err(|e| internal_datafusion_err!("can't parse file url: {e}"))?;
    let object_store = object_store::local::LocalFileSystem::new();
    ctx.runtime_env()
        .register_object_store(&url, Arc::new(object_store));

I think you may need to register the HttpBuilder object store (https://docs.rs/object_store/latest/object_store/http/struct.HttpBuilder.html) for https:// URLs.

I will try and make some more documentation about this

alamb avatar May 20 '24 15:05 alamb

@alamb thanks for the very quick reply! Just tested with datafusion-cli, you're right that it's working. I was trying from a test deployment of Ballista. Will add the object store as you recommend, looks straightforward.

I'd be happy to contribute some docs / examples if you point me at something similar.

aditanase avatar May 21 '24 13:05 aditanase

I'd be happy to contribute some docs / examples if you point me at something similar.

Thanks @aditanase 🙏

I would recommend two things:

Suggestion 1: Change the error message

Change the error message to include instructions on how to fix the issue

For example, instead of

No suitable object store found for file://

Perhaps it could be something more like

No object store registered for url scheme 'file://'. See RuntimeEnv::register_object_store

Suggestion 2: Add an example to the docs

Once the error message directs users to RuntimeEnv::register_object_store, it would be ideal if the docs at https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnv.html#method.register_object_store actually had an example.

So that would perhaps take the form of adapting the example above as a doc example

Does that make sense?

alamb avatar May 21 '24 18:05 alamb
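As a rough illustration of Suggestion 1 above, the improved message could be produced by a small formatting function. This is only a sketch: `missing_store_message` is a hypothetical name, the scheme is extracted with plain string splitting instead of a URL parser, and nothing here reflects how DataFusion actually builds its errors.

```rust
/// Build a hint-carrying error message for a URL that has no registered object store.
fn missing_store_message(url: &str) -> String {
    // Take everything before `://` as the scheme; if there is no `://`,
    // fall back to the whole input.
    let scheme = url.split_once("://").map_or(url, |(scheme, _)| scheme);
    format!(
        "No object store registered for url scheme '{scheme}://'. \
         See RuntimeEnv::register_object_store"
    )
}
```

Naming the scheme and the method to call gives the reader a direct next step, which the original "No suitable object store found" text does not.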

I filed https://github.com/apache/datafusion/issues/10616 to track these ideas for UX improvements

alamb avatar May 22 '24 10:05 alamb

Sent a quick PR for the improved docs.

Also closing the loop on Ballista integration, I ended up adding a branch in the default object store that Ballista injects in new sessions: https://github.com/apache/datafusion-ballista/blob/main/ballista/core/src/object_store_registry/mod.rs#L107

Something like this seems to work. I'm not sure if it's worth sending a PR their way, so I'm leaving it here in case someone else needs it:

// Match on the parsed scheme instead of comparing string prefixes
if matches!(url.scheme(), "http" | "https") {
    let store = HttpBuilder::new()
        .with_url(url.origin().ascii_serialization())
        .build()?;
    return Ok(Arc::new(store));
}

aditanase avatar May 22 '24 11:05 aditanase