parallelism of independent streams
Hello, thanks for the great library!
I'm making a program to read a file, get some information out, and then write it to another file. Since I'd like to process a whole directory of such files, I was wondering which function to use to parallelize the work so that each file gets processed in a separate OS thread, with 100% CPU utilization on all cores.
parMap comes to mind, but perhaps there is something in streamly that I should use instead? I looked at WAsyncT. That one goes round-robin over the elements of the streams. If I read a file byte by byte (or chunk by chunk), it would open all the files and then process all chunks from all files (as far as I understood it from the documentation).
I would rather have it finish a file first before that file gets pushed into a waiting state (because the CPU core is busy). What I mean by that: if cores 1 to 4 are busy handling files 1 to 4, I don't want file 1 to go into a waiting state so that file 5 can be started. I would like any of the files (1 to 4) to finish before starting on file 5 (or files 1 to 8 if hyperthreading is enabled).
The reason for this is firstly to guarantee full CPU load and less waiting time, and secondly that fully processed files become available earlier.
As an additional question, I wonder if there is a way to tell streamly in which order to start picking up files. Perhaps it's useful to optimize the order of files to process so that all of them finish as soon as possible. Likely this would simply mean starting with the biggest files and working down to the smallest.
Please take a look at this example. It traverses the directory tree concurrently:
let stream = S.concatMapTreeWith ahead listDir (S.yieldM $ return (Left "."))
The above snippet uses ahead-style (speculative) concurrency to read directories concurrently and give you a stream of their contents. However, if you do not have too many files, it may be faster to read them serially.
If you do not want to read directories recursively, you can just use toFiles.
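If it helps, here is a minimal sketch of the non-recursive case built directly on getDirectoryContents (which, as noted further below, is what toFiles uses underneath); listFiles is a hypothetical helper name:

```haskell
import Control.Monad.IO.Class (liftIO)
import System.Directory (doesFileExist, getDirectoryContents)

import Streamly
import qualified Streamly.Prelude as S

-- Non-recursive listing: stream the entries of one directory
-- and keep only the plain files.
listFiles :: FilePath -> SerialT IO FilePath
listFiles dir = do
    names  <- liftIO (getDirectoryContents dir)
    name   <- S.fromList names
    let path = dir ++ "/" ++ name
    isFile <- liftIO (doesFileExist path)
    if isFile then return path else S.nil
```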
Now that you have the file stream you can process the files concurrently:
S.drain $ aheadly $ S.mapM processFile stream
You can also use asyncly instead of aheadly if you do not care about the order of results. mapM is your parMap.
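For reference, a self-contained sketch of this pattern (processFile and the file list are placeholders):

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Placeholder worker: read one file, write a transformed copy.
processFile :: FilePath -> IO ()
processFile path = readFile path >>= writeFile (path ++ ".out")

main :: IO ()
main =
    S.drain
        $ aheadly              -- or asyncly, if result order is irrelevant
        $ S.mapM processFile
        $ S.fromList ["1.txt", "2.txt", "3.txt"]  -- placeholder file list
```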
If you want to process the biggest files first, then you will have to sort the stream based on file size beforehand. Note that sorting means buffering the full stream in memory; if you have too many files (e.g. millions), it may take a significant amount of memory.
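A sketch of that biggest-first ordering, assuming getFileSize from the directory package (sortBySizeDesc is a hypothetical helper, and the buffering caveat above applies):

```haskell
import Data.List (sortOn)
import Data.Ord (Down (..))
import System.Directory (getFileSize)

import Streamly
import qualified Streamly.Prelude as S

-- Buffer all names, look up their sizes, and re-emit them biggest first.
sortBySizeDesc :: SerialT IO FilePath -> IO (SerialT IO FilePath)
sortBySizeDesc stream = do
    files <- S.toList stream
    sized <- mapM (\f -> (,) f <$> getFileSize f) files
    return $ S.fromList $ map fst $ sortOn (Down . snd) sized
```

The resulting stream can then be fed to the aheadly pipeline above.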
I'm tripping over the wording of the following:
The Semigroup operation for WAsyncT interleaves the elements from the two streams.
I assume here that, since List is a Semigroup, I could take the example [one file, another file] (two files = two streams), and then add additional files, which would be yet another stream.
Then what is "the element" of "a stream"?
You mentioned concurrently/concurrency a couple of times, but as far as I know a good concurrency story doesn't say much about parallelism or maxed-out CPU load.
--
About the toFiles function you mention: I wonder if it throws exceptions. What happens if the directory cannot be read? I was providing streamly with a list of already opened file handles to make this error handling more explicit.
Then what is "the element" of "a stream"?
Not sure I understand the question fully. A stream is a sequence of actions, each of which results in an output element. WAsyncT would run one action from each constituent stream and emit the resulting elements in the output stream.
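A small demonstration of that behaviour (the exact output order can vary, since the constituent streams are consumed concurrently):

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- Two constituent streams combined with the WAsyncT Semigroup.
    xs <- S.toList $ wAsyncly $ S.fromList [1, 2, 3] <> S.fromList [4, 5, 6]
    -- One action is run from each stream in turn, so the output
    -- interleaves the two, e.g. [1,4,2,5,3,6].
    print (xs :: [Int])
```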
Streamly automatically scales to utilize the CPU; please see the concurrency control parameters in the Streamly module if you need finer control over the concurrency.
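For example, the maxThreads and maxBuffer combinators exported by the Streamly module put explicit caps on the concurrency; the worker and file list here are placeholders:

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main =
    S.drain
        $ aheadly
        $ maxThreads 4    -- at most 4 concurrent processFile actions
        $ maxBuffer 16    -- at most 16 unconsumed results buffered
        $ S.mapM processFile
        $ S.fromList ["1.txt", "2.txt", "3.txt"]
  where
    processFile f = putStrLn ("processing " ++ f)  -- stand-in worker
```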
toFiles uses getDirectoryContents from the directory package, so it would throw any exceptions that getDirectoryContents may throw. Note that toFiles gives you a stream of file names; you can then concatMap a file reading operation on that stream.
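A sketch of that concatMap step, using plain readFile as a stand-in for the file reading operation and fileLines as a hypothetical helper that expands each name into a stream of that file's lines:

```haskell
import Control.Monad.IO.Class (liftIO)

import Streamly
import qualified Streamly.Prelude as S

-- Expand each file name into the stream of that file's lines.
fileLines :: SerialT IO FilePath -> SerialT IO String
fileLines = S.concatMap $ \name -> do
    contents <- liftIO (readFile name)
    S.fromList (lines contents)
```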