clusterfutures

Is `SlurmExecutor.map` method supported? If so, there is a glitch in error handling

Open tcompa opened this issue 2 years ago • 8 comments

Hi there, In our fractal-server project, we make use of (our own subclass of) cfut.SlurmExecutor, both via the executor.submit(..) and executor.map(..) methods. While debugging some map() issues, we noticed that cfut has a custom cfut.map function, and the examples do not use the executor.map standard one.

Question: Is there a special reason for this choice?

In the following comment, I will report a glitch in how the SlurmExecutor.map() method handles errors, which is only relevant if cfut decides to support this approach (rather than only the custom cfut.map function). If you folks prefer to only support cfut.map, I suggest adding a note in the README (along the lines of "Warning: SlurmExecutor.map() is not fully supported", perhaps pointing to the current issue).

Side note: the proposed change is in the ClusterExecutor._completion method, which we already subclass on our side for other reasons. Therefore we will fix the issue on our side, and we don't strictly need any update to cfut. If you'd like to have it in cfut as well, we will gladly submit a PR for that. If you prefer not updating cfut, this is in no way blocking for us (I would still suggest adding a warning in the README).

(all of this comes from work/discussion with @mfranzon and @jluethi)

tcompa avatar Feb 09 '23 10:02 tcompa

The standard Executor.map method differs from Executor.submit in one important way: when the result iterator returned by map finishes (or exits early because of an exception), all the Futures it still holds (one was created for each item of the iterable of arguments) are cancelled. Executor.submit, on the other hand, simply returns a Future and does not take care of cancelling it.

TL;DR This feature of Executor.map may break the handling of errors in SlurmExecutor.map.

Side note on Python versions: this changed slightly after Python 3.8 (see [this cpython commit](https://github.com/python/cpython/commit/763801aae2eca326e7828fcd274acde6165a4a4b)), but the relevant line is always there, e.g. in [python3.8](https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/_base.py#L624-L625) and in [python 3.11](https://github.com/python/cpython/blob/3.11/Lib/concurrent/futures/_base.py#L623-L624).
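The cancelling behaviour can be observed without Slurm. The following is a minimal sketch using only the standard library; `ManualExecutor` is a hypothetical toy class (not part of cfut) whose `submit` only records a pending Future and never runs anything, so that the futures can be completed by hand.

```python
from concurrent.futures import Executor, Future


class ManualExecutor(Executor):
    """Hypothetical toy executor: submit() only records a pending Future."""

    def __init__(self):
        self.futures = []

    def submit(self, fn, /, *args, **kwargs):
        fut = Future()  # stays PENDING until we complete it by hand
        self.futures.append(fut)
        return fut


ex = ManualExecutor()
results = ex.map(abs, [1, 2, 3])  # creates three pending futures

# Simulate the first task failing while the other two are still pending.
ex.futures[0].set_exception(RuntimeError("boom"))

try:
    list(results)
except RuntimeError:
    pass  # the failure of the first task is re-raised to the caller

# The failed future is finished; the remaining ones were cancelled by map().
states = [fut.cancelled() for fut in ex.futures]
print(states)
```

In this sketch the two futures that were still pending end up cancelled, which is the state that a later completion callback would then run into.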

Consider this slurm_example.py script:

import cfut


def square(n):
    return n * n


def just_raise(n):
    raise RuntimeError(f"error with {n=}")


def example_6():
    """Successful use of .map() method
    """
    with cfut.SlurmExecutor(False) as exc:
        result_generator = exc.map(square, [5, 7, 11])
        results = list(result_generator)
        print(f"{results=}")
        assert results == [25, 49, 121]


def example_7():
    """This example does not work as expected
    """
    with cfut.SlurmExecutor(False) as exc:
        res = exc.map(just_raise, [5, 7, 11])
        print(list(res))


if __name__ == '__main__':
    example_6()
    example_7()

When running it (with python 3.8 and cfut 0.5), we obtain the following output:

$ python3 slurm_example.py 
results=[25, 49, 121]
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 65, in run
    self.check(i)
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 181, in check
    super().check(i)
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 77, in check
    self.callback(self.waiting[filename])
  File "/usr/local/lib/python3.8/dist-packages/cfut/__init__.py", line 133, in _completion
    fut.set_exception(RemoteException(result))
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 547, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7f39320eb100 state=cancelled>

Namely:

  • The map() method works fine, when there are no exceptions being raised.
  • When there is an exception, something in the cfut.SlurmExecutor._completion() method fails. The problem is that we are still trying to address the future, typically with set_result or set_exception, but the future has already been cancelled as part of the map method - or at least this is our understanding.
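The failure in the traceback above can be reproduced in isolation with a plain `concurrent.futures.Future` (Python 3.8 or later, where `InvalidStateError` exists). This is only an illustration of the standard-library behaviour, not of the cfut internals.

```python
from concurrent.futures import Future, InvalidStateError

fut = Future()
was_cancelled = fut.cancel()  # a pending future can always be cancelled

try:
    # This is what a completion callback would attempt once the job ends.
    fut.set_exception(RuntimeError("job failed"))
    outcome = "exception stored"
except InvalidStateError:
    # The future is already CANCELLED, so it cannot accept a result or exception.
    outcome = "InvalidStateError"

print(was_cancelled, outcome)
```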

tcompa avatar Feb 09 '23 11:02 tcompa

What follows is the first workaround we could come up with. We modify the _completion method as follows:

    def _completion(self, jobid):
        """Called whenever a job finishes."""
        with self.jobs_lock:
            fut, workerid = self.jobs.pop(jobid)
            if not self.jobs:
                self.jobs_empty_cond.notify_all()

            # --------- NEW CODE BLOCK ----------
            if fut.cancelled():
                new_fut = futures.Future()
                new_fut.set_exception(JobDied("Something went wrong, we hit a cancelled future"))
                self.jobs[jobid] = new_fut, workerid
                return

        # ALL THE REST STAYS THE SAME

With this change, the output of running the example script becomes:

$ python3 slurm_example.py 
results=[25, 49, 121]
Traceback (most recent call last):
  File "slurm_example.py", line 32, in <module>
    example_7()
  File "slurm_example.py", line 27, in example_7
    print(list(res))
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
cfut.RemoteException: 
Traceback (most recent call last):
  File "slurm_example.py", line 9, in just_raise
RuntimeError: error with n=5

which is closer to my expectations.

Any comment/criticism on the workaround is obviously welcome. If there is interest in supporting the .map() method, I can turn this into a PR and we can continue over there.

tcompa avatar Feb 09 '23 11:02 tcompa

This isn't my project, but I am now a co-maintainer. I'd say that the .map() method on the executor should be supported as well.

I don't think your workaround is the right way to go, however. The call to submit has already returned the old future object, so constructing a new one won't do anything useful for the caller. Note that the _completion method normally takes things out of the jobs dict, so having it sometimes put something back in doesn't really make sense.

I think a better solution might be just to catch the InvalidStateError and ignore it. The first future with an exception will raise it, then all the later ones from the same .map() call will be cancelled - but we never 'see' those, so the results don't matter.

I had a look at the concurrent.futures code, and the thread and process executors both prevent this by setting the future into a 'running' state (fut.set_running_or_notify_cancel()) before starting to process it; after this, it can no longer be cancelled. For threads, this is easy, because the Future class is thread safe. For processes, it hinges on transferring tasks from an unbounded pending queue (where they can be cancelled) to a relatively small call queue (where they can't). I think it would be tricky to copy this pattern in clusterfutures.
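For reference, this is how the 'running' state behaves on a bare standard-library Future; the snippet does not involve clusterfutures at all.

```python
from concurrent.futures import Future

fut = Future()

# Move the future from PENDING to RUNNING; returns True because it was
# not cancelled beforehand.
started = fut.set_running_or_notify_cancel()

# A running future refuses cancellation: cancel() returns False.
cancel_accepted = fut.cancel()

# Setting the result afterwards therefore still works.
fut.set_result(42)
print(started, cancel_accepted, fut.result())
```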

takluyver avatar Feb 10 '23 11:02 takluyver

Thanks for the details. Using set_running_or_notify_cancel looks like a nice solution to this problem, indeed. I will try and implement it, and report back here.

tcompa avatar Feb 10 '23 13:02 tcompa

I'm not sure there's a great way to do it, unless we just mark them all as running as soon as they're created. But maybe that's reasonable, since we haven't actually got any way to cancel a task once submitted. :thinking:
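A rough sketch of the "mark them all as running as soon as they're created" idea, using a hypothetical toy executor (`EagerRunningExecutor` is not cfut code, and nothing is actually executed here): because each future is already running when `map` tries to cancel it, the cancellation is refused and a late result can still be stored.

```python
from concurrent.futures import Executor, Future


class EagerRunningExecutor(Executor):
    """Hypothetical toy executor: futures are marked running at submit time."""

    def __init__(self):
        self.futures = []

    def submit(self, fn, /, *args, **kwargs):
        fut = Future()
        fut.set_running_or_notify_cancel()  # from now on cancel() is refused
        self.futures.append(fut)
        return fut


ex = EagerRunningExecutor()
results = ex.map(abs, [1, 2, 3])
ex.futures[0].set_exception(RuntimeError("boom"))

try:
    list(results)
except RuntimeError:
    pass  # map() now tries to cancel the remaining futures, without effect

# A job finishing later can still store its result without an error.
ex.futures[1].set_result(2)
any_cancelled = any(fut.cancelled() for fut in ex.futures)
print(any_cancelled, ex.futures[1].result())
```

The trade-off in this sketch is that `Future.cancel()` never succeeds, which matches the observation that a submitted task cannot be cancelled anyway.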

takluyver avatar Feb 10 '23 17:02 takluyver

My bad, I had not read your message carefully. After some more thought, I agree that just skipping the _completion callback function if the future is cancelled does work as intended.

I went back to this comment:

> I think a better solution might be just to catch the InvalidStateError and ignore it. The first future with an exception will raise it, then all the later ones from the same .map() call will be cancelled - but we never 'see' those, so the results don't matter.

and tried it out.

The proposed change is then to modify the beginning of _completion with something like a broad try/except block that captures the InvalidStateError:

from concurrent.futures import InvalidStateError
# ...
    def _completion(self, jobid):
        """Called whenever a job finishes."""
        with self.jobs_lock:
            fut, workerid = self.jobs.pop(jobid)
            if not self.jobs:
                self.jobs_empty_cond.notify_all()
        if self.debug:
            print("job completed: %i" % jobid, file=sys.stderr)
        try:   # THIS IS NEW
            try:
                with open(OUTFILE_FMT % workerid, 'rb') as f:
                    outdata = f.read()
            except FileNotFoundError:
                fut.set_exception(
                    JobDied(f"Cluster job {jobid} finished without writing a result")
                )
            else:
                success, result = cloudpickle.loads(outdata)
                if success:
                    fut.set_result(result)
                else:
                    fut.set_exception(RemoteException(result))
                os.unlink(OUTFILE_FMT % workerid)
            # Clean up communication files.
            os.unlink(INFILE_FMT % workerid)

            self._cleanup(jobid)
        # THE FOLLOWING BLOCK IS NEW
        except InvalidStateError:
            print(f"[_completion] {fut=} is cancelled, skip")

After this change (and after adding a bit of logging to the Executor.map() method in concurrent/futures/_base.py), the following script

import cfut
import time


def square_or_raise(n):
    time.sleep(2)
    if n == 3:
        raise ValueError(f"error with {n=}")
    else:
        return n * n


if __name__ == "__main__":
    with cfut.SlurmExecutor() as exc:
        result_generator = exc.map(square_or_raise, range(10))
        for result in result_generator:
            print(f"{result=}")

        print("I reached the end of the result list (this line is never executed)")

gives the following output

result=0
result=1
result=4
[map] try block failed, let's cancel all futures
[map] now cancel future=<Future at 0x7f2b8c2bc8b0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2bc5b0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2bc940 state=pending>
[map] now cancel future=<Future at 0x7f2b8c37abe0 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2b3820 state=pending>
[map] now cancel future=<Future at 0x7f2b8c2adfd0 state=pending>
[_completion] fut=<Future at 0x7f2b8c2adfd0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2b3820 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c37abe0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc940 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc5b0 state=cancelled> is cancelled, skip
[_completion] fut=<Future at 0x7f2b8c2bc8b0 state=cancelled> is cancelled, skip
Traceback (most recent call last):
  File "slurm_example.py", line 16, in <module>
    for result in result_generator:
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 620, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
cfut.RemoteException: 
Traceback (most recent call last):
  File "slurm_example.py", line 8, in square_or_raise
ValueError: error with n=3

Some comments:

  1. The proposed change is also meaningful in case of a SlurmExecutor.submit, and not only for SlurmExecutor.map. If (for whatever reason) a future obtained with fut = exc.submit(fun, args) is cancelled, then (with cfut=0.5) we would hit the same InvalidStateError described above. With the proposed change, we would only see a log similar to
[_completion] fut=<Future at 0x7fdd1131e490 state=cancelled> is cancelled, return
  2. The fact that all other futures of a map are cancelled as soon as one of the tasks fails is common to older and more recent Python versions. The minor difference is that, starting with Python 3.10, the future corresponding to the failing task (that is, the one for which we will call set_exception) is also cancelled, via the _result_or_cancel function. For older Python versions, that future is never cancelled - as far as I understand. This does not change anything in the proposed update to clusterfutures.

tcompa avatar Feb 13 '23 11:02 tcompa

(I can obviously prepare a PR with the small change I implemented, or something similar, if that's useful)

tcompa avatar Feb 13 '23 11:02 tcompa

I'd probably make the try/except smaller, just around the bit where we call set_result and set_exception. And any prints should be behind an if self.debug, like the existing messages (or we should use logging, but let's stick with the mechanism that's there for now). Other than that, it seems like a reasonable improvement to the status quo.
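One possible shape for the narrower try/except, written as a standalone helper so it can run without cfut. The name `settle_future`, its parameters, and the use of `RuntimeError` as a stand-in for `RemoteException` are assumptions made for this sketch; in clusterfutures the same logic would sit inside `_completion` and use `self.debug`.

```python
import sys
from concurrent.futures import Future, InvalidStateError


def settle_future(fut, success, result, debug=False):
    """Store a job outcome on fut; return False if fut was already cancelled.

    Hypothetical helper: only the set_result/set_exception calls are guarded.
    """
    try:
        if success:
            fut.set_result(result)
        else:
            # RuntimeError stands in for cfut's RemoteException here.
            fut.set_exception(RuntimeError(result))
    except InvalidStateError:
        if debug:
            print(f"future {fut!r} was cancelled, result dropped", file=sys.stderr)
        return False
    return True


pending = Future()
cancelled = Future()
cancelled.cancel()

stored = settle_future(pending, True, 25)
dropped = settle_future(cancelled, False, "error with n=5", debug=True)
print(stored, dropped, pending.result())
```

Keeping file reading and cleanup outside the try block means an unrelated error in those steps is not silently swallowed by the new except clause.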

takluyver avatar Feb 14 '23 18:02 takluyver