sgkit icon indicating copy to clipboard operation
sgkit copied to clipboard

Write zarr to HDFS

Open LiangdeLI opened this issue 3 years ago • 9 comments

I have set up HDFS and can store(hdfs dfs -put) data to it, read dataset for Sgkit using load_dataset("hdfs://node0:9000/chr21.zarr"), but it seems that I cannot write the output or say the resulting dataset to HDFS.

I do it in this way:

sg.save_dataset(ds, "hdfs://node0:9000/sgkit_output.zarr")

But it does not work and then I also tried underlying to_zarr(), they have similar error messages:

hdfsOpenFile(/sgkit_output.zarr/variant_allele_frequency/110.0): FileSystem#open((Lorg/apache/hadoop/fs/Path;I)Lorg/apache/hadoop/fs/FSDataInputStream;) error:
RemoteException: File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
        at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
java.io.FileNotFoundException: File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
        at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)

        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:871)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:858)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:847)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1015)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:318)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:330)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /sgkit_output.zarr/variant_allele_frequency/110.0
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
        at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:156)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1990)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:768)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:442)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)

        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1562)
        at org.apache.hadoop.ipc.Client.call(Client.java:1508)
        at org.apache.hadoop.ipc.Client.call(Client.java:1405)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
        at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:327)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:869)
        ... 7 more

and

Traceback (most recent call last):
  File "distributed_sgkit.py", line 33, in <module>
    sg.save_dataset(ds, "hdfs://node0:9000/sgkit_output.zarr")
  File "/users/Liangde/.local/lib/python3.8/site-packages/sgkit/io/dataset.py", line 41, in save_dataset
    ds.to_zarr(store, **kwargs)
  File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/core/dataset.py", line 2037, in to_zarr
    return to_zarr(
  File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/backends/api.py", line 1432, in to_zarr
    writes = writer.sync(compute=compute)
  File "/users/Liangde/.local/lib/python3.8/site-packages/xarray/backends/common.py", line 166, in sync
    delayed_store = da.store(
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 1118, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/base.py", line 315, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 507, in get_async
    raise_exception(exc, tb)
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 315, in reraise
    raise exc
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/local.py", line 220, in execute_task
    result = _execute_task(task, data)
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 4098, in store_chunk
    return load_store_chunk(x, out, index, lock, return_stored, False)
  File "/users/Liangde/.local/lib/python3.8/site-packages/dask/array/core.py", line 4085, in load_store_chunk
    out[index] = x
  File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1224, in __setitem__
    self.set_basic_selection(selection, value, fields=fields)
  File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1319, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1610, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1682, in _set_selection
    self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
  File "/users/Liangde/.local/lib/python3.8/site-packages/zarr/core.py", line 1874, in _chunk_setitems
    self.chunk_store.setitems(values)
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/mapping.py", line 113, in setitems
    self.fs.pipe(values)
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 705, in pipe
    self.pipe_file(self._strip_protocol(k), v, **kwargs)
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 684, in pipe_file
    with self.open(path, "wb") as f:
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/spec.py", line 1030, in open
    f = self._open(
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/arrow.py", line 17, in wrapper
    return func(*args, **kwargs)
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/hdfs.py", line 210, in _open
    return HDFSFile(
  File "/users/Liangde/.local/lib/python3.8/site-packages/fsspec/implementations/hdfs.py", line 248, in __init__
    self.fh = fs.client.open(path, mode, block_size, **kwargs)
  File "pyarrow/_hdfsio.pyx", line 409, in pyarrow._hdfsio.HadoopFileSystem.open
  File "pyarrow/error.pxi", line 112, in pyarrow.lib.check_status
OSError: [Errno 255] Opening HDFS file '/sgkit_output.zarr/variant_allele_count/58.0' failed. Detail: [errno 255] Unknown error 255

LiangdeLI avatar Jan 08 '22 22:01 LiangdeLI

Also, for those files reported RemoteException: File does not exist:, I do can find it using command line hdfs dfs -ls ....

@eric-czech @tomwhite I found some of your examples, which are writing to gs using to_zarr(fsspec.get_mapper(path), mode="w", consolidated=True), but still got the same errors for me.

LiangdeLI avatar Jan 08 '22 22:01 LiangdeLI

@LiangdeLI why are you writing to HDFS? I don’t think that will be a common storage system for our users.

hammer avatar Jan 12 '22 14:01 hammer

  1. If you mean why HDFS? Because it seems to be the most easily get NFS for me, and we discussed it a bit in our Sep 16th meeting to use HDFS in benchmark.
  2. If you mean why writing? Because I use it to trigger computation(stop lazy evaluation).

I tried with no writing, and use persist() to trigger computation, for example in HWE, (with 80 cores on 2 machines, totally 320 GB memory, running 16 workers, each with 5 threads):

client = Client("xxx.xxx.xxx.xx:xxxx")
ds = sg.load_dataset("hdfs://node0:9000/1kg.zarr")
ds = sg.variant_stats(ds)
ds = sg.hardy_weinberg_test(ds, alleles=2)
ds = ds.sel(variants=((ds.variant_allele_frequency[:,1] > 0.01) & (ds.variant_hwe_p_value > 1e-6)))
dsHWE = client.persist(ds['variant_hwe_p_value'])
wait(dsHWE)

It fails and I can see from the dashboard that workers are using ~90% of memory. Workers have TimeoutError and CommClosedError errors, and scheduler removes those workers. I'm trying to tune the configurations in distributed.yaml to solve it, but still don't have luck yet.

How do you trigger computation when running on clusters?

LiangdeLI avatar Jan 17 '22 09:01 LiangdeLI

Does it work on a much smaller dataset?

Using persist should be fine. And if it works on a smaller dataset, then those problems are probably related to chunk sizes. You could try rechunking the genotypes array in 1kg.zarr to something smaller.

eric-czech avatar Jan 17 '22 10:01 eric-czech

Yes, it works on chromosome 21.

Currently, it has chunk size {"variants": 10000, "samples": 1000} as default values, I'm rechunking the variants dimension to 1000 to see what happens.

LiangdeLI avatar Jan 17 '22 21:01 LiangdeLI

It still occurs those timeout/comm errors, but not that many anymore. And it still loses 2~3 out of 16 workers while running, and the workers have messages:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 12.72 GiB -- Worker memory limit: 18.00 GiB

LiangdeLI avatar Jan 17 '22 22:01 LiangdeLI

So I found that workers are using worker.memory.pause percentage of memory and then stopped/frozen there, the CPU usage dropped to 4% and not making any more progress on tasks. Although worker.memory.pause means scheduler stops assigning new tasks to the worker once the worker has used this percentage of memory, I think worker should finish some assigned tasks and free the memory at this stage and be ready for new tasks.

LiangdeLI avatar Jan 18 '22 00:01 LiangdeLI

80 cores on 2 machines, totally 320 GB memory, running 16 workers, each with 5 threads

I would try doubling the amount of memory that each worker has until you no longer get memory issues. For example, try with 5, 2, then 1 threads_per_worker for 16 workers.

tomwhite avatar Jan 18 '22 08:01 tomwhite

Thanks. Sure, I can try allocating more memory for each worker, like 8 workers each with 10 threads to share 320 GB memory.

Maybe calling persist(ds['variant_hwe_p_value']) is using too much memory, is there any other ways to trigger computation but more memory efficient?

HWE on 1kg.zarr can run in single node (160 GB memory), using writing of output to trigger computation. I found dask's default single node scheduler is threaded scheduler, so when running sgkit on one machine, dask would have one worker and as many as ncores of threads?

LiangdeLI avatar Jan 19 '22 00:01 LiangdeLI