cylc-flow
cylc-flow copied to clipboard
Replace `select`/`poll` with `DefaultSelector`
Follow-up to #7088
Check List
- [x] I have read
CONTRIBUTING.mdand added my name as a Code Contributor. - [x] Contains logically grouped changes (else tidy your branch by rebase).
- [x] Does not contain off-topic changes (use other PRs for other changes).
- [x] No dependency changes
- [x] Tests, changelog, docs not needed
- [x] Not a confirmed bug, so raising on master
Got a couple of failures to look into
Traceback (most recent call last):
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/scheduler_cli.py", line 741, in cylc_play
asyncio.get_running_loop()
RuntimeError: no running event loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/scheduler.py", line 711, in run_scheduler
await self._main_loop()
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/scheduler.py", line 1739, in _main_loop
await self.workflow_shutdown()
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/scheduler.py", line 1493, in workflow_shutdown
self.proc_pool.process()
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/subprocpool.py", line 320, in process
self._poll_proc_pipes(proc, ctx)
File "/home/runner/work/cylc-flow/cylc-flow/cylc/flow/subprocpool.py", line 443, in _poll_proc_pipes
self.pipepoller.register(handle, selectors.EVENT_READ)
File "/home/runner/micromamba/envs/cylc-functional-test/lib/python3.12/selectors.py", line 359, in register
self._selector.register(key.fd, poller_events)
ValueError: I/O operation on closed epoll object
and
async def test_log_xtrigger_stdout(
flow, scheduler, run_dir, start, log_filter
):
"""Output from xtriggers should appear in the scheduler log:
(As per the toy example in the Cylc Docs)
"""
# Setup a workflow:
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'graph': {'R1': '@myxtrigger => foo'},
'xtriggers': {'myxtrigger': 'myxtrigger()'}
}
})
# Create an xtrigger:
xt_lib = run_dir / id_ / 'lib/python/myxtrigger.py'
xt_lib.parent.mkdir(parents=True, exist_ok=True)
xt_lib.write_text(dedent(r"""
from sys import stderr
def myxtrigger():
print('Hello World')
print('Hello Hades', file=stderr)
return True, {}
"""))
schd = scheduler(id_)
async with start(schd, level=DEBUG):
# Set off check for x-trigger:
task = schd.pool.get_tasks()[0]
schd.xtrigger_mgr.call_xtriggers_async(task)
# while not schd.xtrigger_mgr._get_xtrigs(task):
while schd.proc_pool.is_not_done():
> schd.proc_pool.process()
tests/integration/test_subprocctx.py:57:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cylc/flow/subprocpool.py:320: in process
self._poll_proc_pipes(proc, ctx)
cylc/flow/subprocpool.py:443: in _poll_proc_pipes
self.pipepoller.register(handle, selectors.EVENT_READ)
../../../micromamba/envs/cylc-fast-test/lib/python3.12/selectors.py:352: in register
key = super().register(fileobj, events, data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <selectors.EpollSelector object at 0x7f3ba4c38200>
fileobj = <_io.FileIO [closed]>, events = 1, data = None
def register(self, fileobj, events, data=None):
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
if key.fd in self._fd_to_key:
> raise KeyError("{!r} (FD {}) is already registered"
.format(fileobj, key.fd))
E KeyError: "<_io.FileIO name=94 mode='rb' closefd=True> (FD 94) is already registered"
../../../micromamba/envs/cylc-fast-test/lib/python3.12/selectors.py:241: KeyError