Single threaded alternative processor
What is the problem / what does the code in this PR do
This adds a single-threaded alternative processor backend that avoids the mailbox system and uses less memory.
Strax has a custom-built concurrency backend (the 'mailbox system'). It works, but it has problems and we hope to eventually replace it. @jmosbacher has done much work towards this; see also the discussion in #81.
A single-threaded processor won't work for the DAQ, but it could help reprocessing, analysis, and debugging:
- Use less memory, by only reading in data when it is strictly needed and deleting it as soon as possible. Mailboxes with max_workers=1 / lazy mode do a reasonable job at this already, so the difference should not be dramatic in practice.
- Avoid hangs, deadlocks, and errors from mailboxes. There are places in strax where exceptions don't get reported but instead cause a mailbox hang, which is annoying during development.
- Make it easy to track how much time is spent on a plugin. We could add a printout of how long each plugin took, as we had in pax. (Profilers also give you that info and much besides, but you don't enable them in production as they slow things down.) Even without that, it is easier to read and reason about the debug log, since you know only one thing happens at a time.
- Provide some alternative processor, available as a fallback and minimal reference implementation for fancier processing backends.
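To illustrate the timing point from the list above: when only one thing happens at a time, per-plugin timing is a matter of wrapping each call with a clock. The following is a minimal sketch, not code from this PR; `run_with_timing` and the `(name, function)` plugin representation are hypothetical stand-ins for strax plugins.

```python
import time
from collections import defaultdict


def run_with_timing(plugins, chunks):
    """Run every chunk through the plugins in order, timing each plugin.

    `plugins` is a list of (name, function) pairs. This helper and the
    plugin representation are hypothetical, not part of strax.
    """
    seconds_spent = defaultdict(float)
    results = []
    for chunk in chunks:
        data = chunk
        for name, func in plugins:
            start = time.perf_counter()
            data = func(data)
            # Single-threaded: the elapsed wall time belongs to this plugin only.
            seconds_spent[name] += time.perf_counter() - start
        results.append(data)
    return results, dict(seconds_spent)


# Toy usage with two stand-in "plugins"
toy_plugins = [
    ("double", lambda xs: [2 * x for x in xs]),
    ("total", sum),
]
toy_results, toy_seconds = run_with_timing(toy_plugins, [[1, 2, 3], [4, 5]])
for plugin_name, seconds in toy_seconds.items():
    print(f"{plugin_name}: {seconds:.6f} s")
```

With threads, the same measurement would also include time spent waiting on other workers, which is part of why the single-threaded numbers are easier to interpret.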
Can you briefly describe how it works?
I started from a commit from Yossi's #410 to allow processors to be selected at runtime by the context. Currently `BaseProcessor` is a bit empty; we can see if there is more we can generalize into it.
The `SingleThreadProcessor` is an alternative to the `ThreadedMailboxProcessor`. To keep the processor classes comparable, I put most of the mailbox-replacing logic into a `PostOffice` class. There is only one `PostOffice` instance per processor; it handles all the data types ('topics'), and it is much simpler than a single mailbox since it needs no threading or locking. We could eventually split this off into a separate package, maybe together with mailboxes.
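To give a feel for why a single object can serve all topics without locks, here is a toy pull-based sketch. It is an illustration under my own assumptions, not the `PostOffice` class in this PR: the name `ToyPostOffice` and the methods `register_producer` and `get_iter` are hypothetical, and this toy requires every reader to register before the first message is pulled.

```python
from collections import deque


class ToyPostOffice:
    """Hypothetical, simplified sketch of one object serving all topics."""

    def __init__(self):
        self._producers = {}  # topic -> iterator that produces messages
        self._queues = {}     # topic -> {reader name: deque of pending messages}

    def register_producer(self, topic, iterable):
        self._producers[topic] = iter(iterable)
        self._queues[topic] = {}

    def get_iter(self, topic, reader):
        """Register `reader` now and return its iterator over `topic`."""
        queue = self._queues[topic][reader] = deque()
        return self._read(topic, queue)

    def _read(self, topic, queue):
        while True:
            if not queue:
                # Nothing buffered for this reader: advance the producer by
                # one message and hand that message to every reader.
                try:
                    message = next(self._producers[topic])
                except StopIteration:
                    return
                for reader_queue in self._queues[topic].values():
                    reader_queue.append(message)
            yield queue.popleft()


def double(source):
    """Stand-in for a plugin: consumes one topic, produces another."""
    for x in source:
        yield 2 * x


office = ToyPostOffice()
office.register_producer("raw", [1, 2, 3])
office.register_producer("doubled", double(office.get_iter("raw", "double_plugin")))
raw_for_saver = office.get_iter("raw", "raw_saver")

print(list(office.get_iter("doubled", "target")))
print(list(raw_for_saver))
```

Everything is driven by whoever asks for the final topic, so production only happens on demand and no two pieces of code ever run at the same time.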
The trickiest part was dealing with rechunking in savers. The current code assumed it had its own thread available to independently pull on an iterator in `save_from`. In single-threaded processing we instead have to send chunks to `_save_chunk` individually (as we do in multiprocessing), which skips the rechunking logic. I therefore factored it out into a separate `Rechunker` class and wrap that around the Saver when we are single-threading. It might be useful to have the rechunking logic separate anyway.
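The idea of wrapping a saver with separate rechunking logic can be sketched as follows. This is a simplified illustration, not the `Rechunker` in this PR: `ToyRechunker`, `receive_chunk`, `close`, and the "at least `target_size` items" rule are all assumptions, and chunks are plain lists rather than strax chunks.

```python
class ToyRechunker:
    """Hypothetical sketch: buffer small chunks, pass larger ones to a saver."""

    def __init__(self, save_chunk, target_size):
        self.save_chunk = save_chunk    # callable that stores one chunk
        self.target_size = target_size  # minimum number of items per saved chunk
        self._buffer = []

    def receive_chunk(self, chunk):
        """Buffer `chunk`; save once enough data has accumulated."""
        self._buffer.extend(chunk)
        if len(self._buffer) >= self.target_size:
            self._flush()

    def close(self):
        """Save whatever is left at the end of the run."""
        if self._buffer:
            self._flush()

    def _flush(self):
        self.save_chunk(self._buffer)
        # Start a fresh list so the saved chunk is not modified afterwards.
        self._buffer = []


saved = []
rechunker = ToyRechunker(save_chunk=saved.append, target_size=4)
for incoming in [[1, 2], [3], [4, 5], [6]]:
    rechunker.receive_chunk(incoming)
rechunker.close()
print(saved)
```

Because the wrapper is pushed one chunk at a time, it needs no thread of its own, which is the property single-threaded saving requires.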
For testing, I let some tests in test_core run on both processors, and there are some asserts. If you'd like to see dedicated unit tests and docs as well, let me know.
The default processor is not changed; you have to add `processor='single_thread'` to your get_array/get_df/etc. call to switch to the single-threaded processor. I propose we test it in some reprocessing jobs first; if it works, we could then make it the default for `max_workers=1`.
Can you give a minimal working example (or illustrate with a figure)?
This shows the mprof output for the full processing of a tiny (~90 second) background run, starting from lz4 raw records on my laptop, with all needed online resources already downloaded. First for the current mailbox processing:
st.make('026195', 'event_basics')
and for single-threaded processing:
st.make('026195', 'event_basics', processor='single_thread')
For reviewing, note the number of lines changed is deceptively large. The stuff in `strax.processor` basically just got moved to `strax.processors.threaded_mailbox`. I kept `strax.mailbox` in place since mailboxes are also used directly in the rechunker script. (And `strax.processor` is now present only to keep an old straxen test running that imports from it directly.)