datafusion-tui

Add `IoObjectStore` that uses main runtime for network requests

Open · matthewmturner opened this issue · 25 comments

Adds a wrapper ObjectStore that uses spawn_io / the main tokio runtime for making network requests, instead of the DedicatedExecutor, which should only be used for CPU-bound work.
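The dispatch pattern can be sketched without tokio or object_store, assuming a dedicated named thread stands in for the main IO runtime: `spawn_io` ships a closure to that thread and waits for the reply. `IoExecutor` and this blocking `spawn_io` are hypothetical names for illustration; an async version would more likely spawn onto a tokio runtime `Handle` and await the returned `JoinHandle`.

```rust
use std::sync::mpsc;
use std::thread;

// A job is a boxed closure that runs exactly once on the IO thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for the "main runtime" reserved for network IO.
struct IoExecutor {
    tx: mpsc::Sender<Job>,
}

impl IoExecutor {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        thread::Builder::new()
            .name("io".to_string())
            .spawn(move || {
                // Run jobs in arrival order until every sender has been dropped.
                for job in rx {
                    job();
                }
            })
            .expect("failed to spawn IO thread");
        IoExecutor { tx }
    }

    /// Run `f` on the IO thread and block until its result comes back.
    fn spawn_io<T, F>(&self, f: F) -> T
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(Box::new(move || {
                let _ = reply_tx.send(f());
            }))
            .expect("IO thread has shut down");
        reply_rx.recv().expect("IO job panicked")
    }
}

fn main() {
    let io = IoExecutor::new();
    // Pretend this closure is a network request; record which thread served it.
    let (bytes, served_by) = io.spawn_io(|| {
        let name = thread::current().name().map(str::to_string);
        (vec![1u8, 2, 3], name)
    });
    println!("{} bytes fetched on {:?}", bytes.len(), served_by);
}
```

The thread name exists only so the caller can check where the work actually ran.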

matthewmturner, Nov 19 '24

@alamb FYI

matthewmturner, Nov 19 '24

Just want to add a test for this then plan to merge it

matthewmturner, Nov 20 '24

> Just want to add a test for this then plan to merge it

I am making great progress on the example

  • https://github.com/apache/datafusion/pull/13424

I also plan to yoink this code too.

alamb, Nov 20 '24

@alamb do you envision having the dedicated executor eventually live in the main datafusion repo? If not (or even as an interim solution while it's iterated on), I could imagine having a library for dft that exports the DedicatedExecutor and IoObjectStore so others can more easily use it.

I.e. an example is great to help understand and see how this part of execution works - but exposing the functionality directly should make it easier to just pick up and use.

matthewmturner, Nov 20 '24

> @alamb do you envision having the dedicated executor eventually live in the main datafusion repo? If not (or even as an interim solution while it's iterated on), I could imagine having a library for dft that exports the DedicatedExecutor and IoObjectStore so others can more easily use it.

I envision DedicatedExecutor and IoObjectStore being directly in DataFusion (I can't really write the example without them :) )

Stay tuned 📺

alamb, Nov 20 '24

This is problematic, as the returned GetResult contains a stream that will then "migrate" across the runtime boundary. It is at best undocumented what the implications of this are. Other issues will be found with things like put_multipart and friends.

There is some further context here: https://github.com/apache/arrow-rs/pull/4040#issuecomment-1724082933

I would strongly encourage implementing the IO dispatch at an application boundary, e.g. AsyncFileReader; the ObjectStore trait is not designed to be shimmed in this way.

tustvold, Nov 20 '24

> I would strongly encourage implementing the IO dispatch at an application boundary, e.g. AsyncFileReader; the ObjectStore trait is not designed to be shimmed in this way.

I don't fully understand this - perhaps we can discuss it in the context of my example. I am pretty close to having something to show for discussion.

Specifically, my vision is to have an example that shows how to run a DataFusion plan with CPU on one threadpool and IO on another.

alamb, Nov 20 '24

> I don't fully understand this - perhaps we can discuss in the context of my example

Rather than wrapping the ObjectStore trait, you would wrap the calls within DF that interact with ObjectStore (or any other interface that performs IO), e.g. AsyncFileReader, the internal methods of ListingTable, etc. The problem arises because a number of the APIs exposed by ObjectStore return streams. In some cases, e.g. GetResult, tokio's behaviour may sort of work (https://github.com/apache/arrow-rs/pull/4040#discussion_r1161309686), but others, like list, will simply not work correctly: polling the stream may initiate a new request on the calling runtime.
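A std-only analogy of the list problem, assuming threads stand in for runtimes and a lazy iterator stands in for the stream: each `next()` call plays the role of a request issued when the stream is polled. Building the iterator on an "io" thread does not move those requests; they run on whichever thread consumes it. `LazyList` and `list_via_io_thread` are hypothetical, and tokio adds wrinkles (roughly, an IO resource stays registered with the reactor of the runtime that created it), but work done inside `poll` follows the same rule.

```rust
use std::thread;

/// Hypothetical paginated listing: each `next()` call "fetches" one page,
/// mimicking a lazy `list` stream that issues a request each time it is polled.
struct LazyList {
    next_page: usize,
    pages: usize,
}

impl Iterator for LazyList {
    // (page index, name of the thread that performed the "request")
    type Item = (usize, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.next_page >= self.pages {
            return None;
        }
        let page = self.next_page;
        self.next_page += 1;
        // The "request" runs on whichever thread calls `next`.
        let who = thread::current().name().unwrap_or("unnamed").to_string();
        Some((page, who))
    }
}

/// Construct the listing on a thread named "io" and hand it back.
/// Only construction happens there; no page has been fetched yet.
fn list_via_io_thread(pages: usize) -> LazyList {
    thread::Builder::new()
        .name("io".to_string())
        .spawn(move || LazyList { next_page: 0, pages })
        .expect("failed to spawn IO thread")
        .join()
        .expect("IO thread panicked")
}

fn main() {
    // A thread named "cpu" stands in for the DedicatedExecutor consuming the stream.
    let pages = thread::Builder::new()
        .name("cpu".to_string())
        .spawn(|| list_via_io_thread(3).collect::<Vec<_>>())
        .expect("failed to spawn CPU thread")
        .join()
        .expect("CPU thread panicked");
    for (page, who) in pages {
        println!("page {page} fetched on {who}");
    }
}
```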

Ultimately DF needs to separate IO-based activities from CPU-based activities, and overloading this onto the ObjectStore interface won't work reliably.

tustvold, Nov 20 '24

> Ultimately DF needs to separate IO-based activities from CPU-based activities, and overloading this onto the ObjectStore interface won't work reliably.

Thank you -- I have some ideas about this. Will keep working (I am thinking of some way to move the work of one stream to another runtime).
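One way to move the work of a stream to another runtime is to drive it to completion on the IO side and forward each item over a bounded channel. A std-only sketch, assuming threads stand in for runtimes and an iterator stands in for the stream; `drive_on_thread` is a hypothetical helper, not an existing DataFusion API. The channel bound gives simple backpressure: the producer blocks once `capacity` items are waiting.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: create and drive an iterator entirely on a named
/// thread, forwarding each item to the caller through a bounded channel.
fn drive_on_thread<I, F>(name: &str, capacity: usize, make_iter: F) -> mpsc::IntoIter<I::Item>
where
    I: Iterator,
    I::Item: Send + 'static,
    F: FnOnce() -> I + Send + 'static,
{
    let (tx, rx) = mpsc::sync_channel(capacity);
    thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            // Every `next()` (every "request") happens on this thread.
            for item in make_iter() {
                // Stop early if the consumer has hung up.
                if tx.send(item).is_err() {
                    break;
                }
            }
        })
        .expect("failed to spawn producer thread");
    rx.into_iter()
}

fn main() {
    // Each item records the thread on which it was produced ("fetched").
    let items: Vec<(u32, String)> = drive_on_thread("io", 2, || {
        (0..3u32).map(|i| (i, thread::current().name().unwrap_or("unnamed").to_string()))
    })
    .collect();
    for (i, who) in &items {
        println!("item {i} produced on {who}");
    }
}
```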

alamb, Nov 20 '24

@alamb perhaps this is relevant: https://stackoverflow.com/a/78094264 (answer from Alice Ryhl)

matthewmturner, Nov 20 '24

Or you could just spawn IO at a non-streaming application boundary... This is simpler and much easier to reason about... But I guess I am now repeating myself...

Edit: to be clear, the TcpStream is deep in the guts of our HTTP client. There aren't APIs that would allow us to move it, nor would it make sense, given the whole purpose is to keep IO off the DF threads.
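Dispatching at a non-streaming boundary means the IO thread performs the whole operation and hands back an owned, finished result, so nothing lazy crosses over. A std-only sketch, assuming a named thread stands in for the IO runtime and each (start, end) pair stands in for a ranged GET; `fetch_all_on_io_thread` and its placeholder bytes are hypothetical, and a thread is spawned per call only for brevity.

```rust
use std::thread;

/// Hypothetical non-streaming boundary: every "request" runs on the IO thread
/// and only the finished, owned result crosses back to the caller.
fn fetch_all_on_io_thread(ranges: Vec<(u64, u64)>) -> (Vec<u8>, Vec<String>) {
    thread::Builder::new()
        .name("io".to_string())
        .spawn(move || {
            let mut bytes = Vec::new();
            let mut served_by = Vec::new();
            for (start, end) in ranges {
                // Stand-in for a ranged GET: fill with placeholder bytes.
                bytes.extend((start..end).map(|b| b as u8));
                served_by.push(thread::current().name().unwrap_or("unnamed").to_string());
            }
            (bytes, served_by)
        })
        .expect("failed to spawn IO thread")
        .join()
        .expect("IO thread panicked")
}

fn main() {
    let (bytes, served_by) = fetch_all_on_io_thread(vec![(0, 4), (10, 12)]);
    println!("{} bytes; requests served by {:?}", bytes.len(), served_by);
}
```

The trade-off is buffering: the caller receives nothing until every range has been fetched.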

tustvold, Nov 20 '24

Thanks @tustvold, I appreciate your insight on this. I need to digest the implications a bit more to figure out next steps. It's not immediately clear to me whether interfaces like AsyncFileReader are tied to Parquet (I believe that's just in the parquet crate). I'm hoping this will work in a way that is agnostic to the underlying file type. I haven't had the time to look much into this yet, though, so maybe I just need to spend more time with it.

matthewmturner, Nov 20 '24

> I'm hoping this will work in a way that is agnostic to the underlying file type

Parquet will likely need to be handled separately from CSV/JSON/Avro because of the very different IO pattern, but the IO for those three should be similar enough to allow sharing. It has been a long time since I worked on it, but I at least remember extracting a common FileStream trait or something similar. I'd hope something similar would be possible for the write side.
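To make the different IO pattern concrete: a Parquet file ends with a 4-byte little-endian metadata length followed by the magic bytes `PAR1`, so a reader typically fetches the tail first and then issues ranged reads for the column chunks the metadata points to, whereas CSV and JSON are usually read front to back. A std-only sketch of that first step; `parquet_metadata_range` is a hypothetical helper, not the parquet crate's API.

```rust
use std::ops::Range;

/// Hypothetical helper: given a Parquet file's length and its last 8 bytes,
/// return the byte range of the footer metadata, or None if the tail is invalid.
fn parquet_metadata_range(file_len: u64, tail: [u8; 8]) -> Option<Range<u64>> {
    // The last 4 bytes must be the "PAR1" magic.
    if &tail[4..] != b"PAR1" {
        return None;
    }
    // The 4 bytes before the magic hold the metadata length (little endian).
    let meta_len = u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]) as u64;
    // The metadata sits immediately before those final 8 bytes.
    let end = file_len.checked_sub(8)?;
    let start = end.checked_sub(meta_len)?;
    Some(start..end)
}

fn main() {
    // Tail of a made-up 1000-byte file whose metadata is 100 bytes long.
    let mut tail = [0u8; 8];
    tail[..4].copy_from_slice(&100u32.to_le_bytes());
    tail[4..].copy_from_slice(b"PAR1");
    println!("{:?}", parquet_metadata_range(1_000, tail)); // prints Some(892..992)
}
```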

tustvold, Nov 20 '24

I am going to mark this as draft for the time being, while these more fundamental issues are resolved.

matthewmturner, Nov 21 '24

My hope/plan is to figure out a good way to properly separate IO and compute in the context of:

  • https://github.com/apache/datafusion/pull/13424

I hope to have a chance to work on it more tomorrow / over the weekend

alamb, Nov 21 '24

I'm super interested in the outcome here. I've got a production Flight server that I'm trying to replace with Flight SQL + DataFusion, but I keep hitting executor starvation issues and have been tracking the conversation around dedicated runtimes closely. I ported DedicatedExecutor and CrossRtStream from Influx last week (good timing, I know) and tried the IoObjectStore approach yesterday, since it was super easy to hack in. Results are better, but I am still hitting starvation when data transfer passes a couple hundred MB.

That is all to say, if I can help test any approaches, let me know.

djanderson, Nov 21 '24

@djanderson just to confirm I understand: is the dedicated executor being starved (i.e. the IO it needs to perform can't proceed), or is the main runtime being starved?

matthewmturner, Nov 21 '24

I believe that the main runtime is being starved, because it's manifesting itself as dropped network connections, either between MinIO and the Flight SQL server (more often) or between the Flight SQL server and the Python client I'm using to test.

That said, I just got tokio-console set up and I'm still learning how to read it. Hopefully I can use it to make sure that I'm in fact getting all my IO running on the main thread via spawn_io and haven't missed some place.

djanderson, Nov 21 '24

Any updates, @djanderson? I haven't found an easy way to determine which tasks ran on which runtime in the console. I also took a similar approach to using the DedicatedExecutor at the ObjectStore boundary in my project, but still see periodic S3 timeouts a la: https://github.com/apache/arrow-rs-object-store/issues/272

@alamb, do you have any suggestions on the number of threads to allocate for CPU-bound tasks vs. IO-bound ones, relative to the number of available cores? I will try lowering the priority of the threads in the CPU runtime to see if that makes a difference.

rohitrastogi, Dec 05 '24

> @alamb, do you have any suggestions on the number of threads to allocate for CPU-bound tasks vs. IO-bound ones, relative to the number of available cores? I will try lowering the priority of the threads in the CPU runtime to see if that makes a difference.

I recommend N-1 cores for the CPU-bound tasks (so there is always a full core available to do network requests).
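The N-1 rule can be computed from `std::thread::available_parallelism`, which returns an estimate of the parallelism available to the process (it counts logical CPUs and may be lowered by CPU affinity or cgroup quotas). A minimal sketch; the floor of one thread on single-core machines is an assumption, not part of the recommendation. With tokio, the result would typically be passed to `runtime::Builder::worker_threads` when building the CPU runtime.

```rust
use std::thread;

/// Threads for the CPU-bound pool: one fewer than the available parallelism,
/// so a full core stays free for network IO, but never fewer than one
/// (the floor is an assumption for single-core machines).
fn cpu_pool_size(available: usize) -> usize {
    available.saturating_sub(1).max(1)
}

fn main() {
    // Fall back to 1 if the platform cannot report its parallelism.
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("{cores} cores -> {} CPU threads", cpu_pool_size(cores));
}
```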

Also, it is important that you actually run the stream in the separate runtime.

I have sketched this out in the different_runtime_advanced example here:

https://github.com/apache/datafusion/pull/13424/files#diff-fde0efd9753d1a0d0ebbfb605816c4ea770aa924896bac7ceecdeed2934e491aR146-R211

I am still playing around with making it easier to use the DedicatedExecutor paradigm, which is why I don't have that PR up for review yet

alamb, Dec 05 '24

FWIW @adriangb reports success using this procedure here:

  • https://github.com/apache/datafusion/pull/13634

alamb, Dec 05 '24

@rohitrastogi no updates from me; I'm still seeing the periodic timeouts too and didn't get much insight via tokio-console.

What I'm doing right now is trying to implement some of the file range caching ideas from https://blog.haoxp.xyz/posts/caching-datafusion/, in the hope that if I cache part of the file before the failure, a retry might get me over the line. Still keeping a close eye on this work; I appreciate everyone's efforts!
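The range-caching idea can be sketched as a map from (path, byte range) to bytes that is consulted before each fetch, so a retry only re-requests ranges that have not yet succeeded. This is a hypothetical std-only sketch, not the linked post's implementation; it ignores eviction, overlapping ranges, and concurrent access (a shared cache would need a lock).

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical in-memory cache of fetched byte ranges, keyed by (path, range).
#[derive(Default)]
struct RangeCache {
    entries: HashMap<(String, Range<u64>), Vec<u8>>,
}

impl RangeCache {
    /// Return cached bytes, or call `fetch` and remember a successful result.
    /// Errors are not cached, so a failed range is requested again next time.
    fn get_or_fetch<E>(
        &mut self,
        path: &str,
        range: Range<u64>,
        fetch: impl FnOnce(&str, Range<u64>) -> Result<Vec<u8>, E>,
    ) -> Result<Vec<u8>, E> {
        let key = (path.to_string(), range.clone());
        if let Some(bytes) = self.entries.get(&key) {
            return Ok(bytes.clone());
        }
        let bytes = fetch(path, range)?;
        self.entries.insert(key, bytes.clone());
        Ok(bytes)
    }
}

fn main() {
    let mut cache = RangeCache::default();
    let mut requests = 0;
    for attempt in 1..=2 {
        // Stand-in for a ranged GET against object storage.
        let bytes = cache
            .get_or_fetch("data.parquet", 0..4, |_path, range| {
                requests += 1;
                Ok::<_, String>(vec![0u8; (range.end - range.start) as usize])
            })
            .expect("fetch failed");
        println!("attempt {attempt}: {} bytes, {requests} request(s) so far", bytes.len());
    }
}
```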

djanderson, Dec 05 '24

I took this idea a bit further (and wrote a bunch of tests) as part of this PR in DataFusion:

  • https://github.com/apache/datafusion/pull/14286

alamb, Jan 25 '25

> I took this idea a bit further (and wrote a bunch of tests) as part of this PR in DataFusion:
>
>   • https://github.com/apache/datafusion/pull/14286

Great. I will work on updating to that once it is merged (with a config option). I also have in mind adding some benchmarking / tokio metrics functionality to the flightsql server. I think it will be very interesting to produce some data under the different configurations.

matthewmturner, Jan 25 '25

> I took this idea a bit further (and wrote a bunch of tests) as part of this PR in DataFusion:

> Great. I will work on updating to that once it is merged (with a config option). I also have in mind adding some benchmarking / tokio metrics functionality to the flightsql server. I think it will be very interesting to produce some data under the different configurations.

💯 for getting some data -- I am currently working kind of blind here as I don't have a reproducer myself

alamb, Jan 26 '25