pytest-catchlog
I/O operation on closed file with lots of threads
This is similar to #41 but suggests something perhaps a bit more nefarious. The code below spawns lots of threads (on my machine, ~100 was enough to do it, but I see the problem intermittently with ~8 threads) and then logs from each thread intermittently.
This causes lots of "I/O operation on closed file" errors from the threads, which seem to log after the handler has been removed and closed.
It appears to be a race condition, as the error is intermittent with a smaller number of threads.
I discovered this problem when running a bunch of pytest tests which spun up pools of worker threads for each test, then shut them all down again.
Here is the code to cause the problem:
```python
# -*- coding: utf-8 -*-
import threading
import logging
import time

import pytest_catchlog
import pytest_catchlog.common
import pytest_catchlog.plugin


def worker(n, shutdown):
    """A worker thread."""
    logger = logging.getLogger("worker.{0:d}".format(n))
    while not shutdown.isSet():
        logger.debug("Working")
        shutdown.wait(0.01)


def log_from_many_threads(n=10):
    """Spawn n worker threads, capture logs for a while, then shut them down."""
    threads = {}
    for i in range(n):
        event = threading.Event()
        thread = threading.Thread(target=worker, args=(i, event))
        thread.start()
        threads[i] = (thread, event)
    handler = pytest_catchlog.plugin.LogCaptureHandler()
    with pytest_catchlog.common.catching_logs(handler):
        print("Running")
        time.sleep(1.0)
    print("Shutting down...")
    for thread, event in threads.values():
        event.set()
        thread.join()


if __name__ == '__main__':
    log_from_many_threads(100)
```
The errors are all of the following variety:
```
Traceback (most recent call last):
  File ".../python2.7/logging/__init__.py", line 874, in emit
    stream.write(fs % msg)
  File ".../site-packages/py/_io/capture.py", line 16, in write
    StringIO.write(self, data)
ValueError: I/O operation on closed file
Logged from file break_me.py, line 15
```
@alexrudy, thank you for reporting this!
If I understand correctly, the error should be gone if you move the thread-joining logic inside the `with catching_logs(...)` block:
```python
handler = pytest_catchlog.plugin.LogCaptureHandler()
with pytest_catchlog.common.catching_logs(handler):
    print("Running")
    time.sleep(1.0)
    print("Shutting down...")
    for thread, event in threads.values():
        event.set()
        thread.join()
```
Am I missing something?
Is this the original code that causes the error? If not, could you paste the particular test case that shows the error, please? Pseudo-code would be OK too.
@abusalimov I agree, moving the thread join inside the context manager does make this problem go away. However, my particular case happens because I start threads during a test setup or test body, then shut them down during a test teardown. The test case itself is quite a bit more complicated, but you can find the thread pool here: https://github.com/alexrudy/Cauldron/blob/feature/async-dispatch/Cauldron/zmq/responder.py#L79
However, the gist of what I'm going for can be reproduced by, e.g.:
```python
handler_setup = pytest_catchlog.plugin.LogCaptureHandler()
handler_body = pytest_catchlog.plugin.LogCaptureHandler()
handler_teardown = pytest_catchlog.plugin.LogCaptureHandler()

with pytest_catchlog.common.catching_logs(handler_setup):
    for i in range(n):
        event = threading.Event()
        thread = threading.Thread(target=worker, args=(i, event))
        thread.start()
        threads[i] = (thread, event)

with pytest_catchlog.common.catching_logs(handler_body):
    print("Running")
    time.sleep(1.0)

with pytest_catchlog.common.catching_logs(handler_teardown):
    print("Shutting down...")
    for thread, event in threads.values():
        event.set()
        thread.join()
```
With a bit more thought, I think I could make an actual test case which might reproduce this, instead of just a script.
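For reference, a minimal pytest-style sketch of such a test case (hypothetical fixture and test names, not taken from my real suite) could start the workers in a fixture and only join them during teardown:

```python
import threading
import logging
import time

import pytest


def worker(n, shutdown):
    """Log repeatedly until asked to shut down."""
    logger = logging.getLogger("worker.{0:d}".format(n))
    while not shutdown.is_set():
        logger.debug("Working")
        shutdown.wait(0.01)


@pytest.fixture
def worker_threads():
    # Setup: start the pool; any logging here hits the setup-phase handler.
    threads = []
    for i in range(8):
        event = threading.Event()
        thread = threading.Thread(target=worker, args=(i, event))
        thread.start()
        threads.append((thread, event))
    yield threads
    # Teardown: the workers may still log while the per-phase capture handler
    # is being removed and closed, which is where the intermittent
    # "I/O operation on closed file" error shows up.
    for thread, event in threads:
        event.set()
        thread.join()


def test_logs_from_many_threads(worker_threads):
    time.sleep(1.0)
```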
I think we could add a check to ensure the stream is still alive. Effectively, this would make such errors be swallowed silently. I don't like that kind of approach, but I guess in this case it would be OK to do it that way.
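For illustration, such a check might look like this (a sketch only, with a hypothetical handler name; not the actual plugin code, and assuming a StreamHandler-style capture handler whose stream exposes a `closed` flag):

```python
import logging


class TolerantCaptureHandler(logging.StreamHandler):
    """Hypothetical handler that ignores records arriving after close()."""

    def emit(self, record):
        # If the captured stream has already been closed by the main thread
        # (removeHandler() followed by close()), drop the record instead of
        # triggering "I/O operation on closed file" in StreamHandler.emit().
        # A close() racing in between this check and the write can still
        # slip through -- see below.
        if getattr(self.stream, "closed", False):
            return
        logging.StreamHandler.emit(self, record)
```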
The thing is, since the standard logging module does not synchronize `addHandler`/`removeHandler` with actual access to a handler, a worker thread can indeed call an already detached and closed handler.
There is an inevitable race condition anyway:
- the worker thread sees the handler attached to the logger (`for handler in logger.handlers: ...`)
- the main thread removes the handler and closes it (`logger.handlers.remove(handler); handler.close()`)
- the worker thread calls `handler.handle(...)`, which throws the error in question
Hence, there is no fundamental difference between the handler being invoked due to a race condition just after it has been closed, and it not being invoked at all because it has already been removed. Let me show that with your example:
```python
with pytest_catchlog.common.catching_logs(handler_setup):
    ...
    thread.start()
    # <--- if a worker thread logs now, the `handler_setup` captures the record

# <------- if a worker logs while catching_logs(...).__exit__ is running, you get the "I/O operation on closed file" error
# <------- if a worker logs in between these blocks, the record is lost anyway

with pytest_catchlog.common.catching_logs(handler_body):
    # <--- a record is captured by the `handler_body`
    ...
```
Does this make sense for your case?
I think it's also worth removing the `closing(...)` context from `catching_logs(...)` to let a caller do that explicitly.
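As a sketch of that idea (hypothetical helper name, not the actual `pytest_catchlog.common` implementation, and assuming the handler is attached to the root logger), the context manager would only attach and detach the handler, and closing it becomes the caller's job once the workers have been joined:

```python
import logging
from contextlib import contextmanager


@contextmanager
def catching_logs_no_close(handler, level=None):
    """Attach `handler` to the root logger without closing it on exit."""
    root = logging.getLogger()
    if level is not None:
        handler.setLevel(level)
    root.addHandler(handler)
    try:
        yield handler
    finally:
        # Only detach; no closing(handler) here.  The caller closes the
        # handler once it is safe to do so (e.g. after joining the workers).
        root.removeHandler(handler)
```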
I had to take a long leave of absence from this project, so sorry about the delayed response.
Yes, I think both of these suggestions make sense.
- Swallowing errors if the stream isn't alive is fine in my case. That's the best that can be done given the current state of the logging module's thread safety.
- Removing `closing(...)` and having the caller do that explicitly would also be good.
Thanks for your incredibly clear explanation of the race condition that I'm chasing. I think you've nailed it. Perhaps there is an upstream bug here, but I'll have to check the python logging PEP to see if there is any guarantee of thread safety to begin with.