`PrototypeMultiProcessingReadingService` does not properly terminate on MacOS
🐛 Describe the bug
Running the code snippet below on macOS prints the correct values but does not terminate afterwards without manual interruption. I don't think it is related to the recent prefetch update.
```python
from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2
from torchdata.dataloader2.reading_service import PrototypeMultiProcessingReadingService

if __name__ == '__main__':
    dp = IterableWrapper([0, 1, 2, 3, 4])
    rs = PrototypeMultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(datapipe=dp, reading_service=rs)
    for x in dl:
        print(x)
    print("End")
```
Trace (output from the two worker processes is interleaved):

```
0
0
1
1
2
2
3
3
4
4
End
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
Process SpawnProcess-2:
Process SpawnProcess-1:
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ktse/data/torchdata/dataloader2/communication/eventloop.py", line 49, in DataPipeToQueuesLoop
    for _ in pipe_type.DataPipeBehindQueues(
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/ktse/data/torchdata/dataloader2/communication/iter.py", line 121, in DataPipeBehindQueues
    request = protocol.get_new_request(block=blocking_request_get)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ktse/data/torchdata/dataloader2/communication/protocol.py", line 66, in get_new_request
    response = self.request_queue.get(block=block)
  File "/Users/ktse/data/torchdata/dataloader2/communication/eventloop.py", line 49, in DataPipeToQueuesLoop
    for _ in pipe_type.DataPipeBehindQueues(
  File "/Users/ktse/data/torchdata/dataloader2/communication/iter.py", line 121, in DataPipeBehindQueues
    request = protocol.get_new_request(block=blocking_request_get)
  File "/Users/ktse/data/torchdata/dataloader2/communication/protocol.py", line 66, in get_new_request
    response = self.request_queue.get(block=block)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
KeyboardInterrupt
Exception ignored in: <function DataLoader2.__del__ at 0x7fc500214af0>
Traceback (most recent call last):
  File "/Users/ktse/data/torchdata/dataloader2/dataloader2.py", line 134, in __del__
  File "/Users/ktse/data/torchdata/dataloader2/dataloader2.py", line 142, in shutdown
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 208, in finalize
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 203, in clean_me
AttributeError: 'NoneType' object has no attribute 'TerminateRequest'
Exception ignored in: <function PrototypeMultiProcessingReadingService.__del__ at 0x7fc50020ef70>
Traceback (most recent call last):
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 196, in __del__
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 208, in finalize
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 203, in clean_me
AttributeError: 'NoneType' object has no attribute 'TerminateRequest'
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/util.py", line 224, in __call__
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 198, in _finalize_join
TypeError: 'NoneType' object is not callable
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/util.py", line 224, in __call__
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 198, in _finalize_join
TypeError: 'NoneType' object is not callable
```
Versions
main
cc: @VitalyFedyunin
If you call `dl.shutdown()` at the end, does the problem still persist?
I can confirm that it does fix it. I think the issue is that `reading_service.finalize()` is never called for PrototypeRS, so `process.join()` is never called and the program hangs. I have considered calling `process.join()` in `finalize_iteration()`, but that is not a good option.
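The hang mechanism can be reproduced with a stdlib-only sketch (names here are hypothetical, not torchdata's API): a non-daemon worker blocked on `Queue.get()` keeps the interpreter alive until something sends it a terminate request and joins it, which is exactly what `finalize()` is supposed to do.

```python
import multiprocessing as mp

# "fork" keeps this sketch self-contained; the real reading service uses
# "spawn" workers on macOS, as the SpawnProcess names in the trace show.
ctx = mp.get_context("fork")

def event_loop(request_queue):
    # Mimics the worker event loop: block on the request queue until a
    # terminate request arrives.
    while True:
        if request_queue.get() == "TerminateRequest":
            break

if __name__ == "__main__":
    q = ctx.Queue()
    worker = ctx.Process(target=event_loop, args=(q,))
    worker.start()
    # ... iteration would happen here ...
    # If finalize() never runs, the two lines below never execute and the
    # non-daemon worker keeps the parent interpreter from exiting.
    q.put("TerminateRequest")
    worker.join()
    print(worker.exitcode)  # 0
```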
What we might be able to do is register all cleanup functions with `atexit`. That should guarantee the cleanup functions are called before Python exits.
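A stdlib-only sketch of that idea (all names hypothetical, not torchdata's actual API): registering an idempotent `finalize` with `atexit` at construction time means the terminate request is sent and the worker joined even if the user never calls shutdown explicitly, and it runs before interpreter teardown rather than during it.

```python
import atexit
import multiprocessing as mp

ctx = mp.get_context("fork")  # "fork" keeps this sketch self-contained

def _worker(request_queue):
    # Block until a terminate request arrives, like the worker event loop.
    request_queue.get()

class ReadingServiceSketch:
    def __init__(self):
        self.request_queue = ctx.Queue()
        self.process = ctx.Process(target=_worker, args=(self.request_queue,))
        self.process.start()
        self._finalized = False
        # atexit handlers run before interpreter teardown, so cleanup does
        # not race with module globals being cleared -- the likely source of
        # the "'NoneType' object has no attribute 'TerminateRequest'" errors
        # when cleanup happens from __del__ during shutdown.
        atexit.register(self.finalize)

    def finalize(self):
        if self._finalized:  # idempotent: safe from shutdown() and atexit
            return
        self._finalized = True
        self.request_queue.put("TerminateRequest")
        self.process.join(timeout=5)

if __name__ == "__main__":
    rs = ReadingServiceSketch()
    rs.finalize()  # explicit shutdown; the later atexit call is a no-op
    print(rs.process.exitcode)  # 0
```

One trade-off worth noting: `atexit.register(self.finalize)` keeps the instance alive until exit, so a real implementation would probably register a weakref-based callback instead.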