pycisTopic
export_pseudobulk [BUG]
Thanks for making this package available. I submitted a slurm job with this function:
bw_paths, bed_paths = export_pseudobulk(input_data=metadata,
                                        variable='subclass_label_2',
                                        sample_id_col='sample_id',
                                        chromsizes=chromsizes,
                                        bed_path=base_path+f'ATAC/pseudobulks/{region}/bed',
                                        bigwig_path=base_path+f'ATAC/pseudobulks/{region}/bigwig',
                                        path_to_fragments=fragments_dict,
                                        n_cpu=16,
                                        normalize_bigwig=True,
                                        remove_duplicates=True,
                                        _temp_dir=base_path+f'ATAC/pseudobulks/{region}/tmp',
                                        split_pattern='-')
I've been waiting for this step for about 96h. The log file reads:
2023-01-20 19:40:23,007 cisTopic INFO Reading fragments from ...sample1/outs/atac_fragments.tsv.gz
2023-01-20 19:46:34,790 cisTopic INFO Reading fragments from ...sample2/outs/atac_fragments.tsv.gz
2023-01-20 19:52:13,172 cisTopic INFO Reading fragments from ...sample3/outs/atac_fragments.tsv.gz
2023-01-20 19:58:06,604 cisTopic INFO Reading fragments from ...sample4/outs/atac_fragments.tsv.gz
2023-01-20 20:05:07,534 cisTopic INFO Reading fragments from ...sample5/outs/atac_fragments.tsv.gz
2023-01-20 20:10:28,990 cisTopic INFO Reading fragments from ...sample6/outs/atac_fragments.tsv.gz
2023-01-20 20:16:22,397 cisTopic INFO Reading fragments from ...sample7/outs/atac_fragments.tsv.gz
2023-01-20 20:23:02,828 cisTopic INFO Reading fragments from ...sample8/outs/atac_fragments.tsv.gz
And nothing else -- hasn't changed in 96h. Inspecting the job, it's using 2% of only 1 of the 48 cores, and 60GB RAM (of 260GB provided).
The tutorial suggests it should launch Ray tasks and log something like
(export_pseudobulk_ray pid=12108) 2022-08-08 18:30:07,800 cisTopic INFO Creating pseudobulk for AST
but I'm not seeing that in my output. Does anything stick out to you as a mistake here?
Hi @alexlenail !
Do you get any ray message at all? This message for instance:
2022-08-05 16:50:45,105 INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265
If not, could you try:
- calling ray.init() on its own
- setting n_cpu=1
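For instance, a minimal standalone check (the num_cpus value here is just illustrative):

```python
import ray

# Start a local Ray instance outside of pycisTopic to check that Ray itself
# can start, and to see where it writes its session files.
ray.init(num_cpus=2)
print(ray.cluster_resources())  # should list the CPUs Ray detected
ray.shutdown()
```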
Can you also show what the metadata looks like?
Thanks!
C
Thanks for the help debugging, @cbravo93 !
- I don't get a "View the Ray dashboard" message.
- It works with n_cpu=1!
- ray.init() alone seems to work:

Metadata looks like:

When I use n_cpu=1 though, I get this error message:

Which I can probably fix by changing the chromsizes inputs. But that leads me to wonder: maybe these errors weren't properly bubbling up when using Ray / n_cpu > 1?
Traceback:
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
Cell In[42], line 1
----> 1 narrow_peaks_dict = peak_calling(macs_path,
2 bed_paths,
3 output_dir,
4 genome_size='mm',
5 n_cpu=8,
6 input_format='BEDPE',
7 shift=73,
8 ext_size=146,
9 keep_dup='all',
10 q_value=0.05,
11 _temp_dir='/home/gridsan/lenail/tmp/ray_spill')
File /data1/groups/neuroTF/tools/pycisTopic/pycisTopic/pseudobulk_peak_calling.py:413, in peak_calling(macs_path, bed_paths, outdir, genome_size, n_cpu, input_format, shift, ext_size, keep_dup, q_value, nolambda, **kwargs)
410 os.makedirs(outdir)
412 if n_cpu > 1:
--> 413 ray.init(num_cpus=n_cpu, **kwargs)
414 narrow_peaks = ray.get(
415 [
416 macs_call_peak_ray.remote(
(...)
430 ]
431 )
432 ray.shutdown()
File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/worker.py:1439, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
1397 ray_params = ray._private.parameter.RayParams(
1398 node_ip_address=node_ip_address,
1399 raylet_ip_address=raylet_ip_address,
(...)
1433 node_name=_node_name,
1434 )
1435 # Start the Ray processes. We set shutdown_at_exit=False because we
1436 # shutdown the node in the ray.shutdown call that happens in the atexit
1437 # handler. We still spawn a reaper process in case the atexit handler
1438 # isn't called.
-> 1439 _global_node = ray._private.node.Node(
1440 head=True, shutdown_at_exit=False, spawn_reaper=True, ray_params=ray_params
1441 )
1442 else:
1443 # In this case, we are connecting to an existing cluster.
1444 if num_cpus is not None or num_gpus is not None:
File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/node.py:242, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
235 self._plasma_store_socket_name = self._prepare_socket_file(
236 self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
237 )
238 self._raylet_socket_name = self._prepare_socket_file(
239 self._ray_params.raylet_socket_name, default_prefix="raylet"
240 )
--> 242 self.metrics_agent_port = self._get_cached_port(
243 "metrics_agent_port", default_port=ray_params.metrics_agent_port
244 )
245 self._metrics_export_port = self._get_cached_port(
246 "metrics_export_port", default_port=ray_params.metrics_export_port
247 )
249 ray_params.update_if_absent(
250 metrics_agent_port=self.metrics_agent_port,
251 metrics_export_port=self._metrics_export_port,
252 )
File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/node.py:801, in Node._get_cached_port(self, port_name, default_port)
798 # Maps a Node.unique_id to a dict that maps port names to port numbers.
799 ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict)
--> 801 with FileLock(file_path + ".lock"):
802 if not os.path.exists(file_path):
803 with open(file_path, "w") as f:
File ~/.conda/envs/py39/lib/python3.9/site-packages/filelock/_api.py:220, in BaseFileLock.__enter__(self)
214 def __enter__(self) -> BaseFileLock:
215 """
216 Acquire the lock.
217
218 :return: the lock object
219 """
--> 220 self.acquire()
221 return self
File ~/.conda/envs/py39/lib/python3.9/site-packages/filelock/_api.py:187, in BaseFileLock.acquire(self, timeout, poll_interval, poll_intervall, blocking)
185 msg = "Lock %s not acquired on %s, waiting %s seconds ..."
186 _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
--> 187 time.sleep(poll_interval)
188 except BaseException: # Something did go wrong, so decrement the counter.
189 with self._thread_lock:
KeyboardInterrupt:
The filesystem I'm using doesn't support file locking on the home directory. Am I reading correctly that Ray is trying to lock a file to write metrics to? Is there a way to disable that? Maybe one of the **kwargs passed to ray.init()?
https://docs.ray.io/en/latest/ray-core/package-ref.html
Hi Alex!
Can you put your _temp_dir path somewhere that is not home? That should fix the problem :)
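For example (the scratch location below is just a placeholder for whatever non-home filesystem you have available):

```python
import os

# Hypothetical scratch directory on a filesystem that supports file locking.
ray_tmp = '/scratch/lenail/ray_tmp'
os.makedirs(ray_tmp, exist_ok=True)

bw_paths, bed_paths = export_pseudobulk(input_data=metadata,
                                        variable='subclass_label_2',
                                        sample_id_col='sample_id',
                                        chromsizes=chromsizes,
                                        bed_path=base_path+f'ATAC/pseudobulks/{region}/bed',
                                        bigwig_path=base_path+f'ATAC/pseudobulks/{region}/bigwig',
                                        path_to_fragments=fragments_dict,
                                        n_cpu=16,
                                        normalize_bigwig=True,
                                        remove_duplicates=True,
                                        _temp_dir=ray_tmp,  # not under $HOME
                                        split_pattern='-')
```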
C
@cbravo93 just a note, I'm also having difficulty using Ray. The issue looks very similar to what @alexlenail is posting, and it took me a while to troubleshoot.
- import ray; ray.init() works
- I am using tmp_dir = /tmp (BTW, could you just use the builtin tempfile module? See the sketch below.)
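What I have in mind is something like this sketch (pycisTopic doesn't currently do this; the prefix is arbitrary):

```python
import tempfile

# Create a unique temporary directory that is guaranteed to exist,
# then hand it to export_pseudobulk, which forwards it to ray.init.
ray_spill_dir = tempfile.mkdtemp(prefix='ray_spill_')
# export_pseudobulk(..., _temp_dir=ray_spill_dir)
```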
However, I do get a different error that suggests nodes aren't starting up.
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/node.py:310, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
309 try:
--> 310 ray._private.services.wait_for_node(
311 self.gcs_address,
312 self._plasma_store_socket_name,
313 )
314 except TimeoutError:
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/services.py:434, in wait_for_node(gcs_address, node_plasma_store_socket_name, timeout)
433 time.sleep(0.1)
--> 434 raise TimeoutError("Timed out while waiting for node to startup.")
TimeoutError: Timed out while waiting for node to startup.
During handling of the above exception, another exception occurred:
Exception Traceback (most recent call last)
Cell In[13], line 5
2 BW_PATHS_PICKLE = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bw_paths.pkl')
4 if True:#not os.path.exists(BED_PATHS_PICKLE):
----> 5 bw_paths, bed_paths = export_pseudobulk(input_data = rna_obs_small,
6 variable = 'leiden', # variable by which to generate pseubulk profiles, in this case we want pseudobulks per celltype
7 sample_id_col = 'sample_id',
8 chromsizes = small_chromsizes,
9 bed_path = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/'), # specify where pseudobulk_bed_files should be stored
10 bigwig_path = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
11 path_to_fragments = {"cd34_multiome": "./data/smaller_fragments.tsv.gz"}, # location of fragment fiels
12 n_cpu = 16, # specify the number of cores to use, we use ray for multi processing
13 normalize_bigwig = True,
14 remove_duplicates = True,
15 _temp_dir = os.path.join(tmp_dir, 'ray_spill'),
16
17 )
19 pickle.dump(bed_paths,
20 open(os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bed_paths.pkl'), 'wb'))
21 pickle.dump(bw_paths,
22 open(os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bw_paths.pkl'), 'wb'))
File /srv/conda/envs/saturn/lib/python3.9/site-packages/pycisTopic/pseudobulk_peak_calling.py:165, in export_pseudobulk(input_data, variable, chromsizes, bed_path, bigwig_path, path_to_fragments, sample_id_col, n_cpu, normalize_bigwig, remove_duplicates, split_pattern, use_polars, **kwargs)
163 # Create pseudobulks
164 if n_cpu > 1:
--> 165 ray.init(num_cpus=n_cpu, **kwargs)
166 ray_handle = ray.wait(
167 [
168 export_pseudobulk_ray.remote(
(...)
182 num_returns=len(groups),
183 )
184 ray.shutdown()
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/worker.py:1439, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
1397 ray_params = ray._private.parameter.RayParams(
1398 node_ip_address=node_ip_address,
1399 raylet_ip_address=raylet_ip_address,
(...)
1433 node_name=_node_name,
1434 )
1435 # Start the Ray processes. We set shutdown_at_exit=False because we
1436 # shutdown the node in the ray.shutdown call that happens in the atexit
1437 # handler. We still spawn a reaper process in case the atexit handler
1438 # isn't called.
-> 1439 _global_node = ray._private.node.Node(
1440 head=True, shutdown_at_exit=False, spawn_reaper=True, ray_params=ray_params
1441 )
1442 else:
1443 # In this case, we are connecting to an existing cluster.
1444 if num_cpus is not None or num_gpus is not None:
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/node.py:315, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
310 ray._private.services.wait_for_node(
311 self.gcs_address,
312 self._plasma_store_socket_name,
313 )
314 except TimeoutError:
--> 315 raise Exception(
316 "The current node has not been updated within 30 "
317 "seconds, this could happen because of some of "
318 "the Ray processes failed to startup."
319 )
320 node_info = ray._private.services.get_node_to_connect_for_driver(
321 self.gcs_address,
322 self._raylet_ip_address,
323 )
324 if self._ray_params.node_manager_port == 0:
Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.
Hi @dburkhardt !
Can you paste your full command? Can you try:
- ray.init()
- ray.init(num_cpus=2)
- ray.init(_temp_dir=XXX)

We normally do not use /tmp on our system because these files can need quite some space.
C
@dburkhardt can you check https://github.com/aertslab/scenicplus/issues/31 https://github.com/aertslab/scenicplus/issues/24 ?
> Can you try:
> - ray.init()
> - ray.init(num_cpus=2)
> - ray.init(_temp_dir=XXX)

These all work.
@dburkhardt can you check with n_cpu=1 and see if any error pops up? Can you also check that your chromsizes match the chromosome names in the fragments file? Something like the sketch below.
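A rough way to compare them (assuming chromsizes is a pyranges-style object with a Chromosome column; the fragments path is a placeholder):

```python
import gzip

def chroms_in_fragments(path, max_lines=1_000_000):
    # Collect chromosome names from the first column of a (gzipped) fragments file.
    chroms = set()
    with gzip.open(path, 'rt') as f:
        for i, line in enumerate(f):
            if line.startswith('#'):
                continue
            chroms.add(line.split('\t', 1)[0])
            if i >= max_lines:
                break
    return chroms

frag_chroms = chroms_in_fragments('path/to/atac_fragments.tsv.gz')
chromsize_chroms = set(chromsizes.Chromosome)
print('In fragments but not in chromsizes:', frag_chroms - chromsize_chroms)
print('In chromsizes but not in fragments:', chromsize_chroms - frag_chroms)
```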
This does work with n_cpu=1, with correct output, and I can proceed with the rest of the analysis. The issue only comes up when trying to run with n_cpu>1.
Does os.path.join(tmp_dir, 'ray_spill') exist? I just managed to reproduce the error with ray.init(_temp_dir='blah').
Interesting, if I just omit _temp_dir from the export_pseudobulk command, then it works fine. It seems like there are multiple issues with manually specifying this parameter, rather than using the default.
Looking at the ray documentation (https://docs.ray.io/en/latest/ray-core/configure.html):
(There is not currently a stable way to change the root temporary directory when calling ray.init(), but if you need to, you can provide the _temp_dir argument to ray.init().)
So maybe best just to omit this?
If you don't define _temp_dir it will use /tmp. If you have limited space in /tmp I would recommend specifying _temp_dir, especially downstream. What is your value for os.path.join(tmp_dir, 'ray_spill')?
No, the temporary directory didn't exist before calling export_pseudobulk. I wouldn't expect it to need to exist beforehand, though, because it's temporary. You don't check for that before passing the _temp_dir to ray?
It is checked internally by ray. What is your value for os.path.join(tmp_dir, 'ray_spill')?
@cbravo93 did you run into this issue at all? https://github.com/ray-project/ray/issues/7724
When I set _temp_dir = '/state/partition1/user/lenail' I get:
OSError: AF_UNIX path length cannot exceed 107 bytes: '/state/partition1/slurm_tmp/21322766.4294967291.0/ray/session_2023-02-02_13-15-02_941811_78876/sockets/plasma_store'
What do you set as _temp_dir?
Hi,
I essentially noticed the same observations that were posted in this issue. To sum it up:
- when I remove _temp_dir and specify n_cpu>1, it successfully creates the bed and bw files
- _temp_dir must exist prior to calling export_pseudobulk with n_cpu>1, or else it fails to create the bed and bw files (see the sketch below)
- if _temp_dir is a "long path", an OSError: AF_UNIX path length cannot exceed 107 bytes error is thrown. This is a known ray bug that doesn't seem to be resolved in version 2.4: https://github.com/ray-project/ray/issues/7724
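A workaround that follows these observations (a sketch only; the path is a placeholder and kept short because of the 107-byte socket-path limit mentioned above):

```python
import os

# Pre-create a short temporary directory for Ray before calling export_pseudobulk.
ray_tmp = '/scratch/ray_tmp'
os.makedirs(ray_tmp, exist_ok=True)
# then: export_pseudobulk(..., n_cpu=8, _temp_dir=ray_tmp)
```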
Thanks for all the comments in this issue. I agree with @dburkhardt: I think this directory should be created automatically when calling export_pseudobulk.
Hi @cbravo93, thanks for developing this amazing tool!
I had a similar problem with ray and the export_pseudobulk function. In my case, I was not even able to initialize ray following the code you provided. Running:
import ray
ray.init()
Raised the following error:
2023-08-22 11:22:18,012 ERROR services.py:1207 -- Failed to start the dashboard , return code 1
2023-08-22 11:22:18,013 ERROR services.py:1232 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure' to find where the log file is.
2023-08-22 11:22:18,014 ERROR services.py:1276 --
The last 20 lines of /scratch_tmp/10407352/ray/session_2023-08-22_11-22-13_701675_119199/logs/dashboard.log (it contains the error message from the dashboard):
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/dashboard/modules/log/log_manager.py", line 8, in <module>
from ray.util.state.common import (
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/__init__.py", line 1, in <module>
from ray.util.state.api import (
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/api.py", line 17, in <module>
from ray.util.state.common import (
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/common.py", line 120, in <module>
@dataclass(init=True)
File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/pydantic/dataclasses.py", line 139, in dataclass
assert init is False, 'pydantic.dataclasses.dataclass only supports init=False'
AssertionError: pydantic.dataclasses.dataclass only supports init=False
2023-08-22 11:22:18,142 INFO worker.py:1636 -- Started a local Ray instance.
I checked this issue on the ray GitHub, and the problem seems to be with version 2 of pydantic. I downgraded pydantic as follows:
conda activate scenicplus
which python
~/anaconda3/envs/scenicplus/bin/python
~/anaconda3/envs/scenicplus/bin/pip install "pydantic<2"
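To double-check which pydantic version the environment actually picks up after the downgrade (a quick sanity check):

```python
import pydantic
print(pydantic.VERSION)  # should report a 1.x release after the downgrade
```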
And then everything worked. This should be fixed with version 2.6 of pydantic.