Allow Overriding AsyncFileReader used by ParquetExec
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We are seeing a number of projects with differing requirements for how the interaction with object store and parquet should proceed:
- Fetching multiple byte ranges in parallel - https://github.com/apache/arrow-datafusion/issues/2949
- Fetching data from sources that aren't typical object stores - https://github.com/apache/arrow-rs/issues/2230#issuecomment-1200144042
The current defaults clearly don't fit every use-case, and this is creating friction that prevents users from getting things working.
Describe the solution you'd like
The general philosophy of DataFusion is to be pluggable, and allow for easy extension where the defaults are not applicable to the use-case. This is particularly important for the interfaces to data storage, where a lot of application-specific trade-offs will occur.
I would therefore like to propose adding an option to ParquetExec to specify ParquetOpenFn (name to be discussed).
type ParquetOpenFn = Box<dyn Fn(ObjectMeta) -> Result<Box<dyn AsyncFileReader>>>
This will be called by ParquetOpener to construct the AsyncFileReader passed to ParquetRecordBatchStream
By default this would simply construct a ParquetFileReader as it does currently, but the user would be able to override it with a custom implementation as desired. This would allow:
- Interacting with ObjectStore differently - #2949
- Calling out to something that isn't even an ObjectStore such as a custom tiered storage engine - https://github.com/apache/arrow-rs/issues/2230#issuecomment-1200144042
- Almost certainly something else
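To make the shape of the hook concrete, here is a minimal sketch. Note that `ObjectMeta` and `AsyncFileReader` are simplified stand-ins for the real `object_store` / `parquet` types, and `CachedReader` is a hypothetical user implementation:

```rust
// Stand-in for object_store::ObjectMeta (simplified).
#[derive(Clone)]
struct ObjectMeta {
    location: String,
    size: usize,
}

// Stand-in for parquet::arrow::async_reader::AsyncFileReader (simplified).
trait AsyncFileReader {
    fn describe(&self) -> String;
}

// The proposed pluggable factory type.
type ParquetOpenFn = Box<dyn Fn(ObjectMeta) -> Result<Box<dyn AsyncFileReader>, String>>;

// A user-supplied reader that could, e.g., consult a tiered-storage cache.
struct CachedReader {
    meta: ObjectMeta,
}

impl AsyncFileReader for CachedReader {
    fn describe(&self) -> String {
        format!("cached reader for {} ({} bytes)", self.meta.location, self.meta.size)
    }
}

fn main() {
    // ParquetOpener would invoke this to build the reader for each file.
    let open_fn: ParquetOpenFn =
        Box::new(|meta| Ok(Box::new(CachedReader { meta }) as Box<dyn AsyncFileReader>));
    let reader = open_fn(ObjectMeta {
        location: "s3://bucket/file.parquet".to_string(),
        size: 1024,
    })
    .unwrap();
    println!("{}", reader.describe());
}
```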
Thoughts @thinkharderdev @Cheappie @alamb @crepererum ?
Describe alternatives you've considered
We could not do this
Additional context
Makes sense, I guess this would just be something you can register in the SessionContext?
Makes sense, I guess this would just be something you can register in the SessionContext?
So for people manually constructing ParquetExec as part of a custom TableProvider, they can obviously just pass the new parameter.
The option could then be added to ParquetFormat to allow constructing a ListingOptions and in turn a ListingTable that uses this setting. Finally we could expose it in ParquetReadOptions so that it can be used with SessionContext::read_parquet, but I'm not sure if we want to expose it at this high a level. Does that sound reasonable?
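A sketch of how the option might thread through those layers, using simplified stand-in structs (the real `ParquetFormat`, `ListingOptions`, and `ListingTable` have many more fields; `with_open_fn` is a hypothetical builder method):

```rust
use std::sync::Arc;

// Simplified stand-in for the opener hook.
type ParquetOpenFn = Arc<dyn Fn(&str) -> String>;

// Simplified stand-ins for the DataFusion types mentioned above.
#[derive(Clone)]
struct ParquetFormat {
    open_fn: Option<ParquetOpenFn>,
}

impl ParquetFormat {
    fn new() -> Self {
        Self { open_fn: None }
    }
    // Hypothetical builder method to attach the custom opener.
    fn with_open_fn(mut self, f: ParquetOpenFn) -> Self {
        self.open_fn = Some(f);
        self
    }
}

struct ListingOptions {
    format: ParquetFormat,
}

struct ListingTable {
    options: ListingOptions,
}

fn main() {
    // The setting flows ParquetFormat -> ListingOptions -> ListingTable.
    let custom: ParquetOpenFn = Arc::new(|loc| format!("custom reader for {loc}"));
    let format = ParquetFormat::new().with_open_fn(custom);
    let table = ListingTable {
        options: ListingOptions { format },
    };
    let f = table.options.format.open_fn.as_ref().unwrap();
    println!("{}", f("s3://bucket/x.parquet"));
}
```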
For anyone else following along, here is the AsyncFileReader trait I think @tustvold is talking about: https://docs.rs/parquet/19.0.0/parquet/arrow/async_reader/trait.AsyncFileReader.html
Here is the ParquetFileReader:
https://github.com/apache/arrow-datafusion/blob/c345f6d707a9bf8ed3395af997e8b3495ed6de7d/datafusion/core/src/physical_plan/file_format/parquet.rs#L350-L355
In general I think this proposal sounds like a nice compromise between "DataFusion can handle reading remote object stores out of the box" and "my project needs fine-grained control", and as @tustvold mentions, I think it fits well with the philosophy of DataFusion.
Thank you for proposing this
So for people manually constructing ParquetExec as part of a custom TableProvider, they can obviously just pass the new parameter.
Yes this makes sense.
The option could then be added to ParquetFormat to allow constructing a ListingOptions and in turn a ListingTable that uses this setting.
Yes this makes sense
Finally we could expose it in ParquetReadOptions so that it can be used with SessionContext::read_parquet, but I'm not sure if we want to expose it at this high a level.
I suggest we start by leaving it out of ParquetReadOptions, because I think that interface is already somewhat confusing (see https://github.com/apache/arrow-datafusion/pull/2988), and if someone needs the API we/they can add it
Looks interesting. If I understand correctly, I should be able to replace my impl of ObjectStore with this function that provides an AsyncFileReader. The only thing left is that I would need to pass custom attributes through ObjectMeta; in my last proposal I suggested such an additional field, custom_attributes: Option<Box<[u8]>>. We could add that field to either ObjectMeta or PartitionedFile, or just embed it in PartitionedFile, but we could also pass AsyncFileReader some other wrapper that bundles ObjectMeta together with this custom field.
What do you think ?
I should be able to replace my impl
Precisely
custom_attributes
Could you not just pass these into the ParquetOpenFn on construction? I had understood them to be session-related and not per-object?
Looks good. I would however advise you NOT to use Box<dyn Fn...> but Box<dyn Opener> with a new trait Opener, because the latter allows you to more easily add new functions later (like as_any(&self) -> dyn Any, or name(&self) -> &'static str).
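A minimal sketch of that trait-based alternative (names are illustrative, and AsyncFileReader is a stand-in for the real parquet trait). Unlike a bare Fn, a trait can later grow methods with default bodies without breaking existing implementations:

```rust
use std::any::Any;

// Stand-in for the real parquet AsyncFileReader trait.
trait AsyncFileReader {}

// Trait form of the hook (name illustrative).
trait Opener {
    fn open(&self, location: &str) -> Result<Box<dyn AsyncFileReader>, String>;
    // Extra methods a plain Fn could not carry:
    fn as_any(&self) -> &dyn Any;
    // Default body: adding this later would not break existing impls.
    fn name(&self) -> &'static str {
        "opener"
    }
}

struct DefaultReader;
impl AsyncFileReader for DefaultReader {}

struct DefaultOpener;
impl Opener for DefaultOpener {
    fn open(&self, _location: &str) -> Result<Box<dyn AsyncFileReader>, String> {
        Ok(Box::new(DefaultReader))
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let opener: Box<dyn Opener> = Box::new(DefaultOpener);
    // name() resolves via the default method body.
    println!("{}", opener.name());
    assert!(opener.open("s3://bucket/f.parquet").is_ok());
}
```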
I should be able to replace my impl
Precisely
custom_attributes
Could you not just pass these into the ParquetOpenFn on construction, I had understood them to be session related and not per-object?
It depends on how far I take this solution. You are right that I have per-session parameters, but my catalog service also produces the listing as a binary stream. If I could pass an entry through as Box<[u8]> instead of a Path, I could perform O(1) access to its fields. It would be nice to have, because otherwise I would have to eagerly compose a Path right after listing, and then within my tiered storage impl I would have to parse the components back out of the Path.
I have another idea: instead of keeping metadata in partitions, we could emit Openers that work as self-contained handles.
partitions: Vec<Vec<Box<dyn Opener>>>
then trait could look like this
trait Opener {
    fn open(&self) -> Result<Box<dyn AsyncFileReader>>;
}
Pros:
- self contained
- fully customizable
Cons:
- box allocations
What do you think?
How does the following sound:
- Take PartitionedFile in ParquetOpenFn instead of just ObjectMeta (or whatever it ends up getting called)
- Add extension_meta: HashMap<String, String> to PartitionedFile
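A sketch of what that would look like (the struct here is heavily simplified; the real PartitionedFile carries more fields, and the field name comes from this discussion):

```rust
use std::collections::HashMap;

// Simplified stand-in for object_store::ObjectMeta.
struct ObjectMeta {
    location: String,
}

// Simplified stand-in for DataFusion's PartitionedFile with the
// proposed extension_meta field added.
struct PartitionedFile {
    object_meta: ObjectMeta,
    extension_meta: HashMap<String, String>,
}

fn main() {
    let mut ext = HashMap::new();
    ext.insert("tier".to_string(), "hot".to_string());
    let file = PartitionedFile {
        object_meta: ObjectMeta {
            location: "bucket/part-0.parquet".to_string(),
        },
        extension_meta: ext,
    };
    // A custom opener can read per-file hints without parsing the path.
    println!(
        "{} tier={}",
        file.object_meta.location, file.extension_meta["tier"]
    );
}
```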
we could emit Openers that work as self-contained handles.
I think this is definitely possible, but there is a lot of logic, e.g. partition pruning, explain plans, plan serialization, etc... not to mention tests that are reliant on the files not being opaque. It would be a fairly involved change
I see, I didn't account for it requiring such a large rework. Sure, the approach of passing extension_meta through PartitionedFile will work for me, but could we allow different types of MetadataExt, like in the example below? That would allow me to simply reinterpret the pointer behind the box, without having to parse components.
pub enum MetadataExt {
    Array(Box<[u8]>),
    Map(HashMap<String, String>),
}
extension_meta: MetadataExt
Sure, I have no objection to Box<[u8]> I just thought a HashMap might be easier to use. I'm sure you're aware, but with regards to reinterpret casting, you will need to be very careful with alignment 😅
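To illustrate the alignment concern: a Box<[u8]> is only guaranteed 1-byte aligned, so casting its pointer to, say, *const u64 is undefined behavior whenever the address isn't 8-byte aligned. Copying the bytes out instead is still O(1) per field and always safe. A small sketch (the record layout here is purely illustrative):

```rust
// Safe O(1) field access into a byte buffer: copy the bytes out rather than
// reinterpret-casting the pointer, which would be UB on a misaligned address.
fn read_u64_at(buf: &[u8], offset: usize) -> u64 {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(&buf[offset..offset + 8]);
    u64::from_le_bytes(bytes)
}

fn main() {
    // Hypothetical 16-byte record: [file_id: u64][size: u64], little-endian.
    let mut rec = Vec::new();
    rec.extend_from_slice(&42u64.to_le_bytes());
    rec.extend_from_slice(&1024u64.to_le_bytes());
    let boxed: Box<[u8]> = rec.into_boxed_slice();

    println!(
        "file_id={} size={}",
        read_u64_at(&boxed, 0),
        read_u64_at(&boxed, 8)
    );
}
```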
@tustvold have you started working on this issue? If not, then I have already made a few initial steps toward implementing it
Closed by https://github.com/apache/arrow-datafusion/pull/3051