
Allow Overriding AsyncFileReader used by ParquetExec

Open tustvold opened this issue 3 years ago • 13 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

We are seeing a number of projects with differing requirements for how the interaction with object store and parquet should proceed:

  • Fetching multiple byte ranges in parallel - https://github.com/apache/arrow-datafusion/issues/2949
  • Fetching data from sources that aren't typical object stores - https://github.com/apache/arrow-rs/issues/2230#issuecomment-1200144042

Something clearly isn't right here, and it's creating friction preventing users from getting things working.

Describe the solution you'd like

The general philosophy of DataFusion is to be pluggable, and allow for easy extension where the defaults are not applicable to the use-case. This is particularly important for the interfaces to data storage, where a lot of application-specific trade-offs will occur.

I would therefore like to propose adding an option to ParquetExec to specify ParquetOpenFn (name to be discussed).

type ParquetOpenFn = Box<dyn Fn(ObjectMeta) -> Result<Box<dyn AsyncFileReader>>>;

This will be called by ParquetOpener to construct the AsyncFileReader passed to ParquetRecordBatchStream.

By default this would simply construct a ParquetFileReader as currently, but the user would be able to override this with a custom implementation as desired. This would allow:

  • Interacting with ObjectStore differently - #2949
  • Calling out to something that isn't even an ObjectStore such as a custom tiered storage engine - https://github.com/apache/arrow-rs/issues/2230#issuecomment-1200144042
  • Almost certainly something else
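To make the shape of the proposal concrete, here is a minimal sketch of such a hook. It is illustrative only: `ObjectMeta`, `AsyncFileReader`, and `Result` below are simplified stand-ins defined inline, not the real types from the object_store, parquet, or DataFusion crates, and `default_open_fn`, `tiered_open_fn`, and the two reader structs are hypothetical names.

```rust
// Hypothetical stand-ins for illustration; the real `ObjectMeta` and
// `AsyncFileReader` are defined in other crates and look different.
#[derive(Debug, Clone)]
pub struct ObjectMeta {
    pub location: String,
    pub size: usize,
}

pub trait AsyncFileReader {
    /// Stand-in for the real async read methods: says where bytes come from.
    fn describe(&self) -> String;
}

pub type Result<T> = std::result::Result<T, String>;

/// The proposed hook: given file metadata, build the reader to use.
pub type ParquetOpenFn = Box<dyn Fn(ObjectMeta) -> Result<Box<dyn AsyncFileReader>>>;

struct ObjectStoreReader {
    meta: ObjectMeta,
}

impl AsyncFileReader for ObjectStoreReader {
    fn describe(&self) -> String {
        format!("object store: {}", self.meta.location)
    }
}

struct TieredReader {
    meta: ObjectMeta,
    tier: String,
}

impl AsyncFileReader for TieredReader {
    fn describe(&self) -> String {
        format!("{} tier: {}", self.tier, self.meta.location)
    }
}

/// Default behaviour: always read through the object store.
pub fn default_open_fn() -> ParquetOpenFn {
    Box::new(|meta: ObjectMeta| -> Result<Box<dyn AsyncFileReader>> {
        Ok(Box::new(ObjectStoreReader { meta }))
    })
}

/// Custom behaviour: the closure captures a session-level setting (`tier`)
/// when it is constructed and rejects empty files.
pub fn tiered_open_fn(tier: String) -> ParquetOpenFn {
    Box::new(move |meta: ObjectMeta| -> Result<Box<dyn AsyncFileReader>> {
        if meta.size == 0 {
            return Err(format!("empty file: {}", meta.location));
        }
        Ok(Box::new(TieredReader { meta, tier: tier.clone() }))
    })
}
```

In this sketch a caller that does nothing gets `default_open_fn()`, while a caller with a custom storage engine swaps in its own closure without touching the scan operator itself.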

Thoughts @thinkharderdev @Cheappie @alamb @crepererum ?

Describe alternatives you've considered

We could not do this

Additional context

tustvold avatar Jul 31 '22 17:07 tustvold

Makes sense, I guess this would just be something you can register in the SessionContext?

thinkharderdev avatar Jul 31 '22 20:07 thinkharderdev

> Makes sense, I guess this would just be something you can register in the SessionContext?

So for people manually constructing ParquetExec as part of a custom TableProvider, they can obviously just pass the new parameter.

The option could then be added to ParquetFormat to allow constructing a ListingOptions and in turn a ListingTable that uses this setting. Finally we could expose it in ParquetReadOptions so that it can be used with SessionContext::read_parquet, but I'm not sure if we want to expose it at this high a level. Does that sound reasonable?

tustvold avatar Jul 31 '22 20:07 tustvold

For anyone else following along, here is the AsyncFileReader trait I think @tustvold is talking about: https://docs.rs/parquet/19.0.0/parquet/arrow/async_reader/trait.AsyncFileReader.html

Here is the ParquetFileReader: https://github.com/apache/arrow-datafusion/blob/c345f6d707a9bf8ed3395af997e8b3495ed6de7d/datafusion/core/src/physical_plan/file_format/parquet.rs#L350-L355

alamb avatar Jul 31 '22 21:07 alamb

In general I think this proposal sounds like a nice compromise between "DataFusion can handle reading remote object stores out of the box" and "my project needs fine grained control", and as @tustvold mentions, I think it fits well with the philosophy of DataFusion.

Thank you for proposing this

> So for people manually constructing ParquetExec as part of a custom TableProvider, they can obviously just pass the new parameter.

Yes this makes sense.

> The option could then be added to ParquetFormat to allow constructing a ListingOptions and in turn a ListingTable that uses this setting.

Yes this makes sense

> Finally we could expose it in ParquetReadOptions so that it can be used with SessionContext::read_parquet, but I'm not sure if we want to expose it at this high a level.

I suggest we start by leaving it out of ParquetReadOptions, because I think that interface is already somewhat confusing (see https://github.com/apache/arrow-datafusion/pull/2988), and if someone needs the API we/they can add it.

alamb avatar Jul 31 '22 21:07 alamb

Looks interesting. If I am correct, I should be able to replace my impl of ObjectStore with this function that provides an AsyncFileReader. The only thing left is that I would need to pass custom attributes through ObjectMeta; in my last proposal I suggested an additional field, custom_attributes: Option<Box<[u8]>>. We could do that through either ObjectMeta or PartitionedFile, or embed the field in PartitionedFile and pass some other wrapper to AsyncFileReader that wraps ObjectMeta along with this custom field.

What do you think?

kamilkonior avatar Aug 01 '22 08:08 kamilkonior

> I should be able to replace my impl

Precisely

> custom_attributes

Could you not just pass these into the ParquetOpenFn on construction? I had understood them to be session-related and not per-object.

tustvold avatar Aug 01 '22 08:08 tustvold

Looks good. However, I would advise you NOT to use Box<dyn Fn...> but Box<dyn Opener> with a new trait Opener, because this later allows you to add new functions more easily (like as_any(&self) -> &dyn Any, or name(&self) -> &'static str).
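A rough sketch of why a trait object is easier to extend than a boxed closure. Everything here is hypothetical: the `Opener` trait, its `open` signature, and the `CachingOpener` and `PlainOpener` types are invented for illustration, and `AsyncFileReader` is an empty stand-in for the real trait.

```rust
use std::any::Any;

// Hypothetical stand-in for the parquet crate's `AsyncFileReader`.
pub trait AsyncFileReader {}

pub type Result<T> = std::result::Result<T, String>;

/// Hypothetical trait used in place of `Box<dyn Fn(..)>`.
pub trait Opener {
    fn open(&self, location: &str) -> Result<Box<dyn AsyncFileReader>>;

    /// A method with a default body can be added later without breaking
    /// existing implementations; a bare closure type has no room for this.
    fn name(&self) -> &'static str {
        "unnamed"
    }

    /// Lets callers recover the concrete type behind the trait object.
    fn as_any(&self) -> &dyn Any;
}

struct NullReader;
impl AsyncFileReader for NullReader {}

pub struct CachingOpener {
    pub capacity: usize,
}

impl Opener for CachingOpener {
    fn open(&self, _location: &str) -> Result<Box<dyn AsyncFileReader>> {
        Ok(Box::new(NullReader))
    }
    fn name(&self) -> &'static str {
        "caching"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub struct PlainOpener;

impl Opener for PlainOpener {
    fn open(&self, _location: &str) -> Result<Box<dyn AsyncFileReader>> {
        Ok(Box::new(NullReader))
    }
    // `name` is not overridden, so the default "unnamed" applies.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Downcasts to `CachingOpener` if that is the concrete type, e.g. to
/// print its settings in an explain plan.
pub fn cache_capacity(opener: &dyn Opener) -> Option<usize> {
    opener
        .as_any()
        .downcast_ref::<CachingOpener>()
        .map(|o| o.capacity)
}
```

With a closure, neither the name nor the downcast is available; the caller only ever sees an opaque callable.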

crepererum avatar Aug 01 '22 08:08 crepererum

>> I should be able to replace my impl
>
> Precisely
>
>> custom_attributes
>
> Could you not just pass these into the ParquetOpenFn on construction, I had understood them to be session related and not per-object?

It depends on how far I would take this solution. You are right that I have per-session parameters, but my catalog service also produces the listing as a binary stream. If I could pass an entry through Box<[u8]> instead of Path, then I would be able to perform O(1) access into fields. It would be nice to have, because otherwise I would have to eagerly compose a Path just after listing and then, within my tiered storage impl, parse the components back out of the Path.

kamilkonior avatar Aug 01 '22 09:08 kamilkonior

I have another idea: instead of keeping metadata in partitions, we could emit Openers that work as self-contained handles.

partitions: Vec<Vec<Box<dyn Opener>>>

Then the trait could look like this:

trait Opener {
    // Takes &self so the trait can be used as Box<dyn Opener>.
    fn open(&self) -> Result<Box<dyn AsyncFileReader>>;
}

Pros:

  • self-contained
  • fully customizable

Cons:

  • box allocations

What do you think?

kamilkonior avatar Aug 01 '22 10:08 kamilkonior

How does the following sound:

  • Take PartitionedFile in ParquetOpenFn instead of just ObjectMeta (or whatever it ends up getting called)
  • Add extension_meta: HashMap<String, String> to PartitionedFile
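As a sketch of the two bullets above, an open function could read per-file hints straight from the file descriptor. The `PartitionedFile` struct below is a simplified, hypothetical stand-in (the real DataFusion type has other fields), and the `"tier"` key and `storage_tier` helper are invented for illustration.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-in for DataFusion's `PartitionedFile`.
#[derive(Debug, Clone, Default)]
pub struct PartitionedFile {
    pub location: String,
    /// Proposed free-form, per-file metadata that the planner ignores.
    pub extension_meta: HashMap<String, String>,
}

/// Hypothetical helper a custom open function might call: returns the
/// storage tier recorded for this file, falling back to "cold".
pub fn storage_tier(file: &PartitionedFile) -> &str {
    file.extension_meta
        .get("tier")
        .map(String::as_str)
        .unwrap_or("cold")
}
```

Because the file itself stays a plain, inspectable struct, logic such as partition pruning and plan serialization can keep working on `location` while the extra map rides along.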

> we could emit Opener's where they could work as self contained handles.

I think this is definitely possible, but there is a lot of logic, e.g. partition pruning, explain plans, plan serialization, not to mention tests, that relies on the files not being opaque. It would be a fairly involved change.

tustvold avatar Aug 01 '22 11:08 tustvold

I see, I didn't realize it would require such a large rework. Sure, passing extension_meta through PartitionedFile will work for me, but could we allow different types of MetadataExt, as in the example below? That would allow me to simply reinterpret the pointer behind the box, without having to parse components.

pub enum MetadataExt {
    Array(Box<[u8]>),
    Map(HashMap<String, String>)
}

extension_meta: MetadataExt

kamilkonior avatar Aug 01 '22 12:08 kamilkonior

Sure, I have no objection to Box<[u8]>; I just thought a HashMap might be easier to use. I'm sure you're aware, but with regard to reinterpret casting, you will need to be very careful with alignment 😅
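On the alignment point: a byte buffer is only guaranteed to be 1-byte aligned, so casting its pointer to a wider type can be undefined behaviour. One alignment-safe alternative is to copy fixed-width fields out with `from_le_bytes`, which is still constant-time per field. This is a sketch under an assumed layout; `CatalogEntry`, `decode_entry`, and the field order are hypothetical.

```rust
/// Hypothetical catalog entry layout: an 8-byte little-endian file size
/// followed by a 4-byte little-endian tier id.
#[derive(Debug, PartialEq)]
pub struct CatalogEntry {
    pub size: u64,
    pub tier: u32,
}

/// Decodes an entry by copying bytes into aligned arrays, so no unaligned
/// pointer cast is needed. Returns `None` if the buffer is too short.
pub fn decode_entry(bytes: &[u8]) -> Option<CatalogEntry> {
    let mut size = [0u8; 8];
    size.copy_from_slice(bytes.get(0..8)?);

    let mut tier = [0u8; 4];
    tier.copy_from_slice(bytes.get(8..12)?);

    Some(CatalogEntry {
        size: u64::from_le_bytes(size),
        tier: u32::from_le_bytes(tier),
    })
}
```

The copies are a few bytes each, so this keeps the O(1) field access without relying on how the buffer happens to be aligned.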

tustvold avatar Aug 01 '22 12:08 tustvold

@tustvold have you started working on this issue? If not, I have made a few initial steps toward implementing it.

kamilkonior avatar Aug 06 '22 09:08 kamilkonior

Closed by https://github.com/apache/arrow-datafusion/pull/3051

tustvold avatar Mar 24 '23 09:03 tustvold