sgkit icon indicating copy to clipboard operation
sgkit copied to clipboard

Investigate ragged arrays to represent alleles

Open tomwhite opened this issue 4 years ago • 6 comments

Alleles are a challenge to represent efficiently in fixed-length arrays. There are a couple of problems:

  1. the number of alleles is not known until the whole VCF file has been processed
  2. there can be a very wide variation in the number of alt alleles (most variants will have one, but a few could have thousands

Both these problems could be solved by using ragged arrays.

Zarr has support for ragged arrays, but these don't currently work with variable length strings (needed for alleles), and they don't fit the Xarray data model, which assumes fixed sized dimensions. There is a good discussion of the problem in https://github.com/pydata/xarray/issues/4285, in the context of Awkward Array.

tomwhite avatar Jul 19 '21 09:07 tomwhite

Tricky one. I guess an option we should consider also is a home-grown ragged array encoding for alleles (although it would be pretty icky to expose users to this). Could we (in principle) do an encoding ourselves and expose the alleles dataset variable as a numpy object array (or similar)?

jeromekelleher avatar Jul 19 '21 09:07 jeromekelleher

Could we (in principle) do an encoding ourselves and expose the alleles dataset variable as a numpy object array (or similar)?

Yes, I think it would be worth trying this out to see if it's possible, and how it works in Xarray.

tomwhite avatar Jul 19 '21 10:07 tomwhite

FWIW we've been doing this in tskit like this. I think the underlying encoding (a data and offset array) is basically the same as what Arrow uses, and it works well in practise.

jeromekelleher avatar Jul 19 '21 10:07 jeromekelleher

FYI @tomwhite. Following today's "standup", "open sourcing" some of our internal comments about awkward array, these are likely outdated (they were made about a year ago) and might not fully apply in the sgkit context. It might be still useful tho.

From @eric-czech:

There aren't any docs for awkward-1.0 yet, just empty stubs at https://awkward-array.org, so I looked at the original project instead. Here are a few questions I wanted to answer:

  • How do we parallelize with awkward?

It looks like dask.Bag (or even just delayed) wrapping awkward would make the most sense. I was hoping initially that it would be possible for awkward to act on dask arrays, but it just converts everything to numpy afaict:

import pyarrow, awkward
import dask.array as da
arrow_buffer = pyarrow.ipc.open_file(open("tests/samples/exoplanets.arrow", "rb")).get_batch(0)
stars = awkward.fromarrow(arrow_buffer)
stars['id'] = da.arange(len(stars), chunks=3)
type(stars['id']) # -> numpy.ndarray

That would then mean that we'd have to use the awkward api within partitions, which would be like using Pandas without Dask.DataFrame. Have you found a deeper way to integrate the two @ravwojdyla? This looks promising in 1.0: https://awkward-array.org/how-to-create-lazy-dask.html but I'm not sure what to expect once they add anything there.

  • How do we infer the schema for a table stored as json objects in partitioned files?

I'm not sure on this one -- it looks we can get a layout for one file, but I don't know how we'd merge this across partitions:

import pyarrow, awkward
arrow_buffer = pyarrow.ipc.open_file(open("tests/samples/exoplanets.arrow", "rb")).get_batch(0)
stars = awkward.fromarrow(arrow_buffer)
stars.layout
 layout 
[           ()] Table(dec=layout[0], dist=layout[1], mass=layout[2], name=layout[3], planets=layout[4], ra=layout[5], radius=layout[6])
[            0]   ndarray(shape=2935, dtype=dtype('float64'))
[            1]   ndarray(shape=2935, dtype=dtype('float64'))
[            2]   ndarray(shape=2935, dtype=dtype('float64'))
[            3]   StringArray(content=layout[3, 0], generator=<function tostring at 0x11510bd30>, args=(<function decode at 0x1011ae9d0>,), kwargs={})
[         3, 0]     JaggedArray(starts=layout[3, 0, 0], stops=layout[3, 0, 1], content=layout[3, 0, 2])
[      3, 0, 0]       ndarray(shape=2935, dtype=dtype('int32'))
[      3, 0, 1]       ndarray(shape=2935, dtype=dtype('int32'))
...

I imagine there's a function somewhere in the API that would let us get the schema like the one that is saved in HDF5 metadata, but I can't find it.

import h5py, json
f = h5py.File("stars.hdf5", "w")
g = awkward.hdf5(f)
g['stars'] = stars
f.close()
f = h5py.File("stars.hdf5", "r")
json.loads(f["stars"]["schema.json"][:].tostring())['schema']
{'call': ['awkward', 'Table', 'frompairs'],
 'args': [{'pairs': [['dec',
     {'call': ['awkward', 'numpy', 'frombuffer'],
      'args': [{'read': '1'}, {'dtype': 'float64'}, {'json': 2935, 'id': 2}],
      'id': 1}],
    ['dist',
     {'call': ['awkward', 'numpy', 'frombuffer'],
      'args': [{'read': '3'}, {'dtype': 'float64'}, {'json': 2935, 'id': 4}],
      'id': 3}],
    ['mass',
     {'call': ['awkward', 'numpy', 'frombuffer'],
      'args': [{'read': '5'}, {'dtype': 'float64'}, {'json': 2935, 'id': 6}],
      'id': 5}],
...
  • How can we save nested tables?

    • You can't do it with parquet
    • You can with awkd, HDF5 or arrow buffers
      • awkd is a zip archive of columns as binary blobs (zlib compressed, though it looks like there are ways to use other compressors)
      • HDF5 serialization uses the same protocol as pickling
      • Note on how HDF5 is stored:

      The HDF5 format does not include columnar representations of arbitrary nested data, as awkward-array does, so what we're actually storing are plain Numpy arrays and the metadata necessary to reconstruct the awkward array

      • Arrow buffers can't be compressed since they're only meant for ephemeral communication / memory mapping
  • How do we join two tables?

Dask.Bag.join looks like a possibility except that Bag is intended to operate on individual records so I'm not sure how awkward wrapping batches of records would fit in. One possibility could be to call .tolist() on the awkard arrays, then Bag.flatten, and then use the Bag API functions as normal.

  • How do we do something like explode an inner table, group by something in that table, and produce a count?

I think this too only appears to be possible (like joins) by using awkward within to partitions to either directly create lists of python objects, or use Pandas as an intermediary, for generating individual dict objects or pandas dataframes that we then process with dask.Bag or dask.DataFrame.


From me:

I found a bit more documentation in the jupyter notebooks here.

Regarding lazy arrays and Dask integration, I believe they are referring to the VirtualArray and PartitionedArray, which I believe you can see used with Dask here, but even then afaiu we still need to operate on awkward array at the partition level (unless we further integrate it into Dask).

Regarding groupby/join - looking at how reducers were implemented in awkward, I am not sure we would be able to use awkward as a 1st class citizen in Dask without as you are saying - converting down to list/dict/etc.

ravwojdyla avatar Jul 22 '21 19:07 ravwojdyla

A different way to approach this problem is to export the fields that need ragged string arrays to a different storage backend, such as Parquet, then use tools to query that dataset to produce a list of IDs that can be used to restrict the array data in Zarr. (I suggested this in #1059.)

I've explored this a bit more in this notebook: https://github.com/pystatgen/sgkit/blob/17a24cf8ad755bb499b3a4a0caeca15390ef1a7e/ragged.ipynb

And I've also written a small prototype to export a couple of VCF fields to Parquet in https://github.com/pystatgen/sgkit/commit/517a67e18bacd2fdcf568ca8523c9120cfc2be71. This could be easily extended to export all VCF fixed and INFO fields, which are the ones that would be useful for filtering.

Not sure what to do with this yet, just sharing as it might be useful.

tomwhite avatar Apr 26 '23 11:04 tomwhite

I've generalised the Parquet export code in #1083

tomwhite avatar May 04 '23 09:05 tomwhite