ENH: Out of memory when preparing the dataset (computing spectrograms)
Problem
I'm prepping a big dataset (15 GB of WAV files). When computing the spectrograms, I run out of memory with the error: `concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.`
Suggestion
This can be fixed by limiting the number of partitions used by `dask.bag` to 20: changing line 229 in `audio.py` from `bag = db.from_sequence(audio_files)` to `bag = db.from_sequence(audio_files, npartitions=20)` fixes the problem (setting `partition_size` would probably also work). The best would be to automatically figure out the number of workers and the memory available and set `npartitions`/`partition_size` accordingly. Unfortunately, I cannot see a way to do that without additional dependencies.

An alternative would be to add the number of workers to the `[PREP]` section in `config.toml`.
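For reference, the pattern in question is roughly the following (a minimal sketch of the `dask.bag` usage, not the exact vak source; `_spect_file` stands in for the real function that computes and saves a spectrogram):

```python
# Minimal sketch of the dask.bag pattern being discussed (not the exact vak source);
# _spect_file is a stand-in for the real spectrogram function.
import dask.bag as db


def _spect_file(audio_file):
    # placeholder: the real function loads audio, computes a spectrogram, and saves it
    return audio_file


if __name__ == "__main__":
    audio_files = [f"audio_{i}.wav" for i in range(100)]  # made-up file list

    # current behavior: dask chooses the number of partitions itself ("about 100")
    bag = db.from_sequence(audio_files)

    # suggested change: cap the number of partitions explicitly
    bag = db.from_sequence(audio_files, npartitions=20)

    spect_files = list(bag.map(_spect_file))
```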
Hi @kalleknast, sorry you're running into this issue. Thank you for taking the time to provide a detailed suggestion on how to deal with it.
> An alternative would be to add the number of workers to the `[PREP]` section in `config.toml`.
This seems like the easiest thing to do that would have the least impact on anyone else. I would be open to adding this option to `[PREP]`. You're imagining something like this?

```toml
[PREP]
...
npartitions = 20
```

A more standard .toml config (as described in #345) could have a `vak.prep.dask` table with `npartitions` and `partition_size` options, but just adding an `npartitions` option would be a good-enough solution for now.
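For illustration, that more standard layout might look something like this (a hypothetical sketch only, not options vak currently reads):

```toml
# Hypothetical sketch of a config with a dedicated dask table, along the lines of #345;
# the option names are illustrative only.
[vak.prep]
data_dir = "data/WAV"

[vak.prep.dask]
npartitions = 20
# partition_size = 50  # the alternative knob mentioned above
```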
> This can be fixed by limiting the number of partitions used by `dask.bag` to 20. I.e. changing line 229 in `audio.py` from `bag = db.from_sequence(audio_files)` to `bag = db.from_sequence(audio_files, npartitions=20)` fixes the problem (setting `partition_size` would probably also work)
This workaround let you finish `prep`-ing the dataset?
I am hesitant to set a lower number like 20 as a default, since `dask` defaults to "about 100"; see https://docs.dask.org/en/stable/bag-creation.html?highlight=npartitions#db-from-sequence. It's not clear to me whether lowering this number to 20 would slow down parallel processing for people whose data have different characteristics (e.g. smaller audio files).
> The best would be to automatically figure out the number of workers and memory available and set `npartitions`/`partition_size` accordingly. Unfortunately, I cannot see a way to do that without additional dependencies.
Could you point me to what you are looking at for automatic optimization of `npartitions`? If it's just something like depending on `dask[all]` instead of `dask[bag]`, I could be fine with that but need to think it over.

Thanks again, happy to help make this easier for you. I can add the `npartitions` option myself or accept a pull request if you are interested.
Dask uses the multiprocessing package (i.e. no extra dependencies). We can get the number of CPUs from that: `multiprocessing.cpu_count()`.
In `audio.py`, add to line 4:

```python
import multiprocessing
```

and change lines 229-230 to:

```python
npartitions = multiprocessing.cpu_count() - 1
bag = db.from_sequence(audio_files, npartitions=npartitions)
```
Preparing 14.7 GB of WAVs to 83.2 GB of spectrograms took 11m 46s.
I also tried setting `partition_size` to 3% of the available memory. Increasing `partition_size` beyond 3% of available memory results in out-of-memory errors.

```python
import psutil

partition_size = int(0.03 * psutil.virtual_memory().available)
bag = db.from_sequence(audio_files, partition_size=partition_size)
```

However, the computation took way longer: 32m 20s instead of 11m 46s.
Thank you @kalleknast this is helpful.
I am working on understanding what's going on here a little better.
Can I ask you to do a couple things so I can help you?
- [ ] replicate your original error and then paste the full traceback here so I can get a better sense of what's going on when it crashes? E.g., formatted with triple backticks so it's searchable and reads as console output at the same time
- [ ] give me some measure of the size of your individual audio files and how many files there are. The times and total sizes are very helpful, but it would also be really good to know if there's some difference in individual file size that makes it necessary to change `npartitions` or `partition_size`
Also can I ask what operating system you are on?
I'm wondering if it's Windows, which in the past is the platform where I have run into more `dask` errors. I searched the `dask` issues and this is the only one where I found the same error message, and it's Windows related:
https://github.com/dask/dask/issues/8506
Nothing against Windows--I would love the code to work there--I'm just trying to get to the root of the error
System: Ubuntu 20.04.5 LTS with 62.7 GB memory
Files: 107 WAV files sampled at 96 kHz, with sizes between 13.4 and 212.6 MB (most around 130 MB)
Traceback:
```
$ vak prep train_config.toml
2022-10-14 22:59:43,745 - vak.cli.prep - INFO - Determined that purpose of config file is: train.
Will add 'csv_path' option to 'TRAIN' section.
2022-10-14 22:59:43,745 - vak.core.prep - INFO - purpose for dataset: train
2022-10-14 22:59:43,745 - vak.core.prep - INFO - will split dataset
2022-10-14 22:59:44,315 - vak.io.dataframe - INFO - making array files containing spectrograms from audio files in: data/WAV
2022-10-14 22:59:44,319 - vak.io.audio - INFO - creating array files with spectrograms
[ ] | 0% Completed | 12.69 sms
Traceback (most recent call last):
File "/home/hjalmar/callclass/bin/vak", line 8, in <module>
sys.exit(main())
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/__main__.py", line 45, in main
cli.cli(command=args.command, config_file=args.configfile)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/cli.py", line 30, in cli
COMMAND_FUNCTION_MAP[command](toml_path=config_file)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/prep.py", line 132, in prep
vak_df, csv_path = core.prep(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/core/prep.py", line 201, in prep
vak_df = dataframe.from_files(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/dataframe.py", line 134, in from_files
spect_files = audio.to_spect(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/audio.py", line 236, in to_spect
spect_files = list(bag.map(_spect_file))
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/bag/core.py", line 1480, in __iter__
return iter(self.compute())
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 315, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 600, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/multiprocessing.py", line 233, in get
result = get_async(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/local.py", line 500, in get_async
for key, res_info, failed in queue_get(queue).result():
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```
That's perfect, thanks so much. Will keep doing homework and let you know what I find out in the next couple of days
Just updating that I asked a question in the dask forum here: https://dask.discourse.group/t/how-to-troubleshoot-optimize-n-partitions-partition-size-for-dask-bag/1216/2
Hi again @kalleknast, just checking back -- I never got a reply on the `dask` forum.
Any chance you'd be willing to share data so I could replicate and test some of the different approaches here? I don't have any audio files that are quite that size right at hand although I could probably find some -- I think some of the USV files end up being pretty big because of the high sampling rate, e.g. https://zenodo.org/record/3428024
You could contact me at nicholdav at gmail if that's easier. Thanks!
Hi @NickleDave. I just sent you a link to a part of the data set.
Awesome, thank you @kalleknast!
I will download this weekend to verify I can access, and see if I can replicate the issue on my machine.
I'm catching up on the backlog here and a bit busy at my day job right now, but hoping to get to this over the Thanksgiving holiday at the latest.
I will ask you to test any fix on your machine and of course add you as a contributor if we end up merging something.
Very appreciated!
Confirming I was able to download the files and replicate this issue, thank you @kalleknast -- I've got this on the to-do list for features/enhancements now
Hi @kalleknast just want to let you know we didn't forget about you. This is still on the to-do list.
@yardencsGitHub has me working on other things (https://github.com/vocalpy/vak/pull/605) but this affects his group and we know it's an issue for people working with large files due to high sample rates, e.g. bat calls. So I will get to it
Hi @NickleDave,
Try my fix on many small files. It works fine on my large files, so it should be good if it works on small files too.
In `audio.py`, add

```python
import multiprocessing
```

and replace line 229 in the original,

```python
bag = db.from_sequence(audio_files)
```

with

```python
npartitions = multiprocessing.cpu_count()
bag = db.from_sequence(audio_files, npartitions=npartitions)
```
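If it helps with that small-file test, here is one way to generate a pile of short WAV files (a throwaway sketch; the output directory, file count, durations, and sample rate are all made up):

```python
# Throwaway sketch for the small-file test: write a few hundred short WAV files of noise.
# The output directory, file count, durations, and sample rate are all made up.
from pathlib import Path

import numpy as np
from scipy.io import wavfile

out_dir = Path("data/small_wavs")
out_dir.mkdir(parents=True, exist_ok=True)

rate = 32000  # Hz
rng = np.random.default_rng(0)

for i in range(500):
    dur_s = rng.uniform(0.5, 2.0)  # each file between 0.5 and 2 seconds long
    samples = rng.normal(0.0, 0.1, int(dur_s * rate)).astype(np.float32)
    wavfile.write(out_dir / f"noise_{i:03d}.wav", rate, samples)
```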
Thank you @kalleknast I will test this.
Should have a chance this weekend. If it works fine I will just add the option in the config as we discussed and raise an issue to revisit in more detail in version 1.0.
That way we can get things working for you sooner.
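For reference, the wiring could look roughly like this (a sketch only; the function signature and the fallback behavior shown here are illustrative, not vak's actual code):

```python
# Rough sketch of plumbing an npartitions option from the [PREP] config down to dask.bag.
# The signature and fallback behavior are illustrative, not vak's actual code.
import multiprocessing

import dask.bag as db


def _spect_file(audio_file):
    # placeholder for the real spectrogram computation
    return audio_file


def to_spect(audio_files, npartitions=None):
    """Map the spectrogram function over audio files in parallel."""
    if npartitions is None:
        # fallback along the lines suggested above: one partition per CPU
        npartitions = multiprocessing.cpu_count()
    bag = db.from_sequence(audio_files, npartitions=npartitions)
    return list(bag.map(_spect_file))


if __name__ == "__main__":
    # npartitions here would come from the [PREP] section of config.toml, e.g. npartitions = 20
    to_spect([f"audio_{i}.wav" for i in range(100)], npartitions=20)
```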
@kalleknast just ICYMI, I did release a version 0.8.0 today that includes this: https://github.com/vocalpy/vak/releases/tag/0.8.0
You should be able to `pip install` it now, and install from conda-forge sometime tomorrow.
Thanks again for your help with this.
Worked perfectly!
Thanks
Awesome, glad to hear it