kgtk
Error in text-embedding multiprocessing
The command:
kgtk --debug text-embedding -i sample.tsv --model roberta-large-nli-mean-tokens \
--property-labels-file labels.en.tsv.gz \
--isa-properties P31 P279 P106 P39 P1382 P373 P452 \
--save-embedding-sentence -o wikidatadwd-text-embeddings-sample.tsv.gz \
--parallel 8
Error:
  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 249, in main
    process.read_input(input_file_path=input_file_path,
  File "/data/amandeep/github/kgtk/kgtk/gt/embedding_utils.py", line 528, in read_input
    pp.task_done()
  File "/nas/home/amandeep/miniconda3/envs/kgtk-env-ckg08/lib/python3.9/site-packages/pyrallel/parallel_processor.py", line 367, in task_done
    self.mapper_queues[i].put((ParallelProcessor.CMD_STOP,))
  File "/nas/home/amandeep/miniconda3/envs/kgtk-env-ckg08/lib/python3.9/site-packages/multiprocess/queues.py", line 91, in put
    raise ValueError(f"Queue {self!r} is closed")
ValueError: Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/data/amandeep/github/kgtk/kgtk/exceptions.py", line 70, in __call__
    return_code = func(*args, **kwargs) or 0
  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 398, in run
    main(**kwargs)
  File "/data/amandeep/github/kgtk/kgtk/cli/text_embedding.py", line 268, in main
    raise KGTKException(str(e))
kgtk.exceptions.KGTKException: Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed
Queue <multiprocess.queues.Queue object at 0x7fcb04f44e20> is closed
sys:1: ResourceWarning: unclosed file <_io.TextIOWrapper name='sample.tsv' mode='r' encoding='UTF-8'>
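The innermost frame of the first traceback is raised by `multiprocess`, a fork of the standard `multiprocessing` package, whose `Queue.put` rejects any write once the queue has been closed. The minimal sketch below reproduces only that final `ValueError` with the standard library, assuming Python 3.8 or later (where `put` checks for a closed queue). It does not involve Pyrallel or KGTK, and `put_after_close` is a hypothetical helper name used for illustration.

```python
import multiprocessing


def put_after_close() -> str:
    """Reproduce the 'Queue ... is closed' error using the stdlib queue.

    Hypothetical helper: it only mimics what happens when a stop command
    is sent to a mapper queue that an earlier stop/join already closed.
    """
    q = multiprocessing.Queue()
    q.close()  # mark the queue as closed, as a finished processor would
    try:
        q.put(("CMD_STOP",))  # any further put() is rejected
    except ValueError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    # Prints something like: Queue <multiprocessing.queues.Queue object at 0x...> is closed
    print(put_after_close())
```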
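On Pyrallel's end, once task_done() and join() have been called, no more data should be added (by calling add_task()). For example, the following snippet stops the processor partway through the loop and then keeps adding tasks: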
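from pyrallel import ParallelProcessor

pp = ParallelProcessor(...)
pp.start()
for i in range(100):
    pp.add_task(i)
    # this causes the exception: the processor is stopped mid-loop,
    # yet add_task() and task_done() are called again afterwards
    if i == 20:
        pp.task_done()
        pp.join()
pp.task_done()
pp.join()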
Running the above code throws:
Traceback (most recent call last):
  File "t1.py", line 53, in <module>
    pp.task_done()
  File ".../pyrallel/pyrallel/parallel_processor.py", line 413, in task_done
    self.mapper_queues[i].put((ParallelProcessor.CMD_STOP,))
  File ".../envs/py38/lib/python3.8/site-packages/multiprocess/queues.py", line 85, in put
    raise ValueError(f"Queue {self!r} is closed")
ValueError: Queue <multiprocess.queues.Queue object at 0x101fc45b0> is closed
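The ordering Pyrallel expects (add every task, call task_done() once, then join()) can be illustrated with the sketch below. It uses a hypothetical `OneShotProcessor` built on `threading` and `queue.Queue`; this is not Pyrallel's implementation or API, only a model of the same contract. Unlike the behaviour reported here, it fails with a descriptive `RuntimeError` when add_task() is called too late, and it treats a repeated task_done() as a no-op. Both are design choices made for this sketch.

```python
import queue
import threading
from typing import Callable, List

_STOP = object()  # sentinel telling a worker that input is finished


class OneShotProcessor:
    """Hypothetical stand-in for a parallel processor (not Pyrallel's API).

    Contract: add every task first, signal end-of-input once with
    task_done(), then join(). add_task() after task_done() fails loudly.
    """

    def __init__(self, num_workers: int, mapper: Callable[[int], int]) -> None:
        self._mapper = mapper
        self._tasks: "queue.Queue[object]" = queue.Queue()
        self._results: List[int] = []
        self._lock = threading.Lock()
        self._done = False
        self._workers = [threading.Thread(target=self._run) for _ in range(num_workers)]

    def start(self) -> None:
        for w in self._workers:
            w.start()

    def _run(self) -> None:
        # Each worker consumes tasks until it sees its stop marker.
        while True:
            item = self._tasks.get()
            if item is _STOP:
                return
            result = self._mapper(item)
            with self._lock:
                self._results.append(result)

    def add_task(self, item: int) -> None:
        if self._done:
            raise RuntimeError("add_task() called after task_done()")
        self._tasks.put(item)

    def task_done(self) -> None:
        if self._done:
            return  # a second call is a harmless no-op in this sketch
        self._done = True
        for _ in self._workers:
            self._tasks.put(_STOP)  # one stop marker per worker

    def join(self) -> List[int]:
        # Wait for all workers, then return the collected results in order.
        for w in self._workers:
            w.join()
        return sorted(self._results)


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    pp = OneShotProcessor(4, square)
    pp.start()
    for i in range(100):
        pp.add_task(i)   # feed all input first...
    pp.task_done()       # ...then signal end-of-input exactly once...
    results = pp.join()  # ...then wait for the workers
    print(len(results), results[:5])  # 100 [0, 1, 4, 9, 16]
```

Applied to the first traceback, this suggests checking that read_input in embedding_utils.py reaches pp.task_done() only once, after its last add_task() call.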