OutStream.close hangs if `watchfd=True`
When attempting to close an OutStream object that was created with watchfd=True (the default), the close operation hangs if nothing has been sent to the stream being wrapped.
Python code to reproduce:
    import sys

    import zmq
    from jupyter_client.session import Session
    from ipykernel.iostream import OutStream, IOPubThread


    def test():
        context = zmq.Context()
        pub_thread = IOPubThread(socket=context.socket(zmq.PUB))
        pub_thread.start()

        stream = OutStream(
            session=Session(),
            pub_thread=pub_thread,
            name="stderr",
        )

        print("Closing stream", file=sys.__stdout__)
        stream.close()
        print("Done closing stream", file=sys.__stdout__)

        pub_thread.stop()
        context.destroy()


    test()
When I run the code above, the "Closing stream" message is printed, but the "Done closing stream" message is not.
From some debugging, the close call is hanging when joining the auxiliary thread here: https://github.com/ipython/ipykernel/blob/221dca63d7b2e2b8fea32bf8a101d07645fc9d3c/ipykernel/iostream.py#L429
And the join is in turn hanging because the thread is stuck at a blocking read, here: https://github.com/ipython/ipykernel/blob/221dca63d7b2e2b8fea32bf8a101d07645fc9d3c/ipykernel/iostream.py#L326
I'm not an expert in this area, but making the read end of the pipe non-blocking might help. On its own, though, that would put the watcher thread into a busy loop, which isn't ideal.
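For illustration only, here is a minimal standalone sketch of one possible middle ground: waiting on the pipe with `select` and a timeout, so the thread neither blocks forever nor spins. This is not ipykernel's code. The names `watch_pipe`, `should_stop`, and `demo` are hypothetical, and `select` on pipe file descriptors is assumed to be available (it is on macOS and Linux, but not on Windows).

```python
import os
import select
import threading


def watch_pipe(read_fd, should_stop, chunks, timeout=0.1):
    """Collect data from read_fd until should_stop is set (hypothetical helper)."""
    while not should_stop.is_set():
        # Wait at most `timeout` seconds for data, then re-check the stop flag.
        ready, _, _ = select.select([read_fd], [], [], timeout)
        if ready:
            data = os.read(read_fd, 1024)
            if not data:
                # Empty read means every write end has been closed.
                break
            chunks.append(data)


def demo():
    """Start a watcher on an idle pipe and check that it can be stopped."""
    read_fd, write_fd = os.pipe()
    should_stop = threading.Event()
    chunks = []
    thread = threading.Thread(
        target=watch_pipe, args=(read_fd, should_stop, chunks), daemon=True
    )
    thread.start()

    # Nothing was ever written to the pipe, which is the case that hangs
    # with a plain blocking read.
    should_stop.set()
    thread.join(timeout=5)
    still_alive = thread.is_alive()

    os.close(read_fd)
    os.close(write_fd)
    return still_alive
```

In this sketch the join returns within roughly one timeout interval even though nothing was written. The cost is a periodic wake-up of the watcher thread, which is much cheaper than a busy loop but not free.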
ipykernel version: 6.9.1
Python version: 3.10.2
OS + hardware: macOS 11.6.3 (Big Sur), Intel MacBook Pro
See also #867.
In the OutStream class:

    def _setup_stream_redirects(self, name):
        pr, pw = os.pipe()
        fno = getattr(sys, name).fileno()
        self._original_stdstream_copy = os.dup(fno)
        os.dup2(pw, fno)

        self._fid = pr

        self._exc = None
        self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
        self.watch_fd_thread.daemon = True
        self.watch_fd_thread.start()
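Save the write end of the pipe in the method above, for example as self._pw = pw. Then, when closing, write a byte to the pipe before joining, so that the blocked read returns and the thread can exit:

    self._should_watch = False
    os.write(self._pw, b"\n")
    self.watch_fd_thread.join()

I don't think this is the best solution, but I have been using it as a temporary workaround.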
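To show the idea in isolation, here is a small self-contained sketch that does not use ipykernel at all. `PipeWatcher` and its `received` list are hypothetical stand-ins. Only the attribute names `_fid`, `_pw`, `_should_watch`, and `watch_fd_thread` mirror the snippets above, and the read loop is a simplified assumption about what the watcher thread does.

```python
import os
import threading


class PipeWatcher:
    """Hypothetical stand-in for the pipe-watching part of OutStream."""

    def __init__(self):
        # Keep both ends of the pipe; the write end is needed to wake the reader.
        self._fid, self._pw = os.pipe()
        self._should_watch = True
        self.received = []
        self.watch_fd_thread = threading.Thread(
            target=self._watch_pipe_fd, daemon=True
        )
        self.watch_fd_thread.start()

    def _watch_pipe_fd(self):
        while self._should_watch:
            # Blocks until something is written to the pipe.
            data = os.read(self._fid, 1024)
            if self._should_watch:
                self.received.append(data)

    def close(self, timeout=5):
        self._should_watch = False
        # Wake-up write: unblocks os.read so the loop can see the flag.
        os.write(self._pw, b"\n")
        self.watch_fd_thread.join(timeout)
        stopped = not self.watch_fd_thread.is_alive()
        os.close(self._pw)
        os.close(self._fid)
        return stopped
```

Calling `PipeWatcher().close()` returns `True` here even though nothing was ever written to the pipe. Without the `os.write` call, the same join would only end when the timeout expired. In this sketch the wake-up byte is discarded because the flag is checked after the read. Whether the real watcher thread forwards that newline to the stream has not been checked here.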