scenicplus
scenicplus copied to clipboard
A Parallel calculation error with joblib when execute the export_pseudobulk function
Dear authors and developers, Thanks for all your work.
Describe the bug When I was reproducing the export_pseudobulk step of the tutorial (10x multiome pbmc), I encountered the problem that the .bed.gz file and .bw file could not be generated normally. This issue was resolved by replacing pseudobulk_peak_calling.py with pycisTopic_improve/pycisTopic/python/pycisTopic/pseudobulk_peak_calling.py, as suggested in issue #277.
However, a new CPU parallel computing error appeared, which seems to be related to the joblib package. Specifically, when the parameter n_cpu of the export_pseudobulk function is greater than 1, an error will occur. When n_cpu=1, the function can run normally and output .bed.gz files and .bw files.
To Reproduce Commands relevant to reproduce the error.
from pycisTopic.pseudobulk_peak_calling import export_pseudobulk
bw_paths, bed_paths = export_pseudobulk(input_data = cell_data,
variable = 'celltype', # variable by which to generate pseubulk profiles, in this case we want pseudobulks per celltype
sample_id_col = 'sample_id',
chromsizes = chromsizes,
bed_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bed_files/'), # specify where pseudobulk_bed_files should be stored
bigwig_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
path_to_fragments = fragments_dict,
n_cpu = 8, # specify the number of cores to use, we use ray for multi processing
normalize_bigwig = True,
remove_duplicates = True,
#_temp_dir = tmp_dir,
split_pattern = '-')
Error output Paste the entire output of the command, including log information prior to the error.
Reading fragments for 10x_pbmc.
from: pbmc_tutorial/data/pbmc_granulocyte_sorted_3k_atac_fragments.tsv.gz
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 426, in _process_worker
call_item = call_queue.get(block=True, timeout=timeout)
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/multiprocessing/queues.py", line 116, in get
return _ForkingPickler.loads(res)
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py", line 10, in <module>
import pandas as pd
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/__init__.py", line 48, in <module>
from pandas.core.api import (
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/api.py", line 47, in <module>
from pandas.core.groupby import (
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/groupby/__init__.py", line 1, in <module>
from pandas.core.groupby.generic import (
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/groupby/generic.py", line 77, in <module>
from pandas.core.frame import DataFrame
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/frame.py", line 182, in <module>
from pandas.core.generic import NDFrame
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/generic.py", line 179, in <module>
from pandas.core.window import (
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/window/__init__.py", line 1, in <module>
from pandas.core.window.ewm import (
File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/window/ewm.py", line 11, in <module>
import pandas._libs.window.aggregations as window_aggregations
ImportError: /lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.29' not found (required by /home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/_libs/window/aggregations.cpython-38-x86_64-linux-gnu.so)
"""
The above exception was the direct cause of the following exception:
BrokenProcessPool Traceback (most recent call last)
Cell In[4], line 2
1 from pycisTopic.pseudobulk_peak_calling import export_pseudobulk
----> 2 bw_paths, bed_paths = export_pseudobulk(input_data = cell_data,
3 variable = 'celltype', # variable by which to generate pseubulk profiles, in this case we want pseudobulks per celltype
4 sample_id_col = 'sample_id',
5 chromsizes = chromsizes,
6 bed_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bed_files/'), # specify where pseudobulk_bed_files should be stored
7 bigwig_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
8 path_to_fragments = fragments_dict,
9 n_cpu = 8, # specify the number of cores to use, we use ray for multi processing
10 normalize_bigwig = True,
11 remove_duplicates = True,
12 #_temp_dir = tmp_dir,
13 split_pattern = '-')
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py:210, in export_pseudobulk(input_data, variable, chromsizes, bed_path, bigwig_path, path_to_fragments, sample_id_col, n_cpu, normalize_bigwig, remove_duplicates, split_pattern)
203 _sample_cell_data["barcode"] = prepare_tag_cells(
204 _sample_cell_data.index.tolist(), split_pattern
205 )
206 _cell_type_to_cell_barcodes = _sample_cell_data \
207 .groupby(variable, group_keys=False)["barcode"] \
208 .apply(set) \
209 .to_dict()
--> 210 _cell_type_to_fragments = _get_fragments_for_cell_barcodes(
211 path_to_fragments[sample], _cell_type_to_cell_barcodes, n_cores=n_cpu
212 )
213 for cell_type in _cell_type_to_fragments.keys():
214 cell_type_to_fragments_all_samples[cell_type].extend(
215 _cell_type_to_fragments[cell_type]
216 )
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py:68, in _get_fragments_for_cell_barcodes(path_to_fragments, cell_type_to_cell_barcodes, n_cores)
66 contigs = tbx.contigs
67 tbx.close()
---> 68 cell_type_to_fragments_per_contig = joblib.Parallel(n_jobs=n_cores)(
69 joblib.delayed(_get_fragments_for_cell_barcodes_single_contig)(
70 path_to_fragments, contig, cell_type_to_cell_barcodes
71 )
72 for contig in contigs
73 )
74 cell_type_to_fragments = {
75 cell_type: [] for cell_type in cell_type_to_cell_barcodes.keys()
76 }
77 for i in range(len(cell_type_to_fragments_per_contig)):
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1952, in Parallel.__call__(self, iterable)
1946 # The first item from the output is blank, but it makes the interpreter
1947 # progress until it enters the Try/Except block of the generator and
1948 # reach the first `yield` statement. This starts the aynchronous
1949 # dispatch of the tasks to the workers.
1950 next(output)
-> 1952 return output if self.return_generator else list(output)
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1595, in Parallel._get_outputs(self, iterator, pre_dispatch)
1592 yield
1594 with self._backend.retrieval_context():
-> 1595 yield from self._retrieve()
1597 except GeneratorExit:
1598 # The generator has been garbage collected before being fully
1599 # consumed. This aborts the remaining tasks if possible and warn
1600 # the user if necessary.
1601 self._exception = True
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1699, in Parallel._retrieve(self)
1692 while self._wait_retrieval():
1693
1694 # If the callback thread of a worker has signaled that its task
1695 # triggered an exception, or if the retrieval loop has raised an
1696 # exception (e.g. `GeneratorExit`), exit the loop and surface the
1697 # worker traceback.
1698 if self._aborting:
-> 1699 self._raise_error_fast()
1700 break
1702 # If the next job is not ready for retrieval yet, we just wait for
1703 # async callbacks to progress.
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1734, in Parallel._raise_error_fast(self)
1730 # If this error job exists, immediatly raise the error by
1731 # calling get_result. This job might not exists if abort has been
1732 # called directly or if the generator is gc'ed.
1733 if error_job is not None:
-> 1734 error_job.get_result(self.timeout)
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:736, in BatchCompletionCallBack.get_result(self, timeout)
730 backend = self.parallel._backend
732 if backend.supports_retrieve_callback:
733 # We assume that the result has already been retrieved by the
734 # callback thread, and is stored internally. It's just waiting to
735 # be returned.
--> 736 return self._return_or_raise()
738 # For other backends, the main thread needs to run the retrieval step.
739 try:
File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:754, in BatchCompletionCallBack._return_or_raise(self)
752 try:
753 if self.status == TASK_ERROR:
--> 754 raise self._result
755 return self._result
756 finally:
BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
Version (please complete the following information):
- Python: [Python 3.8.18]
- SCENIC+: [1.0.1.dev6+ge5ba6fc.d20240107]
- pycistarget: [1.0.3.dev1+g3fde1ce]
- pycistopic: [1.0.3.dev21+ge9b0e1a.d20240109]
- pycistarget: [1.0.3.dev1+g3fde1ce]
- ray: [2.9.0]
- joblib: [1.3.2]