adaptive
adaptive copied to clipboard
ipyparallel raises concurrent.futures._base.InvalidStateError: CANCELLED
trafficstars
Running the following script in Python 3.8
#!/usr/bin/env python3
# ipcluster start --n=10 --profile=test --cluster-id=''
# python run_learner.py --n=10 --profile=test
from adaptive_scheduler.utils import connect_to_ipyparallel
from adaptive import Runner, Learner1D
if __name__ == "__main__":  # ← use this, see warning @ https://bit.ly/2HAk0GG

    def peak(x, a=0.01):
        """Return ``x`` plus a narrow peak of width ~``a`` centred on ``x = 0``."""
        return x + a**2 / (a**2 + x**2)

    # Sample `peak` adaptively on the interval [-1, 1].
    learner = Learner1D(peak, bounds=(-1, 1))
    # Evaluate the function on the ipyparallel cluster started with
    # `ipcluster start --n=10 --profile=test`.
    executor = connect_to_ipyparallel(profile="test", n=10)
    # Stop once the learner reports a loss below 0.01.
    runner = Runner(learner, goal=lambda lrn: lrn.loss() < 0.01, executor=executor)
    # Outside a notebook nothing drives the event loop, so block on the
    # runner's task until the goal is reached.
    runner.ioloop.run_until_complete(runner.task)
raises the following error:
Connected to 10 out of 10 engines after 0 seconds.
exception calling callback for <Future at 0x2ac131ebd220 state=finished returned list>
Traceback (most recent call last):
File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 230, in _resolve_result
self.set_result(self._reconstruct_result(results))
File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 524, in set_result
raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
callback(self)
File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 233, in _resolve_result
self.set_exception(e)
File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>
I am trying to make a more minimal example, but I will leave this here for now.
A more minimal example that triggers the same error:
#!/usr/bin/env python3
# ipython profile create test
# ipcluster start --n=10 --profile=test --cluster-id=''
# python fail.py --n=10 --profile=test
import asyncio
import time
from random import random
from ipyparallel import Client
from ipyparallel.error import NoEnginesRegistered
def f(x):
    """Return ``x`` unchanged.

    Trivial payload submitted to the executor; the reproduction only
    needs *some* task to run on the engines.
    """
    return x
def connect_to_ipyparallel(profile, n, timeout=None):
    """Wait until exactly ``n`` engines are registered, then return the client.

    Parameters
    ----------
    profile : str
        IPython profile name passed to ``Client``.
    n : int
        Number of engines the direct view must contain before returning.
    timeout : float, optional
        Maximum number of seconds to keep polling. ``None`` (the default)
        polls forever, matching the previous behaviour.

    Returns
    -------
    The connected ``Client``, after ``use_dill()`` has been called on a
    view of all its engines.

    Raises
    ------
    TimeoutError
        If ``timeout`` is given and ``n`` engines did not show up in time.
    """
    client = Client(profile=profile)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            dview = client[:]
            if len(dview) == n:
                dview.use_dill()
                return client
        except NoEnginesRegistered:
            # No engine has registered yet; fall through and retry.
            pass
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"did not see {n} ipyparallel engines within {timeout} seconds"
            )
        # Sleep after *every* unsuccessful attempt. Previously the sleep
        # only happened when no engines were registered at all, so a
        # partially started cluster made this loop spin at full CPU.
        time.sleep(0.1)
async def _run(loop, executor, ncores):
    """Submit tasks in rounds, then cancel whatever is still pending.

    Each of the 10 rounds tops up the submitted futures, waits until at
    least one of them completes, and reads the finished results. After
    the last round the remaining futures are cancelled.

    Parameters
    ----------
    loop
        Event loop whose ``run_in_executor`` is used to submit work.
    executor
        Executor that runs ``f``.
    ncores : int
        Upper bound used when topping up: futures are submitted while
        ``len(pending) + len(done) <= ncores``.

    Returns
    -------
    set
        The futures reported as done by the final ``asyncio.wait`` call.
    """
    pending = set()
    done = set()
    for _ in range(10):  # do some loops that submit futures
        while len(pending) + len(done) <= ncores:
            fut = loop.run_in_executor(executor, f, random())
            pending.add(fut)
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for fut in done:
            fut.result()  # process results that are done

    # NOTE(review): presumably this cancellation is what provokes the
    # reported InvalidStateError (CANCELLED) in the executor's futures.
    for fut in pending:  # cancel the results that are pending
        fut.cancel()
    return done
def do(loop, executor, ncores):
    """Run one full ``_run`` pass to completion on ``loop``.

    The coroutine is wrapped in a task explicitly and the loop is run
    until that task finishes. Nothing is returned.
    """
    coro = _run(loop, executor, ncores)
    task = loop.create_task(coro)
    loop.run_until_complete(task)
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ncores = 10  # must match the `--n` the cluster was started with
    executor = connect_to_ipyparallel(profile="test", n=ncores).executor()
    # Repeat the submit/cancel cycle several times on the same loop and
    # executor.
    for i in range(10):
        do(loop, executor, ncores)