Write zarr to HDFS
I have set up HDFS and can store data on it (hdfs dfs -put) and read a dataset into sgkit with load_dataset("hdfs://node0:9000/chr21.zarr"), but it seems that I cannot write the resulting dataset back to HDFS.
I do it this way:
sg.save_dataset(ds, "hdfs://node0:9000/sgkit_output.zarr")
But it does not work, and when I tried the underlying to_zarr() directly I got similar error messages:
hdfsOpenFile(/sgkit_output.zarr/variant_allele_frequency/110.0): FileSystem#open((Lorg/apache/hadoop/fs/Path;I)Lorg/apache/hadoop/fs/FSDataInputStream;) error:
RemoteException: File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
java.io.FileNotFoundException: File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:871)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:858)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:847)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1015)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:318)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:330)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1562)
at org.apache.hadoop.ipc.Client.call(Client.java:1508)
at org.apache.hadoop.ipc.Client.call(Client.java:1405)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:327)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:869)
... 7 more
and the Python traceback:
Traceback (most recent call last):
File "distributed_sgkit.py", line 33, in <module>
sg.save_dataset(ds, "hdfs://node0:9000/sgkit_output.zarr")
File "/users/Liangde/.local/lib/python3.8/site-packages/sgkit/io/dataset.py", line 41, in save_dataset
ds.to_zarr(store, **kwargs)
File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/core/dataset.py", line 2037, in to_zarr
return to_zarr(
File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/backends/api.py", line 1432, in to_zarr
writes = writer.sync(compute=compute)
File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/backends/common.py", line 166, in sync
delayed_store = da.store(
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 1118, in store
compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/base.py", line 315, in compute_as_if_collection
return schedule(dsk2, keys, **kwargs)
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
results = get_async(
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 507, in get_async
raise_exception(exc, tb)
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 315, in reraise
raise exc
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 220, in execute_task
result = _execute_task(task, data)
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 4098, in store_chunk
return load_store_chunk(x, out, index, lock, return_stored, False)
File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 4085, in load_store_chunk
out[index] = x
File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1224, in __setitem__
self.set_basic_selection(selection, value, fields=fields)
File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1319, in set_basic_selection
return self._set_basic_selection_nd(selection, value, fields=fields)
File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1610, in _set_basic_selection_nd
self._set_selection(indexer, value, fields=fields)
File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1682, in _set_selection
self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1874, in _chunk_setitems
self.chunk_store.setitems(values)
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/mapping.py", line 113, in setitems
self.fs.pipe(values)
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 705, in pipe
self.pipe_file(self._strip_protocol(k), v, **kwargs)
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 684, in pipe_file
with self.open(path, "wb") as f:
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 1030, in open
f = self._open(
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/arrow.py", line 17, in wrapper
return func(*args, **kwargs)
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/hdfs.py", line 210, in _open
return HDFSFile(
File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/hdfs.py", line 248, in __init__
self.fh = fs.client.open(path, mode, block_size, **kwargs)
File "pyarrow/_hdfsio.pyx", line 409, in pyarrow._hdfsio.HadoopFileSystem.open
File "pyarrow/error.pxi", line 112, in pyarrow.lib.check_status
OSError: [Errno 255] Opening HDFS file '/sgkit_output.zarr/variant_allele_count/58.0' failed. Detail: [errno 255] Unknown error 255
Also, for the files reported in RemoteException: File does not exist, I can actually find them on the command line with hdfs dfs -ls ....
@eric-czech @tomwhite I found some of your examples that write to gs:// using to_zarr(fsspec.get_mapper(path), mode="w", consolidated=True), but I still get the same errors.
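Roughly, the pattern I adapted looks like this (a sketch; my exact call may have differed slightly):

import fsspec
import sgkit as sg

# sketch of the attempted write, adapting the gs:// example to HDFS
ds = sg.load_dataset("hdfs://node0:9000/chr21.zarr")
store = fsspec.get_mapper("hdfs://node0:9000/sgkit_output.zarr")
ds.to_zarr(store, mode="w", consolidated=True)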
@LiangdeLI why are you writing to HDFS? I don’t think that will be a common storage system for our users.
- If you mean why HDFS: it seemed to be the easiest shared file system for me to get set up, and we discussed using HDFS for the benchmark a bit in our Sep 16th meeting.
- If you mean why writing: I use it to trigger computation (i.e., to force the lazy computation to run).
I tried skipping the write and using persist() to trigger computation instead, for example in the HWE workflow (with 80 cores across 2 machines, 320 GB memory in total, running 16 workers with 5 threads each):
from dask.distributed import Client, wait
import sgkit as sg

client = Client("xxx.xxx.xxx.xx:xxxx")  # connect to the running scheduler
ds = sg.load_dataset("hdfs://node0:9000/1kg.zarr")
ds = sg.variant_stats(ds)
ds = sg.hardy_weinberg_test(ds, alleles=2)
# keep variants with alternate allele frequency > 1% that pass the HWE p-value filter
ds = ds.sel(variants=((ds.variant_allele_frequency[:, 1] > 0.01) & (ds.variant_hwe_p_value > 1e-6)))
dsHWE = client.persist(ds['variant_hwe_p_value'])
wait(dsHWE)
It fails, and I can see from the dashboard that workers are using ~90% of their memory. Workers hit TimeoutError and CommClosedError, and the scheduler removes those workers. I'm trying to tune the settings in distributed.yaml to fix this, but haven't had any luck yet.
How do you trigger computation when running on clusters?
Does it work on a much smaller dataset?
Using persist should be fine. And if it works on a smaller dataset, then those problems are probably related to chunk sizes. You could try rechunking the genotypes array in 1kg.zarr to something smaller.
Yes, it works on chromosome 21.
Currently it has the default chunk sizes, {"variants": 10000, "samples": 1000}; I'm rechunking the variants dimension to 1000 to see what happens.
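For example (a sketch; this rechunks the dask arrays in memory rather than rewriting the chunks stored in the zarr itself):

import sgkit as sg

# rechunk the variants dimension from the default 10000 down to 1000
ds = sg.load_dataset("hdfs://node0:9000/1kg.zarr")
ds = ds.chunk({"variants": 1000})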
The timeout/comm errors still occur, though not as many as before. It still loses 2-3 of the 16 workers while running, and those workers log messages like:
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 12.72 GiB -- Worker memory limit: 18.00 GiB
So I found that workers reach the worker.memory.pause fraction of memory and then freeze there: CPU usage drops to ~4% and no more progress is made on tasks. Although worker.memory.pause means the scheduler stops assigning new tasks to a worker once it has used this fraction of its memory, I would expect the worker to finish its already-assigned tasks, free the memory, and then be ready for new tasks.
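For reference, these thresholds are the distributed.worker.memory.* settings; a sketch of where they live (values shown are roughly the defaults, not a recommendation). For workers started on other machines they need to be set in distributed.yaml or via environment variables on each worker host, not just in the client script:

import dask

# sketch: worker memory thresholds (roughly the default values)
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on total process memory
    "distributed.worker.memory.pause": 0.80,      # pause running new tasks on the worker
    "distributed.worker.memory.terminate": 0.95,  # the nanny restarts the worker
})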
80 cores on 2 machines, totally 320 GB memory, running 16 workers, each with 5 threads
I would try doubling the amount of memory that each worker has until you no longer get memory issues. For example, try with 5, 2, then 1 threads_per_worker for 16 workers.
Thanks. Sure, I can try allocating more memory for each worker, like 8 workers each with 10 threads to share 320 GB memory.
Maybe calling persist(ds['variant_hwe_p_value']) uses too much memory; is there another, more memory-efficient way to trigger computation?
HWE on 1kg.zarr can run on a single node (160 GB memory) when I write the output to trigger computation. I found that Dask's default single-node scheduler is the threaded scheduler, so when running sgkit on one machine, does Dask use a single worker with as many threads as there are cores?
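For reference, the default threaded scheduler is a single process with a thread pool sized to the CPU count by default; to make the worker/thread split and memory limit explicit on one machine, a local distributed cluster can be used instead (a sketch, with counts that are only examples):

from dask.distributed import Client, LocalCluster

# sketch: an explicit local cluster instead of the default threaded scheduler,
# so the number of workers, threads per worker, and per-worker memory limit are visible
cluster = LocalCluster(n_workers=4, threads_per_worker=10, memory_limit="40GB")
client = Client(cluster)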