ProcessPool
Looking good. Can you please add a bit of documentation to this explaining the background behind what it solves and why (with links to background info), because it's a bit non-obvious!
Will do, thank you.
Ok, `parallel` is getting out of hand, now with half a dozen interconnected parameters that all affect its behaviour in significant and non-straightforward ways:
- `n_workers` - Has a side-effect of using `pool.map()` in a worker process/thread, or just `map()` in the current process.
- `threadpool` - Use threads or processes. Threads in Python are subject to the GIL, so they are only good for IO tasks, but they can be started much faster and you don't need to serialize the data sent between them.
- `method` - Affects the start method for worker processes. Don't use it with threads. Platform-dependent, and unclear when to change it.
- `executor` - Use `ProcessPoolExecutor` or `ProcessPool`. Does not work when using threads.
- `maxtasksperchild` - If using `ProcessPool`, limits the lifetime of workers.
Besides that, we also provide overloaded `ProcessPool`, `ProcessPoolExecutor` and `ThreadPoolExecutor`. They act like the classes from `concurrent.futures` and `multiprocessing.pool.Pool`, but their `.map()` methods will revert to plain `map()` if you start them with `max_workers=0`. Other methods ignore this suggestion and start 1 worker process/thread. `ProcessPool` also accepts `daemonic=`, which controls whether worker processes should be daemonic or not.
Speaking of daemonic, what is it all about? For our purposes, just 2 things seem relevant.
- A daemonic process will terminate when the parent process (the Python interpreter) is terminated. A non-daemonic process will persist.
- A daemonic process cannot have children, which means you can't use things like `Pool` or `PoolExecutor` inside it.
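The no-children restriction is easy to observe directly. A small demo (the helper names are mine, and the fork context is assumed so nothing needs to be pickled):

```python
import multiprocessing as mp

def _spawn_grandchild(q):
    # Runs inside a daemonic worker; creating a child process here is
    # forbidden, and .start() raises an AssertionError.
    try:
        c = mp.Process(target=sum, args=([1, 2],))
        c.start()
        c.join()
        q.put("child started fine")
    except AssertionError as e:
        q.put(str(e))

def daemonic_child_error():
    # Start a daemonic process and report what happens when it tries
    # to create a child of its own.
    ctx = mp.get_context("fork")  # fork assumed (default on Linux)
    q = ctx.Queue()
    p = ctx.Process(target=_spawn_grandchild, args=(q,), daemon=True)
    p.start()
    msg = q.get()
    p.join()
    return msg
```

The returned message is multiprocessing's "daemonic processes are not allowed to have children" error, which is exactly why a daemonic worker can't use `Pool` or `PoolExecutor`.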
Processes created by `ProcessPoolExecutor` are not daemonic (as of 3.8?), while `ProcessPool` lets you control this property.
I don't think it would be a controversial thing to say that `parallel()` is becoming too complicated. I also don't like that most parameter combinations are invalid. Let me try to simplify it.
First, what is the reason we need 4 different types of pools?
As long as we only use the .map method, there is not much difference between a Pool and a PoolExecutor:
- `ProcessPoolExecutor` supports `timeout=` for `.map()`. Workers are always non-daemonic.
- `ProcessPool` supports `maxtasksperchild`, and workers can be daemonic or non-daemonic. Has a lot more methods besides `.map()`, but we don't use them, and they will ignore `max_workers=0`.
- `ThreadPoolExecutor` supports `timeout=` for `.map()`, uses threads.
- `ThreadPool` - Does not support `timeout` or `maxtasksperchild`, but has all the methods provided by `ProcessPool`. Not implemented.
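For the `.map`-only case the interchangeability is easy to check. A sketch (names are mine; the fork context is passed explicitly so the example doesn't depend on the platform default):

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import get_context

def _inc(x):
    return x + 1

def map_both(items):
    # As long as only .map is used, Pool and ProcessPoolExecutor are
    # interchangeable: both distribute work across workers and return
    # results in input order.
    ctx = get_context("fork")  # fork assumed (default on Linux)
    with ProcessPoolExecutor(2, mp_context=ctx) as ex:
        a = list(ex.map(_inc, items))
    with ctx.Pool(2) as pl:
        b = pl.map(_inc, items)
    return a, b
```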
In reality, it looks like most of the features are not being used. At least in fastai code:
- `ProcessPoolExecutor` and `ThreadPoolExecutor` are never used directly, only through `parallel()`.
- `timeout=` is never used (nb 09b passes `timeout=` to `f()`, not to `.map()`).
- `threadpool` is used exactly once, again, in nb 09b.
I'd like to propose:
- Drop custom `ProcessPoolExecutor` and `ThreadPoolExecutor`, as neither is being used by external code. In `parallel()` do:

```python
if max_workers:
    with pool(...) as pl: r = pl.map(...)
else:
    r = map(...)
```
- Drop `threadpool=`. Threads work differently and have vastly different semantics when it comes to synchronization; this option is only used once, and in a context where a process pool would work just as well.
- Since we never use `timeout`, drop it and always use `ProcessPool` instead of `ProcessPoolExecutor`.
- Since in the past all processes have been non-daemonic, keep them non-daemonic when using `ProcessPool`.
- Realistically, the only 2 useful values for `maxtasksperchild` are 1 and unlimited. Replace it with an easier-to-understand parameter: `reuse_workers=True` instead of `maxtasksperchild=None`, `reuse_workers=False` instead of `maxtasksperchild=1`.
- `method` is extremely finicky. Maybe we could drop it in favour of heuristics?
Proposed interface for `parallel`:

```python
# before:
parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
         method=None, threadpool=False, chunksize=1, maxtasksperchild=None, **kwargs)

# after:
parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
         chunksize=1, reuse_workers=True, **kwargs)
```
P.S.: I've also noticed that we default to using "fork" on macOS. The documentation says:

> Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See https://bugs.python.org/issue?@action=redirect&bpo=33725
At the bottom of the imports cell, we have:

```python
try:
    if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except: pass
```
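What makes this snippet surprising is that `set_start_method()` mutates interpreter-wide state. A small sketch of how to inspect the default, and of the narrower alternative (an explicit context), which I'd assume is preferable where it's an option:

```python
import multiprocessing as mp

# The process-global default: 'fork' on Linux, 'spawn' on macOS (3.8+)
# and Windows, unless something like the snippet above has changed it.
default = mp.get_start_method()

# An explicit context scopes the choice to one pool and leaves the
# global default untouched:
ctx = mp.get_context("spawn")
pool_cls = ctx.Pool  # workers of this Pool will be spawned, not forked
```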
It was introduced in a commit that just says "fix import" https://github.com/fastai/fastcore/commit/65d703f9bd64991e1ba8cac2d413197b093f7ae8 , and it changes the behaviour of all python multiprocessing methods on macOS. Should it be there at all?
@jph00 , I had a crack at it. What do you think?
https://github.com/xl0/fastcore/blob/minpool/nbs/03a_parallel.ipynb
All fastai tests worked with --n_workers=1 and default.
> I've also noticed that we default to using "fork" on macOS
Yes, that's the only way we've found to get DL notebooks to run in practice, despite the concerns that you correctly pointed out.
In your analysis above you seem AFAICT to largely be restricting your impact analysis to usages within the fastcore lib itself. Would you be able to also test the impact of these changes on running the tests for nbdev, fastai, ghapi, and execnb? (Apologies in advance if you've already done this and I missed it.)
@jph00 , Thank you for the clarification.
No, I did test with fastcore, nbdev and fastai. I just checked - execnb passes cleanly, ghapi needs to drop `threadpool=True` in nb 03. The change does not perceivably affect performance even if I change the start method to "spawn".
@jph00 , one thing I've noticed with ProcessPool.
There is no guarantee as to which worker will start first when using `.imap()`. The tasks are scheduled for execution and the results are returned in the right order, but once the work is scheduled for the newly created workers, any one of them may get to run first. This seems not to be the case for `ProcessPoolExecutor` - the tasks seem to always start in input order, but I'm not sure if that's a guarantee or not.
Do you think this is an issue?
I'm using `imap()` for the progress bar; `.map()` just returns the results.
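For what it's worth, even when workers pick up tasks out of order, `imap` still yields results in input order, which is all a progress bar needs. A quick check (names are mine; fork context assumed, with random jitter so tasks finish out of order):

```python
import random
import time
from multiprocessing import get_context

def _jittered_square(x):
    # Random delay so tasks finish in a scrambled order across workers.
    time.sleep(random.uniform(0, 0.05))
    return x * x

def imap_in_order(n=8):
    # imap buffers out-of-order completions internally and yields
    # results strictly in input order.
    with get_context("fork").Pool(4) as pl:
        return list(pl.imap(_jittered_square, range(n)))
```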
~Ok, this was actually pretty easy to fix. Instead of wrapping g into _call(), I wrap items into a generator instead:~
```python
# |export
def _gen(items, pause):
    for item in items:
        time.sleep(pause)
        yield item
```
~and in parallel()~
```python
with ProcessPool(n_workers, context=get_context(method), reuse_workers=reuse_workers) as ex:
    lock = Manager().Lock()
    _g = partial(_call, lock, pause, n_workers, g)
    r = ex.imap(_g, items, chunksize=chunksize)
```
~becomes~
```python
with ProcessPool(n_workers, context=get_context(method), reuse_workers=reuse_workers) as ex:
    _items = _gen(items, pause)
    r = ex.imap(g, _items, chunksize=chunksize)
```
~What changes is, we pause in the parent process before handing work to the worker, and not in the worker before starting the work. Which means, we no longer even need the lock.~
@jph00 , please disregard the previous comment, I was wrong. Would it be an issue that the tasks can start out of order?