clusterfutures
Is `SlurmExecutor.map` method supported? If so, there is a glitch in error handling
Hi there,
In our fractal-server project, we make use of (our own subclass of) `cfut.SlurmExecutor`, both via the `executor.submit(..)` and `executor.map(..)` methods. While debugging some `map()` issues, we noticed that cfut has a custom `cfut.map` function, and the examples do not use the standard `executor.map` one.
Question: Is there a special reason for this choice?
In the following comment, I will report a glitch in how the `SlurmExecutor.map()` method handles errors, which is only relevant if cfut decides to support this approach (rather than the custom `cfut.map` function). If you folks prefer to only support `cfut.map`, I suggest adding a note in the README (along the lines of "Warning: SlurmExecutor.map() is not fully supported", perhaps pointing to the current issue).
Side note: the proposed change is in the `ClusterExecutor._completion` method, which we already subclass on our side for other reasons. Therefore we will fix the issue on our side, and we don't strictly need any update to cfut. If you'd like to have it in cfut as well, we will gladly submit a PR for that. If you prefer not to update cfut, this is in no way blocking for us (I would still suggest adding a warning in the README).
(all of this comes from work/discussion with @mfranzon and @jluethi)
The standard `Executor.map` method has an important difference from `Executor.submit`: at the end of `map`, all the Futures that were created (one for each item of the iterable of arguments) are cancelled. On the other hand, `Executor.submit` simply returns a Future, and does not take care of cancelling it.

TL;DR: This feature of `Executor.map` may break the handling of errors in `SlurmExecutor.map`.
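For context, the cancellation happens inside the generator that `Executor.map` returns; a simplified paraphrase of that upstream pattern (a sketch of CPython's result iterator, not clusterfutures code) looks like this:

```python
# Simplified paraphrase of the result iterator behind Executor.map in CPython:
# once the generator is finalized (e.g. because one .result() call raised, or
# the caller stops iterating), every leftover future is cancelled.
def result_iterator(fs):
    try:
        fs.reverse()
        while fs:
            yield fs.pop().result()
    finally:
        for future in fs:
            future.cancel()
```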
Side note on python versions: this slightly changed after python 3.8 (see [this cpython commit](https://github.com/python/cpython/commit/763801aae2eca326e7828fcd274acde6165a4a4b)), but the relevant line is always there, e.g. in [python 3.8](https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/_base.py#L624-L625) and in [python 3.11](https://github.com/python/cpython/blob/3.11/Lib/concurrent/futures/_base.py#L623-L624).

Consider this `slurm_example.py` script:
```python
import cfut


def square(n):
    return n * n


def just_raise(n):
    raise RuntimeError(f"error with {n=}")


def example_6():
    """Successful use of .map() method
    """
    with cfut.SlurmExecutor(False) as exc:
        result_generator = exc.map(square, [5, 7, 11])
        results = list(result_generator)
        print(f"{results=}")
        assert results == [25, 49, 121]


def example_7():
    """This example does not work as expected
    """
    with cfut.SlurmExecutor(False) as exc:
        res = exc.map(just_raise, [5, 7, 11])
        print(list(res))


if __name__ == '__main__':
    example_6()
    example_7()
```
When running it (with python 3.8 and cfut 0.5), we obtain the following output:
```
$ python3 slurm_example.py
results=[25, 49, 121]
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 65, in run
    self.check(i)
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 181, in check
    super().check(i)
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 77, in check
    self.callback(self.waiting[filename])
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 133, in _completion
    fut.set_exception(RemoteException(result))
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 547, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7f39320eb100 state=cancelled>
```
Namely:

- The `map()` method works fine when there are no exceptions being raised.
- When there is an exception, something in the `cfut.SlurmExecutor._completion()` method fails. The problem is that we are still trying to address the future, typically with `set_result` or `set_exception`, but the future has already been cancelled as part of the `map` method - or at least this is our understanding (see the minimal illustration below).
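To see the failure mode in isolation, here is a minimal, Slurm-free illustration using only `concurrent.futures` (the names are just for demonstration):

```python
from concurrent.futures import Future, InvalidStateError

fut = Future()
fut.cancel()  # what Executor.map does to the leftover futures
try:
    # what _completion effectively tries to do afterwards
    fut.set_exception(RuntimeError("task failed"))
except InvalidStateError as e:
    print(f"InvalidStateError: {e}")
```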
What follows is the first workaround we could come up with. We modify the `_completion` method as follows:
```python
def _completion(self, jobid):
    """Called whenever a job finishes."""
    with self.jobs_lock:
        fut, workerid = self.jobs.pop(jobid)
        if not self.jobs:
            self.jobs_empty_cond.notify_all()
        # --------- NEW CODE BLOCK ----------
        if fut.cancelled():
            new_fut = futures.Future()
            new_fut.set_exception(
                JobDied("Something went wrong, we hit a cancelled future")
            )
            self.jobs[jobid] = new_fut, workerid
            return
    # ALL THE REST STAYS THE SAME
```
With this change, the output of running the example script becomes:
```
$ python3 slurm_example.py
results=[25, 49, 121]
Traceback (most recent call last):
  File "slurm_example.py", line 32, in <module>
    example_7()
  File "slurm_example.py", line 27, in example_7
    print(list(res))
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
cfut.RemoteException:
Traceback (most recent call last):
  File "slurm_example.py", line 9, in just_raise
RuntimeError: error with n=5
```
This is closer to my expectations.
Any comment/criticism on the workaround is obviously welcome. If there is interest in supporting the `.map()` method, I can turn this into a PR and we can continue over there.
This isn't my project, but I am now a co-maintainer. I'd say that the `.map()` method on the executor should be supported as well.

I don't think your workaround is the right way to go, however. The call to submit has already returned the old future object, so constructing a new one won't do anything useful for the caller. Note that the `_completion` method normally takes things out of the jobs dict, so having it sometimes put something back in doesn't really make sense.

I think a better solution might be just to catch the InvalidStateError and ignore it. The first future with an exception will raise it, then all the later ones from the same `.map()` call will be cancelled - but we never 'see' those, so the results don't matter.
I had a look at the `concurrent.futures` code, and the thread and process executors both prevent this by setting the future into a 'running' state (via `fut.set_running_or_notify_cancel()`) before starting to process it; after this, it can no longer be cancelled. For threads, this is easy, because the Future class is thread safe. For processes, it hinges on transferring tasks from an unbounded pending queue (where they can be cancelled) to a relatively small call queue (where they can't). I think it would be tricky to copy this pattern in clusterfutures.
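For illustration, the thread-pool side of that pattern boils down to something like the following (a simplified sketch modelled on CPython's work items, not an existing clusterfutures function):

```python
# "Mark as running before processing": once set_running_or_notify_cancel()
# succeeds, Future.cancel() can no longer win the race against
# set_result()/set_exception().
def run_work_item(future, fn, *args, **kwargs):
    if not future.set_running_or_notify_cancel():
        return  # the future was cancelled before we started; skip it
    try:
        result = fn(*args, **kwargs)
    except BaseException as exc:
        future.set_exception(exc)
    else:
        future.set_result(result)
```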
Thanks for the details.

Using `set_running_or_notify_cancel` looks like a nice solution to this problem, indeed. I will try and implement it, and report back here.
I'm not sure there's a great way to do it, unless we just mark them all as running as soon as they're created. But maybe that's reasonable, since we haven't actually got any way to cancel a task once submitted. :thinking:
My bad, I had not read your message carefully. After some more thought, I agree that just skipping the `_completion` callback if the future is cancelled does work as intended.
I went back to this comment:
> I think a better solution might be just to catch the InvalidStateError and ignore it. The first future with an exception will raise it, then all the later ones from the same .map() call will be cancelled - but we never 'see' those, so the results don't matter.
and tried it out.
The proposed change is then to modify the beginning of `_completion` with something like a broad try/except block that captures the InvalidStateError:
```python
from concurrent.futures import InvalidStateError

# ...

def _completion(self, jobid):
    """Called whenever a job finishes."""
    with self.jobs_lock:
        fut, workerid = self.jobs.pop(jobid)
        if not self.jobs:
            self.jobs_empty_cond.notify_all()
    if self.debug:
        print("job completed: %i" % jobid, file=sys.stderr)

    try:  # THIS IS NEW
        try:
            with open(OUTFILE_FMT % workerid, 'rb') as f:
                outdata = f.read()
        except FileNotFoundError:
            fut.set_exception(
                JobDied(f"Cluster job {jobid} finished without writing a result")
            )
        else:
            success, result = cloudpickle.loads(outdata)
            if success:
                fut.set_result(result)
            else:
                fut.set_exception(RemoteException(result))
            os.unlink(OUTFILE_FMT % workerid)

        # Clean up communication files.
        os.unlink(INFILE_FMT % workerid)
        self._cleanup(jobid)
    # THE FOLLOWING BLOCK IS NEW
    except InvalidStateError:
        print(f"[_completion] {fut=} is cancelled, skip")
```
After this change (and after adding a bit of logging also in the `Executor.map()` method from `concurrent/futures/_base.py`), the following script
```python
import cfut
import time


def square_or_raise(n):
    time.sleep(2)
    if n == 3:
        raise ValueError(f"error with {n=}")
    else:
        return n * n


if __name__ == "__main__":
    with cfut.SlurmExecutor() as exc:
        result_generator = exc.map(square_or_raise, range(10))
        for result in result_generator:
            print(f"{result=}")
        print("I reached the end of the result list (this line is never executed)")
```
gives the following output:

```
result=0
result=1
result=4
[map] try block failed, let's cancel all futures
[map] now cancel future=<Future at 0x7f2b8c2bc8b0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2bc5b0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2bc940 state=pending>
[map] now cancel future=<Future at 0x7f2b8c37abe0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2b3820 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2adfd0 state=pending>
[_completion] fut=<Future at 0x7f2b8c2adfd0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2b3820 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c37abe0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc940 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc5b0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc8b0 state=cancelled> is cancelled, skip
Traceback (most recent call last):
  File "slurm_example.py", line 16, in <module>
    for result in result_generator:
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 620, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
cfut.RemoteException:
Traceback (most recent call last):
  File "slurm_example.py", line 8, in square_or_raise
ValueError: error with n=3
```
Some comments:

- The proposed change is also meaningful in case of a `SlurmExecutor.submit`, and not only for `SlurmExecutor.map`. If (for whatever reason) a future obtained with `fut = exc.submit(fun, args)` is cancelled, then (with cfut 0.5) we would hit the same InvalidStateError described above. With the proposed change, we would only see a log similar to
  `[_completion] fut=<Future at 0x7fdd1131e490 state=cancelled> is cancelled, return`
- The fact that all other futures of a `map` are cancelled as soon as one of the tasks fails is common to older and more recent python versions. The minor difference is that starting with python 3.10, `cancel()` is also called on the future corresponding to a failing task (that is, the one for which we will call `set_exception`), via the `_result_or_cancel` function (a simplified sketch follows below). For older python, that future is never cancelled - as far as I understand. This does not change anything in the proposed update to clusterfutures.
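For reference, a lightly simplified version of the `_result_or_cancel` helper from CPython 3.10+ looks like this:

```python
def _result_or_cancel(fut, timeout=None):
    # Retrieve the result (or re-raise the task's exception), then call
    # cancel() on the future and drop the reference to it.
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        del fut
```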
(I can obviously prepare a PR with the small change I implemented, or something similar, if that's useful)
I'd probably make the try/except smaller, just around the bit where we call `set_result` and `set_exception`. And any prints should be behind an `if self.debug`, like the existing messages (or we should use logging, but let's stick with the mechanism that's there for now). Other than that, it seems like a reasonable improvement to the status quo.
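To make that concrete, the narrower guard could look roughly like this (a hypothetical sketch, not an agreed-upon patch; only the calls that set the future's outcome are wrapped, and the message sits behind `self.debug`):

```python
try:
    if success:
        fut.set_result(result)
    else:
        fut.set_exception(RemoteException(result))
except InvalidStateError:
    # The future was already cancelled (e.g. by Executor.map); ignore it.
    if self.debug:
        print("job %i: future already cancelled, ignoring result" % jobid,
              file=sys.stderr)
```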