Convert an existing Stream into deluge

Open arlyon opened this issue 2 years ago • 5 comments

Hi!

I have a stream that I cannot control that I would like to operate over in parallel. What options do I have here?

Cheers

Alex

arlyon avatar Jan 11 '23 09:01 arlyon

Hey @arlyon! A Stream only gives out a single evaluated element at a time (see the poll_next method on Stream). This means that its construction itself prevents us from evaluating it in parallel. If you want to do further processing that is itself async, I would suggest collecting the stream into a vector, calling into_deluge on that vector, and performing the async transformations on the resulting Deluge in a similar fashion to what the docs show.

Is this helpful?

mkawalec avatar Jan 12 '23 12:01 mkawalec

I was imagining something like rayon's par_bridge, which I assume eagerly collects n items and distributes them across n threads. That is less efficient than starting with a collection in memory, but it can handle 'very large' datasets much more easily. Streams in particular seem useful for this because, in my case, I am receiving and opening a list of AsyncRead streams of unknown length and want to process all of them while limiting the number of file descriptors I have open. I suppose I could collect all the file names by walking the filesystem first, but I would like to avoid that overhead for very large lists of files.

https://docs.rs/rayon/latest/rayon/iter/trait.ParallelBridge.html

arlyon avatar Jan 13 '23 10:01 arlyon
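For readers following along, the bounded-parallelism idea in the comment above can be sketched with the standard library alone. This is only an illustration under stated assumptions: a plain `Iterator` stands in for the `Stream`, OS threads stand in for async tasks, and `bridge_bounded` is a hypothetical name that is not part of deluge or rayon. Each worker pulls the next item on demand, so at most `workers` items (for example, open files) are in flight at once.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper (not part of deluge or rayon): run `work` over the
/// items of `source` with at most `workers` threads. Results are returned
/// in completion order, which may differ from source order.
pub fn bridge_bounded<I, T, R, F>(source: I, workers: usize, work: F) -> Vec<R>
where
    I: Iterator<Item = T> + Send,
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    // `fuse` makes it safe for several workers to call `next` after the end.
    let source = Mutex::new(source.fuse());
    let results = Mutex::new(Vec::new());

    thread::scope(|scope| {
        for _ in 0..workers.max(1) {
            scope.spawn(|| loop {
                // Hold the lock only while pulling one item, not while working.
                let next = source.lock().unwrap().next();
                match next {
                    Some(item) => {
                        let out = work(item);
                        results.lock().unwrap().push(out);
                    }
                    None => break,
                }
            });
        }
    });

    results.into_inner().unwrap()
}
```

Calling `bridge_bounded(paths, 4, open_and_process)` with a hypothetical `open_and_process` function would keep at most four items in progress at any time, without collecting the whole source first. Because output order is not preserved, sort or tag the results if order matters.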

Good idea, I'll add something similar to the to-do list: a way to convert a Stream into a Deluge without materializing the whole intermediate dataset. Of course, you are encouraged to contribute it yourself if you have the capacity.

mkawalec avatar Jan 18 '23 10:01 mkawalec

Closing because I don't believe there is a straightforward way to convert a stream into a Deluge given that a Deluge needs to know how many elements it is going to have when it is being created. But I'll keep on thinking about it :)

mkawalec avatar Mar 15 '23 14:03 mkawalec

I just realized I can use StreamExt::chunks for this exact purpose, hiding it behind a conversion API :D

mkawalec avatar Mar 15 '23 14:03 mkawalec
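A rough sketch of the chunking idea from the last comment, for illustration only. `StreamExt::chunks` in the futures crate groups a stream's items into `Vec`s of up to a given capacity. The version below mimics that shape with the standard library: an `Iterator` stands in for the `Stream`, scoped threads stand in for the parallel evaluation a Deluge would provide, and `process_in_chunks` is a hypothetical name. It does not show how deluge implements or exposes the conversion.

```rust
use std::thread;

/// Hypothetical helper: drain `source` in batches of up to `chunk_size`
/// items, process each batch in parallel, and keep results in source order.
pub fn process_in_chunks<I, T, R, F>(mut source: I, chunk_size: usize, work: F) -> Vec<R>
where
    I: Iterator<Item = T>,
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    let chunk_size = chunk_size.max(1);
    let work = &work; // shared reference that each worker thread can copy
    let mut results = Vec::new();

    loop {
        // Pull at most `chunk_size` items; only this batch is held in memory.
        let batch: Vec<T> = source.by_ref().take(chunk_size).collect();
        if batch.is_empty() {
            break;
        }

        // One scoped thread per item; joining in spawn order preserves order.
        let outputs: Vec<R> = thread::scope(|scope| {
            let handles: Vec<_> = batch
                .into_iter()
                .map(|item| scope.spawn(move || work(item)))
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        results.extend(outputs);
    }

    results
}
```

The trade-off of chunking is that each batch waits for its slowest item before the next batch starts. In exchange, the number of elements is known when each batch is created, which is the constraint mentioned in the closing comment.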