flaky `test_quiet_close_process[True]`
```
_______________________ test_quiet_close_process[True] ________________________

processes = True
tmp_path = WindowsPath('C:/Users/runneradmin/AppData/Local/Temp/pytest-of-runneradmin/pytest-0/test_quiet_close_process_True_0')

    @pytest.mark.slow
    @pytest.mark.parametrize("processes", [True, False])
    def test_quiet_close_process(processes, tmp_path):
        with open(tmp_path / "script.py", mode="w") as f:
            f.write(client_script % processes)

        with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc:
>           out, err = proc.communicate(timeout=10)

distributed\tests\test_client.py:7554:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Miniconda3\envs\dask-distributed\lib\subprocess.py:1134: in communicate
    stdout, stderr = self._communicate(input, endtime, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Popen: returncode: 3221225786 args: ['C:\\Miniconda3\\envs\\dask-distribute...>
input = None, endtime = 1556.765, orig_timeout = 10

    def _communicate(self, input, endtime, orig_timeout):
        # Start reader threads feeding into a list hanging off of this
        # object, unless they've already been started.
        if self.stdout and not hasattr(self, "_stdout_buff"):
            self._stdout_buff = []
            self.stdout_thread = \
                    threading.Thread(target=self._readerthread,
                                     args=(self.stdout, self._stdout_buff))
            self.stdout_thread.daemon = True
            self.stdout_thread.start()
        if self.stderr and not hasattr(self, "_stderr_buff"):
            self._stderr_buff = []
            self.stderr_thread = \
                    threading.Thread(target=self._readerthread,
                                     args=(self.stderr, self._stderr_buff))
            self.stderr_thread.daemon = True
            self.stderr_thread.start()

        if self.stdin:
            self._stdin_write(input)

        # Wait for the reader threads, or time out. If we time out, the
        # threads remain reading and the fds left open in case the user
        # calls communicate again.
        if self.stdout is not None:
            self.stdout_thread.join(self._remaining_time(endtime))
            if self.stdout_thread.is_alive():
>               raise TimeoutExpired(self.args, orig_timeout)
E               subprocess.TimeoutExpired: Command '['C:\\Miniconda3\\envs\\dask-distributed\\python.exe', WindowsPath('C:/Users/runneradmin/AppData/Local/Temp/pytest-of-runneradmin/pytest-0/test_quiet_close_process_True_0/script.py')]' timed out after 10 seconds

C:\Miniconda3\envs\dask-distributed\lib\subprocess.py:1510: TimeoutExpired
---------------------------- Captured stdout call -----------------------------
------ stdout: returncode 3221225786, ['C:\\Miniconda3\\envs\\dask-distributed\\python.exe', WindowsPath('C:/Users/runneradmin/AppData/Local/Temp/pytest-of-runneradmin/pytest-0/test_quiet_close_process_True_0/script.py')] ------
2022-08-09 06:19:33,073 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001B8C28EF070>>, <Task finished name='Task-62' coro=<SpecCluster._correct_state_internal() done, defined at d:\a\distributed\distributed\distributed\deploy\spec.py:330> exception=RuntimeError('cannot schedule new futures after shutdown')>)
Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "d:\a\distributed\distributed\distributed\deploy\spec.py", line 423, in _close
    await self._correct_state()
  File "d:\a\distributed\distributed\distributed\deploy\spec.py", line 337, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "d:\a\distributed\distributed\distributed\core.py", line 1071, in send_recv_from_rpc
    comm = await self.live_comm()
  File "d:\a\distributed\distributed\distributed\core.py", line 1030, in live_comm
    comm = await connect(
  File "d:\a\distributed\distributed\distributed\comm\core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
    return fut.result()
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 451, in connect
    stream = await self.client.connect(
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 436, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py", line 861, in getaddrinfo
    return await self.run_in_executor(
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py", line 819, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "C:\Miniconda3\envs\dask-distributed\lib\concurrent\futures\thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```
https://github.com/dask/distributed/runs/7739979508?check_suite_focus=true#step:11:1265
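The `RuntimeError` in the captured stdout comes from `run_in_executor()` being called after the event loop's thread-pool executor has already been shut down: cluster teardown is still trying to resolve an address via `getaddrinfo`, which ultimately calls `executor.submit()`. A minimal sketch of that failure mode, using a plain `ThreadPoolExecutor` rather than distributed's code:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
executor.shutdown()

try:
    # Any submit() after shutdown() fails the same way getaddrinfo() does in
    # the traceback above, since run_in_executor() calls executor.submit().
    executor.submit(print, "too late")
except RuntimeError as exc:
    print(exc)  # cannot schedule new futures after shutdown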
Possibly fixed by https://github.com/dask/distributed/pull/6847.
A 10s timeout is also just too short, IMO, for Windows and macOS when creating clusters in subprocesses; see the sketch below.
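As a rough sketch of what I mean (the 60s value, the platform check, and the stand-in command are illustrative suggestions, not what the test currently does):

```python
import subprocess
import sys

# Suggested platform-aware timeout: process startup and cluster spin-up are
# markedly slower on Windows and macOS CI runners than on Linux.
TIMEOUT = 60 if sys.platform in ("win32", "darwin") else 10

# Stand-in for launching the generated script.py from the test.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('cluster stand-in')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
out, err = proc.communicate(timeout=TIMEOUT)
```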