adaptive icon indicating copy to clipboard operation
adaptive copied to clipboard

ipyparallel raises concurrent.futures._base.InvalidStateError: CANCELLED

Open basnijholt opened this issue 5 years ago • 1 comments
trafficstars

Running the following script in Python 3.8

#!/usr/bin/env python3
# ipcluster start --n=10 --profile=test --cluster-id=''
# python run_learner.py --n=10 --profile=test
from adaptive_scheduler.utils import connect_to_ipyparallel
from adaptive import Runner, Learner1D

if __name__ == "__main__":  # ← use this, see warning @ https://bit.ly/2HAk0GG

    def peak(x, a=0.01):
        return x + a**2 / (a**2 + x**2)

    learner = Learner1D(peak, bounds=(-1, 1))
    executor = connect_to_ipyparallel(profile="test", n=10)
    runner = Runner(learner, goal=lambda l: l.loss() < 0.01, executor=executor)
    runner.ioloop.run_until_complete(runner.task)

raises

Connected to 10 out of 10 engines after 0 seconds.
exception calling callback for <Future at 0x2ac131ebd220 state=finished returned list>
Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 230, in _resolve_result
    self.set_result(self._reconstruct_result(results))
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 524, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 233, in _resolve_result
    self.set_exception(e)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>

I am trying to make a more minimal example but I will leave this here for now.

basnijholt avatar Aug 18 '20 08:08 basnijholt

A more minimal example that triggers the same error:

#!/usr/bin/env python3

# ipython profile create test
# ipcluster start --n=10 --profile=test --cluster-id=''
# python fail.py --n=10 --profile=test

import asyncio
import time
from random import random

from ipyparallel import Client
from ipyparallel.error import NoEnginesRegistered


def f(x):
    return x


def connect_to_ipyparallel(profile, n):
    client = Client(profile=profile)
    while True:
        try:
            dview = client[:]
            if len(dview) == n:
                dview.use_dill()
                return client
        except NoEnginesRegistered:
            time.sleep(0.1)


async def _run(loop, executor, ncores):
    pending = set()
    done = set()

    for _ in range(10):  # do some loops that submit futures
        while len(pending) + len(done) <= ncores:
            fut = loop.run_in_executor(executor, f, random())
            pending.add(fut)
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

        for fut in done:
            fut.result()  # process results that are done

    for fut in pending:  # cancel the results that are pending
        fut.cancel()

    return done


def do(loop, executor, ncores):
    coro = _run(loop, executor, ncores)
    task = loop.create_task(coro)
    loop.run_until_complete(task)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ncores = 10
    executor = connect_to_ipyparallel(profile="test", n=ncores).executor()

    for i in range(10):
        do(loop, executor, ncores)

basnijholt avatar Aug 18 '20 09:08 basnijholt