spikeinterface icon indicating copy to clipboard operation
spikeinterface copied to clipboard

How to parallelise efficiently?

Open fruce-ki opened this issue 9 months ago • 12 comments

Hello!

I have a large backlog of data (many thousands of Maxwell recordings), so I want to take maximum advantage of the computer's's capacity. I have a workstation with 20 Intel cores (ie. 40 "cores") and about 320GB of RAM.

Using the built-in parallelization option

global_job_kwargs = dict(n_jobs = 36, total_memory = '300G', mp_context = 'spawn', progress_bar = False)
si.set_global_job_kwargs(**global_job_kwargs)

does not make use of the provided capacity. Preprocessing for example uses only about max 5% of the CPU capacity and 20% of the RAM capacity. Clearly there are bottlenecks in parallelizing the processing of a single electrode array.

So instead of preprocessing one recording at a time with many cores, I tried processing many recordings at once with one core each, by setting n_jobs=1 and using Parallel() from the joblib package. This works very well in terms of making use of the CPU capacity, but inevitably causes a crash sooner or later.

For the actual sorting step I just use the provided run_sorter_jobs(engine_kwargs = {'n_jobs': 36}) with each joblist item having 'job_kwargs' : {'n_jobs': 1}`, and this at least does use the available capacity well.

I am not a python expert nor a computer scientist. I also don't know the internal designs and limitations of spikeinterface. So I am not getting anywhere trying to troubleshoot this by myself.

My hope is that you might have some insight on how I could get through preprocessing and postprocessing a very large number of recordings more quickly. Thank you in advance!

fruce-ki avatar Feb 28 '25 11:02 fruce-ki

Is your data stored on a SSD or a HDD ? If the preprocessing isn't using as much resources as you allow it to, your bottleneck might be your disk's read speed.

Same goes for manually using joblib to iterate over multiple files, your process will try to access many files at once but the maximum access speed will remain the same.

b-grimaud avatar Mar 03 '25 09:03 b-grimaud

The files are staged to an SSD.

The reason I am asking is that joblib does maximise CPU usage, so there is untapped potential there. My problem with using joblib onto spikeinterface functions is that the child processes die, due to some thread management issues and memory leaks/overflows.

fruce-ki avatar Mar 03 '25 10:03 fruce-ki

Hi,

run_sorter_jobs() using joblib backend is not a super great idea. It was implemented long time ago and should be removed in my point of view. On spikeinterface side we cannot handle how the sorter is spawning and/or using thread and playing with GPU. So using joblib has engine to has severa worker on the same machine will lead to nested parralelisation (and you want to avoid this)

run_sorter_jobs() is more usefull with slurm for instance.

For pre/post processing our internal machanism of parralelisation is quite flexible and should be efficient. In short with high CPU count you should be able to staturate your read/write ability on your disk if it is a newtworks drive.

Be also aware that :

  • recently you also have pool_engine="process"/"thread" in job_kwargs
  • you can also play with max_threads_per_worker in case you are using processes
  • the process works better onlinux than windows, the spawn has a high cost at start.
global_job_kwargs = dict(pool_engine="process", n_jobs = 36, total_memory = '300G', mp_context = 'spawn', progress_bar = False)
si.set_global_job_kwargs(**global_job_kwargs)

samuelgarcia avatar Mar 04 '25 12:03 samuelgarcia

@samuelgarcia I agree with you on avoiding joblib. But in principle using total_memory="300G" should get the RAM usage close to the requested amount. Internally, this works by setting the chunk size so that chunk_size * n_jobs ~ total memory.

If that is not the case, we might have a bug.

Can you try to instead of using total memory, setting the n_jobs and chunk_duration? For example:

global_job_kwargs = dict(pool_engine="process", n_jobs = 36, chunk_duration = '10s', mp_context = 'spawn', progress_bar = False)
si.set_global_job_kwargs(**global_job_kwargs)

Should use 10 times the default RAM, since default duration is 1s

alejoe91 avatar Mar 05 '25 10:03 alejoe91

At the moment I have decoupled preprocessing, sorting, and postprocessing from one another, to run them independently instead of all in one go. Let's focus on just preprocessing. run_sorter_jobs() works well in terms of CPU capacity on its own anyway, so I am not inclined to wrap it in joblib.

And just to be clear, this is a single workstation. All processing and storage are local to the machine, and it is a Linux machine.

pool_engine="process" is not a recognised kwarg in the version I have installed (0.101.2).

Filling memory capacity does not seem to be a problem. In fact the opposite is true, it exceeds the expected usage, by a lot.

I was monitoring a run with 20 wells preprocessing wrapped in joblib, with 1 core and 12GB each. So I expected 50% CPU capacity and 240G (20 * 12) RAM capacity. But the actual RAM usage was peaking repeatedly at 320GB (100%).

I tried a run without the joblib workaround: n_jobs=30, mp_context+'spawn', max_threads_per_process=1, chunk_memory='1G', total_memory=None for a single well. The resource monitor clocked about 50% CPU capacity usage, but it did use the 30 cores (just not to 100%), after an initial single-core phase. So I think it works as you intended, minus some bottleneck steps. By contrast I expected 30G RAM usage (a bit under 10%), but on the resource monitor it slowly climbed all the way to 30%.

Something there is not right with the memory usage. It might be the reason I experience so many crashes every time I try to scale up my processing, as it consumes a lot more memory than I allocate to it.

fruce-ki avatar Mar 05 '25 14:03 fruce-ki

Oh, I just discovered that using less chunk_memory results in more cores. If I assign 10G per chunk, I get maybe 5 of the 30 assigned cores. If I assign 500M instead, all 30 cores come into play properly. Interesting. Maybe I really need to rethink how much memory is worth allocating. (the RAM footprint is still much larger than the parameters would indicate)

fruce-ki avatar Mar 05 '25 16:03 fruce-ki

Hey guys, I've also been running into some memory issues during preprocessing, with a similar result that preprocessing seems to use much more memory than allocated with the total_memory parameter. I'm working on a linux machine with 32 cores and 128GB RAM, and am running out of memory trying to filter, downsample and extract LFP traces for ~20GB Neuropixels recordings. Would be super interested to hear if you managed to find an optimal job parameter set for your recording @fruce-ki, or if anyone has got further investigating this since this was raised?

jakeswann1 avatar Jul 30 '25 09:07 jakeswann1

Like I said in my last comment above, I just allocate 500M or maybe even 100M chunk memory and that scales well on my 40 virtualcore 320GB machine. I don't bother with the total memory parameter as they both ultimately control the same thing. And that thing is not the total memory used overall by the transient calculations, but rather the amount of data loaded at any given time. the more data you load, the bigger all the calculated arrays also become. So just play around with the chunk size and monitor the resources, until you get a value thatallows you to scale up to the number of workers you want to use.

Also, I am now using a version newer than 0.101.2, ie cloned the main branch at the time, where a number of resource related bugs where fixed.

fruce-ki avatar Jul 30 '25 10:07 fruce-ki

That's super helpful, thanks. I'm finding that when I call get_traces with some preprocessing chain, my peak traced memory usage ends up being some integer multiple of the requested total trace size, increasing the more preproessing steps are added, up to like 9x with my full preprocessing pipeline. Is this expected? It was sort of my understanding that the lazy loading and chunk-based processing meant that you wouldn't need to hold many full copies of the data in memory at once, though maybe I've misunderstood how this actually works under the hood. I'd rather not have to write to and from the disk between each processing step, so I'd appreciate any advice on how to handle trace extraction like this. Is it the case that I need to handle running the preprocessing across chunks of the recording separately myself, or is there a way to handle this built-in? My main use case here is extracting and saving downsampled and filtered LFP

jakeswann1 avatar Jul 30 '25 10:07 jakeswann1

I'm not a developer of the package, I have no idea what's under the hood. I am also not familiar with your processing steps. I work with Maxwell recordings and extracting traces is't in my explicit steps.

But I suspect that you assign every step to a new variable, since it scales up with the steps. And if you are not saving stuff to disk, then it needs to be in RAM. There is no other way to remember a value, it is either in RAM or on disk.

fruce-ki avatar Jul 30 '25 11:07 fruce-ki

Currently, despite the fact that preprocessing chains are lazy, i.e. data are only loaded when there is a get_traces() call, every preprocessing step will have its own buffer of data. Therefore, if you have 5 steps in your chains, you'll end up with 5 copies of your traces in memory. This is on the to-do list to only create a single buffer, and make all these operations apply in place within this buffer. This might explains your memory scaling. However, I'm surprised by your memory usage. Usally, for all preprocessing steps, loading 1s of data is fine, and loading larger chunks might not necessarily speed up things.

yger avatar Jul 30 '25 12:07 yger

Thanks both! This is very useful to understand. Not sure if it's useful, but I'll put the results of some of my profiling experiments here for reference:

Using the preprocessing chain:

si.set_global_job_kwargs(n_jobs=-2) #31 processes, default chunk size

recording = se.read_openephys(recording_path)

recording_filtered = spre.bandpass_filter(recording, 0.1, 300)
recording_downsampled= spre.resample(recording_filtered, 1000)

traces = recording_downsampled.get_traces(start_frame=0, end_frame=30000)

I get the following memory usage profile:

=== System Info ===
Total memory: 125.5 GB
Available memory: 121.1 GB
Current usage: 3.6%

[Loading recording] Starting memory: 187.4 MB
  Recording shape: 384 channels x 27093596 samples
  Sampling rate: 30000 Hz
  Data type: int16
[Loading recording] Complete:
  Final memory: 187.8 MB
  Memory change: +0.4 MB
  Peak memory: 187.4 MB
  Peak increase: +0.0 MB
  Duration: 0.10 seconds

=== Expected traces sizes for 10s ===
Downsampled (1000Hz): 7.3 MB (384 channels x 10000 samples)
Original (30000Hz): 219.7 MB (384 channels x 300000 samples)
Compression ratio: 30.0x smaller after downsampling

[Bandpass filtering] Starting memory: 187.8 MB
[Bandpass filtering] Complete:
  Final memory: 187.8 MB
  Memory change: +0.0 MB
  Peak memory: 187.8 MB
  Peak increase: +0.0 MB
  Peak vs downsampled traces: 0.0x
  Peak vs original traces: 0.0x
  Duration: 0.10 seconds

[Downsampling to 1kHz] Starting memory: 187.8 MB
[Downsampling to 1kHz] Complete:
  Final memory: 187.8 MB
  Memory change: +0.0 MB
  Peak memory: 187.8 MB
  Peak increase: +0.0 MB
  Peak vs downsampled traces: 0.0x
  Peak vs original traces: 0.0x
  Duration: 0.10 seconds

[Extracting traces (10s)] Starting memory: 187.8 MB
  Traces shape: (10000, 384)
  Actual traces size: 7.3 MB
  Predicted vs actual: 7.3 vs 7.3 MB
  Data type: int16
[Extracting traces (10s)] Complete:
  Final memory: 195.6 MB
  Memory change: +7.8 MB
  Peak memory: 2408.8 MB
  Peak increase: +2221.0 MB
  Peak vs downsampled traces: 303.2x
  Peak vs original traces: 10.1x
  Duration: 7.42 seconds

=== Final memory usage: 195.6 MB ===```

Whereas, saving to .zarr after spre.resample, then loading from .zarr before calling get_traces(), I get the following profile:

=== System Info ===
Total memory: 125.5 GB
Available memory: 121.1 GB
Current usage: 3.6%

[Loading recording] Starting memory: 173.7 MB
  Recording shape: 384 channels x 27093596 samples
  Sampling rate: 30000 Hz
  Data type: int16
[Loading recording] Complete:
  Final memory: 179.5 MB
  Memory change: +5.8 MB
  Peak memory: 173.7 MB
  Peak increase: +0.0 MB
  Duration: 0.10 seconds

=== Expected traces sizes for 10s ===
Downsampled (1000Hz): 7.3 MB (384 channels x 10000 samples)
Original (30000Hz): 219.7 MB (384 channels x 300000 samples)
Compression ratio: 30.0x smaller after downsampling

[Bandpass filtering] Starting memory: 179.5 MB
[Bandpass filtering] Complete:
  Final memory: 179.5 MB
  Memory change: +0.0 MB
  Peak memory: 179.5 MB
  Peak increase: +0.0 MB
  Peak vs downsampled traces: 0.0x
  Peak vs original traces: 0.0x
  Duration: 0.10 seconds

[Downsampling to 1kHz] Starting memory: 179.5 MB
write_zarr_recording 
engine=process - n_jobs=31 - samples_per_chunk=1,000 - chunk_memory=750.00 KiB - total_memory=22.71 MiB - chunk_duration=1.00s
[Downsampling to 1kHz] Complete:
  Final memory: 181.7 MB
  Memory change: +2.2 MB
  Peak memory: 181.3 MB
  Peak increase: +1.8 MB
  Peak vs downsampled traces: 0.2x
  Peak vs original traces: 0.0x
  Duration: 132.72 seconds

[Extracting traces (10s)] Starting memory: 181.7 MB
  Traces shape: (10000, 384)
  Actual traces size: 7.3 MB
  Predicted vs actual: 7.3 vs 7.3 MB
  Data type: int16
[Extracting traces (10s)] Complete:
  Final memory: 194.9 MB
  Memory change: +13.2 MB
  Peak memory: 181.7 MB
  Peak increase: +0.0 MB
  Peak vs downsampled traces: 0.0x
  Peak vs original traces: 0.0x
  Duration: 0.10 seconds

=== Final memory usage: 194.9 MB ===

Makese sense that there are likely to be a few copies of the array made in memory for the various preprocessing steps, but 10 seems like quite a lot for just a filter + resample? I'll go ahead with saving to, and loading from, disk between preprocessing and getting traces for memory reasons, but hopefully this helps identify if the behaviour is intended! Happy to do some more testing if that would be useful for anyone

jakeswann1 avatar Jul 30 '25 13:07 jakeswann1