datafusion
Running SQL queries on Parquet files with DataFusion fails with "No suitable object store found for file"
I have Parquet files in a directory called resources/user inside the project directory:
--project root
  --resources
    --user
      --user1.parquet
      --user2.parquet
      --user3.parquet
  --main.rs
I use this directory as the data source and run SQL queries over the Parquet files with Apache DataFusion:
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Configure listing options
    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_table_partition_cols(vec![])
        .with_file_extension(".parquet")
        .with_collect_stat(true);

    ctx.register_listing_table(
        "my_table",
        &format!("file://{}", "./resources/user"),
        listing_options,
        None,
        None,
    )
    .await
    .unwrap();

    // execute the query
    let df = ctx.sql("SELECT * FROM my_table LIMIT 1").await?;

    // print the results
    df.show().await?;
    Ok(())
}
Running the code gives me this error:
called Result::unwrap() on an Err value: Internal("No suitable object store found for file://./resources/user")
I think you may need to register an object store... something like (untested):
use std::sync::Arc;
use object_store::local::LocalFileSystem;

let local = LocalFileSystem::default();
let u = url::Url::parse("file://...")?;
ctx.runtime_env()
    .register_object_store(&u, Arc::new(local));
That is, use register_object_store to register a local file system as the object store. The examples folder has a few examples that register other object stores (S3 and HTTP) if you want something more complete.
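To see why a lookup can fail even for a file:// URL, here is a simplified, std-only model of an object store registry. It assumes, as DataFusion's default registry appears to do, that stores are keyed by URL scheme plus host and port. store_key and Registry are hypothetical stand-ins (a String plays the role of Arc<dyn ObjectStore>), not DataFusion's API. Under that assumption, file://./resources/user has a host of ".", so it does not match a store registered for file://.

```rust
use std::collections::HashMap;

/// Hypothetical helper: derive a "scheme://host[:port]" key from a URL string.
/// Hand-rolled, simplified parsing: requires "://" and drops any "user@" part.
fn store_key(url: &str) -> Option<String> {
    let (scheme, rest) = url.split_once("://")?;
    // The authority ends at the first '/' (or at the end of the string).
    let authority = rest.split('/').next().unwrap_or("");
    // Drop userinfo such as "user:pass@".
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
    Some(format!("{scheme}://{host_port}"))
}

/// Toy registry: key -> store name (a String stands in for the real store).
struct Registry {
    stores: HashMap<String, String>,
}

impl Registry {
    fn new() -> Self {
        Self { stores: HashMap::new() }
    }

    /// Register a store under the key derived from `url`.
    fn register(&mut self, url: &str, store: &str) {
        if let Some(key) = store_key(url) {
            self.stores.insert(key, store.to_string());
        }
    }

    /// Look up the store for `url`, mimicking the error text from the thread.
    fn get(&self, url: &str) -> Result<&str, String> {
        store_key(url)
            .and_then(|key| self.stores.get(&key))
            .map(String::as_str)
            .ok_or_else(|| format!("No suitable object store found for {url}"))
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.register("file://", "LocalFileSystem");

    // Absolute path: empty host, so the key is "file://" and the lookup succeeds.
    println!("{:?}", registry.get("file:///Users/me/resources/user/"));
    // "." right after "file://" is read as the host, so the key is "file://.".
    println!("{:?}", registry.get("file://./resources/user"));
}
```

If this model holds, registering file:// once covers every absolute file:///... path, while a relative path written directly after file:// never matches it.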
Thanks @tshauck, I hit this recently too. If this turns out to be the cause, we can probably improve the documentation, make this easier to find, or improve the UX. Please let us know, @helxsz.
I used object_store with S3, which works well:
object_store = { version = "0.9.0", features =["aws"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", branch = "main", features = ["backtrace"]}
datafusion-common = { version = "35.0.0" }
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use object_store::limit::LimitStore;
use url::Url;

// DEFAULT_S3_* constants are defined elsewhere in the project (not shown)
async fn get_data_multiple_parquet_s3() -> Result<(), Box<dyn std::error::Error>> {
    // create local execution context
    let ctx = SessionContext::new();

    let s3 = AmazonS3Builder::new()
        .with_bucket_name(DEFAULT_S3_BUCKET)
        //.with_region(DEFAULT_S3_REGION)
        .with_access_key_id(DEFAULT_S3_ACCESS_KEY)
        .with_secret_access_key(DEFAULT_S3_SECRET_KEY)
        .with_endpoint(DEFAULT_S3_URL)
        .with_allow_http(true)
        .build()?;

    // cap the number of concurrent requests to the store
    const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
    let s3 = LimitStore::new(s3, MAX_OBJECT_STORE_REQUESTS);

    // register the store for the s3://test bucket
    let bucket_name = "test";
    let path = format!("s3://{bucket_name}");
    let s3_url = Url::parse(&path).unwrap();
    ctx.runtime_env().register_object_store(&s3_url, Arc::new(s3));

    // Configure listing options
    let out_path = format!("s3://{bucket_name}/test_write/");
    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".parquet");

    // Register a listing table: all files in the directory become data sources for the query
    ctx.register_listing_table("my_table", &out_path, listing_options, None, None)
        .await
        .unwrap();
    println!("get_data_multiple_parquet_s3 ...");

    // execute the query
    let df = ctx.sql("SELECT * FROM my_table").await?;

    // print the results
    df.show().await?;
    Ok(())
}
However, the same approach with the local file system and an absolute path does not work; it fails with this error:
called Result::unwrap() on an Err value: ObjectStore(NotFound { path: "/Users/Desktop/rust/rustapi-master/resources/user", source: Custom { kind: NotFound, error: "is directory" } })
on this call:
ctx.register_listing_table(
    "my_table",
    &format!("file://{}", "/Users/Desktop/rust/rustapi-master/resources/user"),
    listing_options,
    None,
    None,
)
.await
.unwrap();
Full code using the local file system:
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;

async fn get_listing_file() -> Result<(), Box<dyn std::error::Error>> {
    // create local execution context
    let ctx = SessionContext::new();

    // register the local file system as the object store
    let local = LocalFileSystem::default();
    let url = url::Url::parse(&format!("file://{}", "/Users/Desktop/rust/rustapi-master/resources/user"))?;
    ctx.runtime_env().register_object_store(&url, Arc::new(local));

    // Configure listing options
    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
    let listing_options = ListingOptions::new(Arc::new(file_format))
        //.with_table_partition_cols(vec![])
        .with_file_extension(".parquet");
        //.with_collect_stat(true);

    // Register a listing table: all files in the directory become data sources for the query
    ctx.register_listing_table(
        "my_table",
        &format!("file://{}", "/Users/Desktop/rust/rustapi-master/resources/user"),
        listing_options,
        None,
        None,
    )
    .await
    .unwrap();

    // execute the query
    let df = ctx.sql("SELECT * FROM my_table LIMIT 1").await?;

    // print the results
    df.show().await?;
    Ok(())
}
I think you'll need a trailing slash, i.e. file:///Users/Desktop/rust/rustapi-master/resources/user/
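The trailing-slash rule can be modelled in a few lines of plain Rust. This is a sketch under the assumption that DataFusion's listing table treats a URL ending in '/' as a directory to list and any other URL as a single object; is_directory_url and dir_url are hypothetical helpers, not DataFusion functions.

```rust
/// Hypothetical helper: under the assumed rule, only a URL ending in '/'
/// is listed as a directory; anything else names a single object.
fn is_directory_url(url: &str) -> bool {
    url.ends_with('/')
}

/// Hypothetical helper: build a directory-style file:// URL from an absolute
/// path, adding the trailing slash if it is missing.
fn dir_url(abs_path: &str) -> String {
    format!("file://{}/", abs_path.trim_end_matches('/'))
}

fn main() {
    let path = "/Users/Desktop/rust/rustapi-master/resources/user";
    let without_slash = format!("file://{path}");
    let with_slash = dir_url(path);

    // Without the slash the URL would be read as a single object, which on
    // disk is a directory; that would be consistent with the "is directory" error.
    println!("{without_slash} -> directory? {}", is_directory_url(&without_slash));
    println!("{with_slash} -> directory? {}", is_directory_url(&with_slash));
}
```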
Based on this issue, I opened a PR to improve the Parquet listing example.
Check it out for relative and absolute path examples: https://github.com/apache/arrow-datafusion/blob/7f266717f277c8afefbefc674b59a9a60164d001/datafusion-examples/examples/parquet_sql_multiple_files.rs#L44-L52
Related PR: https://github.com/apache/arrow-datafusion/pull/9310
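For the relative-path case, one approach is to resolve the directory against the current working directory first and then build a directory-style file:// URL. This is a minimal sketch, assuming a Unix-style absolute working directory; relative_dir_to_url is a hypothetical helper, and the linked example may handle paths differently.

```rust
use std::io;

/// Hypothetical helper: resolve `relative` against the current working
/// directory and format it as a directory-style file:// URL (trailing '/').
/// Assumes a Unix-style absolute working directory such as "/home/me/project";
/// Windows paths would need extra handling.
fn relative_dir_to_url(relative: &str) -> io::Result<String> {
    // "./resources/user" and "resources/user" should give the same URL.
    let relative = relative.trim_start_matches("./");
    let abs_path = std::env::current_dir()?.join(relative);
    let abs_str = abs_path.to_string_lossy();
    Ok(format!("file://{}/", abs_str.trim_end_matches('/')))
}

fn main() -> io::Result<()> {
    // Run from the project root to point at resources/user.
    println!("{}", relative_dir_to_url("./resources/user")?);
    Ok(())
}
```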
I was recently trying to query the NYC dataset from Ballista. The path looks something like https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
What would be the correct way to register this path with DF?
Trying this:
create external table nyc_trip_data
stored as parquet
location 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';
Results in: Error building plan: Execution error: No object store available for: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
My educated guess is that this is S3 configured for distribution through CloudFront.
DuckDB can handle this via its httpfs extension: https://duckdb.org/docs/extensions/httpfs/https
Thanks!
@aditanase how are you running the CREATE EXTERNAL TABLE statement?
It seems to work well from datafusion-cli:
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion$ datafusion-cli
DataFusion CLI v38.0.0
> create external table nyc_trip_data
stored as parquet
location 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';
0 row(s) fetched.
Elapsed 0.819 seconds.
> describe nyc_trip_data;
+-----------------------+------------------------------+-------------+
| column_name           | data_type                    | is_nullable |
+-----------------------+------------------------------+-------------+
| VendorID              | Int32                        | YES         |
| tpep_pickup_datetime  | Timestamp(Microsecond, None) | YES         |
| tpep_dropoff_datetime | Timestamp(Microsecond, None) | YES         |
| passenger_count       | Int64                        | YES         |
| trip_distance         | Float64                      | YES         |
| RatecodeID            | Int64                        | YES         |
| store_and_fwd_flag    | LargeUtf8                    | YES         |
| PULocationID          | Int32                        | YES         |
| DOLocationID          | Int32                        | YES         |
| payment_type          | Int64                        | YES         |
| fare_amount           | Float64                      | YES         |
| extra                 | Float64                      | YES         |
| mta_tax               | Float64                      | YES         |
| tip_amount            | Float64                      | YES         |
| tolls_amount          | Float64                      | YES         |
| improvement_surcharge | Float64                      | YES         |
| total_amount          | Float64                      | YES         |
| congestion_surcharge  | Float64                      | YES         |
| Airport_fee           | Float64                      | YES         |
+-----------------------+------------------------------+-------------+
19 row(s) fetched.
Elapsed 0.002 seconds.
If you are using the DataFrame API directly rather than datafusion-cli, you need to register the HTTP object store yourself.
Here is an example of how to register a file URL: https://github.com/apache/datafusion/pull/10549/files#diff-bc239df2db94469eee52dbc0dba96dd7e5b8dde36dd1d63292fde71577efc3f5R96-R105
// register file:// object store provider
// Get this error if not there:
// Error: Internal("No suitable object store found for file://")
// TODO: should make the error more helpful (and add an example of how to register local file object store)
// todo add example of how to register local file object store
let url = Url::try_from("file://")
    .map_err(|e| internal_datafusion_err!("can't parse file url: {e}"))?;
let object_store = object_store::local::LocalFileSystem::new();
ctx.runtime_env()
    .register_object_store(&url, Arc::new(object_store));
I think you may need to register the HttpBuilder object store (https://docs.rs/object_store/latest/object_store/http/struct.HttpBuilder.html) for https:// URLs.
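An HTTP object store is registered for an origin (scheme plus host), while the rest of the URL is the object path inside that store. Below is a std-only sketch of that split for the NYC URL; split_origin is a hypothetical helper, and in real code the url crate would do this parsing instead of manual string splitting.

```rust
/// Hypothetical helper: split an http(s) URL into
/// (origin to register the store for, object path inside that store).
/// Returns None for other schemes or for strings without "://".
fn split_origin(url: &str) -> Option<(String, String)> {
    let (scheme, rest) = url.split_once("://")?;
    if !matches!(scheme, "http" | "https") {
        return None;
    }
    // Everything up to the first '/' is the host (and optional port).
    let (host, path) = rest.split_once('/').unwrap_or((rest, ""));
    Some((format!("{scheme}://{host}"), path.to_string()))
}

fn main() {
    let url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet";
    if let Some((origin, path)) = split_origin(url) {
        println!("register store for: {origin}");
        println!("object path:        {path}");
    }
}
```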
I will try and make some more documentation about this
@alamb thanks for the very quick reply! I just tested with datafusion-cli and you're right, it works.
I was trying from a test deployment of Ballista. Will add the object store as you recommend, looks straightforward.
I'd be happy to contribute some docs / examples if you point me at something similar.
Thanks @aditanase 🙏
I would recommend two things:
Suggestion 1: Change the error message
Change the error message to include instructions on how to fix the issue
For example, instead of
No suitable object store found for file://
Perhaps it could be something more like
No object store registered for url scheme 'file://'. See RuntimeEnv::register_object_store
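The first suggestion amounts to deriving the scheme from the failing URL and naming the API that fixes the problem. A minimal sketch of such a formatter; missing_store_message is a hypothetical function, not part of DataFusion.

```rust
/// Hypothetical formatter for the clearer message proposed above: name the
/// URL scheme and point at the API that fixes the problem.
fn missing_store_message(url: &str) -> String {
    // Take the scheme before "://"; fall back to the whole input if absent.
    let scheme = url.split_once("://").map_or(url, |(scheme, _)| scheme);
    format!(
        "No object store registered for url scheme '{scheme}://'. \
         See RuntimeEnv::register_object_store"
    )
}

fn main() {
    println!("{}", missing_store_message("file://./resources/user"));
}
```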
Suggestion 2: Add an example to the docs
Once the error message points to RuntimeEnv::register_object_store, it would be ideal if the docs (https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnv.html#method.register_object_store) actually had an example, perhaps by adapting the example above into a doc example.
Does that make sense?
I filed https://github.com/apache/datafusion/issues/10616 to track these ideas for UX improvements
Sent a quick PR for the improved docs.
Also, to close the loop on the Ballista integration, I ended up adding a branch to the default object store registry that Ballista injects into new sessions: https://github.com/apache/datafusion-ballista/blob/main/ballista/core/src/object_store_registry/mod.rs#L107
Something like this seems to work. I'm not sure it's worth sending a PR their way, so I'm leaving it here in case someone else needs it:
// Fall back to an HTTP(S) store rooted at the URL's origin
if matches!(url.scheme(), "http" | "https") {
    let store = HttpBuilder::new()
        .with_url(url.origin().ascii_serialization())
        .build()?;
    return Ok(Arc::new(store));
}
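A registry with this kind of fallback could also create the HTTP store lazily and cache it per origin, so repeated queries against the same host reuse one store. The following is a std-only sketch under that assumption; LazyRegistry and get_or_create are hypothetical names, a String stands in for the store, and this is not how Ballista's registry is actually written.

```rust
use std::collections::HashMap;

/// Toy registry: explicit registrations first, then a lazily created
/// HTTP(S) store per origin. Strings stand in for Arc<dyn ObjectStore>.
struct LazyRegistry {
    stores: HashMap<String, String>,
}

impl LazyRegistry {
    fn new() -> Self {
        Self { stores: HashMap::new() }
    }

    fn register(&mut self, key: &str, store: &str) {
        self.stores.insert(key.to_string(), store.to_string());
    }

    fn get_or_create(&mut self, url: &str) -> Result<String, String> {
        let (scheme, rest) = url
            .split_once("://")
            .ok_or_else(|| format!("invalid url: {url}"))?;
        let host = rest.split('/').next().unwrap_or("");
        let key = format!("{scheme}://{host}");

        // 1. A store registered up front wins.
        if let Some(store) = self.stores.get(&key) {
            return Ok(store.clone());
        }
        // 2. Fallback: create and cache an HTTP store for http(s) origins.
        if matches!(scheme, "http" | "https") {
            let store = format!("HttpStore({key})");
            self.stores.insert(key, store.clone());
            return Ok(store);
        }
        // 3. Anything else is still an error.
        Err(format!("No suitable object store found for {url}"))
    }

    fn len(&self) -> usize {
        self.stores.len()
    }
}

fn main() {
    let mut registry = LazyRegistry::new();
    registry.register("file://", "LocalFileSystem");
    let url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet";
    println!("{:?}", registry.get_or_create(url));
    println!("{:?}", registry.get_or_create("s3://bucket/key"));
    println!("{} stores cached", registry.len());
}
```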