
`PrototypeMultiProcessingReadingService` does not properly terminate on MacOS

Open · NivekT opened this issue 1 year ago · 1 comment

🐛 Describe the bug

Running the code snippet below on macOS prints the correct values, but the script does not terminate afterwards without manual interruption. I don't think it is related to the recent prefetch update.

from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2
from torchdata.dataloader2.reading_service import PrototypeMultiProcessingReadingService

if __name__ == '__main__':
    dp = IterableWrapper([0, 1, 2, 3, 4])
    rs = PrototypeMultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(datapipe=dp, reading_service=rs)
    for x in dl:
        print(x)
    print("End")

Trace:

0
0
1
1
2
2
3
3
4
4
End
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
Process SpawnProcess-2:
Process SpawnProcess-1:
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ktse/data/torchdata/dataloader2/communication/eventloop.py", line 49, in DataPipeToQueuesLoop
    for _ in pipe_type.DataPipeBehindQueues(
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/ktse/data/torchdata/dataloader2/communication/iter.py", line 121, in DataPipeBehindQueues
    request = protocol.get_new_request(block=blocking_request_get)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ktse/data/torchdata/dataloader2/communication/protocol.py", line 66, in get_new_request
    response = self.request_queue.get(block=block)
  File "/Users/ktse/data/torchdata/dataloader2/communication/eventloop.py", line 49, in DataPipeToQueuesLoop
    for _ in pipe_type.DataPipeBehindQueues(
  File "/Users/ktse/data/torchdata/dataloader2/communication/iter.py", line 121, in DataPipeBehindQueues
    request = protocol.get_new_request(block=blocking_request_get)
  File "/Users/ktse/data/torchdata/dataloader2/communication/protocol.py", line 66, in get_new_request
    response = self.request_queue.get(block=block)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
KeyboardInterrupt
Exception ignored in: <function DataLoader2.__del__ at 0x7fc500214af0>
Traceback (most recent call last):
  File "/Users/ktse/data/torchdata/dataloader2/dataloader2.py", line 134, in __del__
  File "/Users/ktse/data/torchdata/dataloader2/dataloader2.py", line 142, in shutdown
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 208, in finalize
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 203, in clean_me
AttributeError: 'NoneType' object has no attribute 'TerminateRequest'
Exception ignored in: <function PrototypeMultiProcessingReadingService.__del__ at 0x7fc50020ef70>
Traceback (most recent call last):
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 196, in __del__
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 208, in finalize
  File "/Users/ktse/data/torchdata/dataloader2/reading_service.py", line 203, in clean_me
AttributeError: 'NoneType' object has no attribute 'TerminateRequest'
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/util.py", line 224, in __call__
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 198, in _finalize_join
TypeError: 'NoneType' object is not callable
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/util.py", line 224, in __call__
  File "/Users/ktse/miniconda3/envs/pytorch/lib/python3.9/multiprocessing/queues.py", line 198, in _finalize_join
TypeError: 'NoneType' object is not callable

Versions

main

cc: @VitalyFedyunin

NivekT avatar Oct 04 '22 20:10 NivekT

If you call dl.shutdown() at the end, is the problem still persistent?

ejguan avatar Oct 14 '22 18:10 ejguan

> If you call dl.shutdown() at the end, is the problem still persistent?

I can confirm that it does fix it. I think the issue is that reading_service.finalize() never gets called for PrototypeRS, so process.join() is never invoked and the program hangs.

I have considered calling process.join() in finalize_iteration(), but that is not a good option.
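Until finalize() is wired up automatically, the workaround is to call dl.shutdown() explicitly once iteration is done. A minimal sketch of the pattern, using a hypothetical FakeLoader as a stand-in for DataLoader2 (only the shutdown() call is the point here):

```python
class FakeLoader:
    """Hypothetical stand-in for DataLoader2; only shutdown() matters here."""
    def __init__(self):
        self.shut_down = False

    def __iter__(self):
        return iter([0, 1, 2, 3, 4])

    def shutdown(self):
        # In the real DataLoader2, this is where worker processes
        # receive a terminate request and get join()-ed.
        self.shut_down = True


dl = FakeLoader()
try:
    for x in dl:
        print(x)
finally:
    # Explicit shutdown so workers are joined even if iteration raises.
    dl.shutdown()
```

The try/finally keeps the cleanup on the error path too, which mirrors what an automatic finalize() would give you.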

NivekT avatar Oct 17 '22 21:10 NivekT

What we might be able to do is register all cleanup functions with atexit. That should guarantee the cleanup functions are called before Python exits.
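A sketch of that idea, with a hypothetical ToyReadingService standing in for the real reading service (the real one would send a TerminateRequest and join() its workers in finalize):

```python
import atexit


class ToyReadingService:
    """Hypothetical stand-in: owns resources that must be released at exit."""

    def __init__(self):
        self.finalized = False
        # Register cleanup at construction time, so it runs at interpreter
        # exit even if the user never calls finalize()/shutdown() explicitly.
        atexit.register(self.finalize)

    def finalize(self):
        if not self.finalized:  # idempotent: safe to call more than once
            self.finalized = True
            # Real code would terminate and join() worker processes here.


rs = ToyReadingService()
rs.finalize()                   # an explicit call still works
atexit.unregister(rs.finalize)  # already cleaned up, nothing left for exit
```

Making finalize idempotent matters, since it may be called both explicitly and again at exit.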

ejguan avatar Oct 18 '22 13:10 ejguan