
ability to normalize schema in conversion operations like `to_arrow()` / `to_parquet()`

Open lukasheinrich opened this issue 4 years ago • 18 comments

when processing multiple batches of events, it's possible that some variable-size fields are null or entirely absent in a given batch. Also, the order of fields in a record may differ between batches

for large-scale format transformation it'd be great if one could provide a fixed schema to use when writing files, so that the resulting files can be merged later

cc @nikoladze

lukasheinrich avatar Jul 06 '21 16:07 lukasheinrich

I ran into this in the same scenario as the one that you describe. I'm still quite new to the space, but I wonder if what we need here is some kind of coerce_to_form function that can transform one Awkward Array to another with the given form, e.g.

import awkward as ak

form = ...
for batch in batch_generator():
    as_form = ak.coerce_to_form(batch, form)
    ak.to_parquet(as_form, ...)

For auto-generated forms this could be

from itertools import chain

import awkward as ak

batches = iter(batch_generator())
packed = (ak.packed(b) for b in batches)

first = next(packed)
form = first.layout.form

for batch in chain([first], packed):
    as_form = ak.coerce_to_form(batch, form)
    ak.to_parquet(as_form, ...)

This would have benefits in other areas such as partitioned arrays.

agoose77 avatar Jul 06 '21 16:07 agoose77

yes I agree that type of coercion would be perfect!

lukasheinrich avatar Jul 06 '21 16:07 lukasheinrich

There is type information associated with Arrow and Parquet, such as "this is an array of nullable lists." That type information is at the same level of abstraction as Awkward types, so the above would be something like 1000 * option[var * float32]. The high-level type information doesn't say whether the nullability is implemented by a BitMaskedArray or an UnmaskedArray, but if the type says "nullable" and the mask is None, then it becomes an UnmaskedArray to provide that option-type in Awkward without creating a mask.

If your application relies on high-level types, then one Parquet file with nullable type and an existing mask and another Parquet file with the same schema and a missing mask should be viewed as the same high-level type and everything works the same with both files. However:

  • If the Parquet file without a mask also has a non-nullable type, then they'll come out in Awkward Array as different types. In this case, the Parquet files are heterogeneous. The producer of these files (maybe ak.to_parquet?) ought to produce homogeneous files—all with the same schema.
  • If there's a bug in ak.from_arrow or ak.from_parquet that is ignoring the type specification, translating arrays with no mask as non-option-type (neither BitMaskedArray nor UnmaskedArray), then this is a problem that should get fixed, not hidden by a coercion function.
  • If your application relies on low-level Forms, rather than high-level Types, then you'll need a coercion function. You'll want to pick the right file to sample for the Form, to get one that's general enough. Or maybe there should be another function that produces the most general Arrow Forms from Types (a rough sketch of that idea follows this list). For instance, given an OptionType, the most general Arrow Form is a BitMaskedForm, but there are non-Arrow/Parquet contexts in which IndexedOptionForm would be more general. The Form you want to generate for reading Arrow/Parquet would have all UnmaskedForms expanded to BitMaskedForms, but that functionality is specific to Arrow/Parquet sources.
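
To make the "most general Arrow Form from a Type" idea concrete, here is a rough sketch that rewrites the JSON Form representation (as printed later in this thread) so that every UnmaskedArray node becomes a BitMaskedArray node. The expand_unmasked helper is hypothetical, the mask convention is an arbitrary choice, and it assumes str(form) yields the JSON shown in the examples below:

import json

import numpy as np
import awkward as ak


def expand_unmasked(node):
    """Recursively replace UnmaskedArray nodes in a JSON Form with BitMaskedArray nodes."""
    if isinstance(node, list):
        return [expand_unmasked(item) for item in node]
    if not isinstance(node, dict):
        return node
    node = {key: expand_unmasked(value) for key, value in node.items()}
    if node.get("class") == "UnmaskedArray":
        node = {
            "class": "BitMaskedArray",
            "mask": "u8",
            "content": node["content"],
            "valid_when": True,   # the mask convention here is just one possible choice
            "lsb_order": True,
        }
    return node


# str(form) gives the JSON representation shown in the examples further down this thread
array = ak.Array(ak.layout.UnmaskedArray(ak.layout.NumpyArray(np.array([1.1, 2.2, 3.3]))))
general_form_json = expand_unmasked(json.loads(str(array.layout.form)))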

There are applications that would require exact Forms to be matching. Numba, for instance, compiles specialized code for each Form—an abstract Type is not enough information to generate low-level IR. Lazy arrays would also require it, since VirtualArray checks to see if the generated array matches its expected Form (if the VirtualArray was given an expected Form). The agreement must be at the level of Forms so that lazy arrays can be used in Numba.

Implementing ak.coerce_to_form for VirtualArrays in such a way that they don't immediately materialize the VirtualArray would be tricky. We have all the pieces we need: the new Form and a function that would convert the array, so the VirtualArray's Form gets replaced by the new Form and its generator function would get chained with the function that converts the array (i.e. new generator is lambda: convert(old_generator())). But VirtualArrays can generate arrays containing VirtualArrays, so the translation would have to be lazily applied recursively. (The reason we want recursive lazy arrays is to read Parquet files with nested StructArrays or StructArrays in Arrow Tables. We don't want access of an outer field to cause all inner fields to be read.)

Given that ak.coerce_to_form is tricky for VirtualArrays and expanding to the most general Form for Arrow/Parquet is specific to Arrow/Parquet, maybe all that's needed for this application is to have a flag in ak.from_arrow and ak.from_parquet to expand all data to the most general Form? For Arrow/Parquet, that means

  • no UnmaskedArrays; always create np.zeros(int(np.ceil(length / 8)), np.uint8) whenever a mask is not provided but the type is nullable (one bit per entry, packed into uint8 bytes; see the sketch at the end of this comment);
  • all 32-bit nodes (e.g. Arrow ListArray) should be converted into 64-bit nodes (Arrow LargeListArray), so that files that genuinely need 64-bit offsets still share the same Form.

I can't think of any other Type-preserving differences that Arrow or Parquet could have. The advantage of making it a boolean flag for ak.from_arrow and ak.from_parquet, other than the fact that the choice of Form would be Arrow/Parquet-specific anyway, is that this integrates it into the code that makes all of the (recursive) VirtualArray generators. Adding to this code that has already solved the recursive-laziness problem would be easier than writing new code that would have to solve it again.
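
For concreteness, here is a minimal sketch of what expanding a single node (the first bullet above) might look like, using the v1 layout classes from the examples later in this thread; the all-zero mask with valid_when=False mirrors those examples and is just one possible convention:

import numpy as np
import awkward as ak

# Replace an UnmaskedArray node by an equivalent BitMaskedArray node:
# same Type (nullable float64), but now with an explicit bit mask.
content = ak.layout.NumpyArray(np.array([1.1, 2.2, 3.3, 4.4, 5.5]))
length = len(content)
n_mask_bytes = int(np.ceil(length / 8))  # one bit per entry, packed into uint8 bytes
mask = ak.layout.IndexU8(np.zeros(n_mask_bytes, np.uint8))
puffed_up = ak.Array(
    ak.layout.BitMaskedArray(mask, content, valid_when=False, length=length, lsb_order=True)
)
# With valid_when=False, a zero bit means "valid", so no entries are actually masked.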

jpivarski avatar Jul 06 '21 17:07 jpivarski

@jpivarski I wonder whether we only need to solve the writing component of the problem. As long as writing uses a common schema, these arrays will share a common schema upon reading. At least initially, existing incompatible arrays can be re-written to apply a common schema. Therefore, if we were to define some kind of coercion function, it would be "fine" to materialise virtual arrays in the same way that we do for unflatten and other routines.

I ran into this "issue" in the writing stage, when I was re-partitioning data with slightly different forms. There are some additional things we might need to address here, such as the non-deterministic ordering of Record field names.

agoose77 avatar Jul 06 '21 18:07 agoose77

our use-case is converting a large dataset of O(10k) files (O(100 TB)) from ROOT to parquet. It would be great to really ensure that all files have the same low-level type, as otherwise we'd be baking in the file boundaries, which we perhaps do not want to preserve (perhaps we want to merge parquet files later, etc.)

I think the problem of finding the right low-level type could be factorized. Often we do know what that type should be from other considerations.

lukasheinrich avatar Jul 06 '21 18:07 lukasheinrich

pyarrow.parquet.ParquetWriter won't want to give you the same low-level type in each file—what I'm calling its Form. It will want to optimize things by not writing bit-masks when zero entries in a row group (or even page?) have missing values in practice, and it could be hard to swim against that tide. However, it should give the same high-level type—what I'm calling its Type (capital T, this class) and Parquet and Arrow call schemas (two different schema languages, but equivalent to each other). As producer of the files, you should ensure that they have the same high-level schemas, but don't worry about the low-level questions of whether a mask is actually instantiated; let pyarrow optimize. As a consumer of the files, Awkward Array must map identical schemas to identical high-level types. That's how it ought to work, and any failure to do so is either a bug or a not-yet-implemented case.

But if a process downstream of your Parquet → Awkward reader additionally needs the low-level Forms to be the same, then that's something we should either have a special ak.coerce_to_form function for or a flag in ak.from_arrow/ak.from_parquet to expand all low-level data to fit the most general Form for the given Type/schema. Since all of your files have the same schema, such a flag would make them have the same Form, despite pyarrow's file optimizations. (We can "puff up" the in-memory format from "slimmed down" files.) I favor the ak.from_arrow/ak.from_parquet flag approach because it would be so much easier to get it right.

Tools that repartition a collection of Parquet files with the same schema know how to adjust for low-level differences in optimization. Things like including or excluding masks are only the tip of the iceberg for libparquet: it mixes all sorts of optimizations, like run-length encodings, variable-width encodings, and odd-bitlength encodings in seemingly random domains (the file-writer decides to run-length encode 5 values, variable-width the next 14, encode the next 8 with 5 bits each, etc.). Since libparquet assumes that all of these fine-granularity optimizations are going to be present anyway, it shouldn't have any problem with adjusting which parts need to get a mask and which don't. (It also needs to rewrite the page minimum/page maximum metadata, so the whole can of worms is open anyway.) That's why converting them into Awkward Arrays will (without a flag to say otherwise) sometimes produce BitMaskedArray nodes and sometimes UnmaskedArray nodes, but these represent the same high-level Type, which should be one-to-one with the Parquet schema.

jpivarski avatar Jul 06 '21 19:07 jpivarski

Thanks @jpivarski - so it seems like for our use case having an ak.coerce_to_type would be sufficient then, and we won't need an ak.coerce_to_form. Perhaps @nikoladze can confirm. Or are you saying an ak.coerce_to_type would not even be necessary?

lukasheinrich avatar Jul 06 '21 19:07 lukasheinrich

I'm saying that a "coerce to Type" should not be necessary, if there are no bugs or missing features in Awkward. There should be a one-to-one relationship between Parquet schemas and Awkward Types, if nothing is broken. Some partitions/row-groups/pages having masks while others don't is not a difference in Parquet schemas and it should not be a difference in Awkward Types because BitMaskedArray vs UnmaskedArray is not a difference in Types (just Forms).

The second question that @nikoladze can confirm is whether your application needs all Forms to be the same or if same Types is sufficient. If you need all the Forms to be the same, that would require new functionality, such as the same_form=True flag in ak.from_arrow/ak.from_parquet. (It would be worthwhile even if your application doesn't need it, since lazy arrays in Numba would need constant Forms and this has never been addressed.)
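
For illustration, usage of such a flag might look like the following; same_form is only the hypothetical name from the previous paragraph, not an existing option, and the file names are placeholders:

import awkward as ak

# Hypothetical: read every partition expanded to the most general Form for its Type,
# then check that all partitions really do share one low-level Form.
paths = ["part-000.parquet", "part-001.parquet"]
forms = {str(ak.from_parquet(path, same_form=True).layout.form) for path in paths}
assert len(forms) == 1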

If this seems too academic (the language is reminding me of a philosophy book I've been reading recently), the reason there's a distinction between Types and Forms is because Types are all that a data analyst would usually care about: "could these values be missing or is that impossible?" but Forms describe how arrays are laid out in memory: "are they masked out with bits or bytes? Negative values in an indexed array? Or are they formally nullable but in this particular array, none are actually missing?" Numba needs the latter if it compiles a function that will be reaching into memory addresses, expecting to find the right kind of data, and the arrays with those memory addresses will be created after compilation is finished. Normal processing of same-Type, different-Form data runs different code for each Form, but each code path does "the same thing" as far as a data analyst is concerned (identifying missing values or whatever).

jpivarski avatar Jul 06 '21 20:07 jpivarski

>>> numpyarray = ak.layout.NumpyArray(np.array([1.1, 2.2, 3.3, 4.4, 5.5]))
>>> one = ak.Array(
...     ak.layout.BitMaskedArray(
...         ak.layout.IndexU8(np.array([0], np.uint8)),
...         numpyarray,
...         valid_when=False,
...         length=5,
...         lsb_order=True,
...     )
... )
>>> two = ak.Array(
...     ak.layout.UnmaskedArray(
...         numpyarray
...     )
... )
>>> one
<Array [1.1, 2.2, 3.3, 4.4, 5.5] type='5 * ?float64'>
>>> two
<Array [1.1, 2.2, 3.3, 4.4, 5.5] type='5 * ?float64'>
>>> ak.is_none(one)
<Array [False, False, False, False, False] type='5 * bool'>
>>> ak.is_none(two)
<Array [False, False, False, False, False] type='5 * bool'>
>>> one.type
5 * ?float64
>>> two.type
5 * ?float64
>>> one.layout.form
{
    "class": "BitMaskedArray",
    "mask": "u8",
    "content": "float64",
    "valid_when": false,
    "lsb_order": true
}
>>> two.layout.form
{
    "class": "UnmaskedArray",
    "content": "float64"
}

one and two have the same Type but different Forms. When we compute ak.is_none, it invokes a different code path for the BitMaskedArray (unpack those bits into bytes to make an array of booleans) than for the UnmaskedArray (generate an array of all False values), but data analysts shouldn't care which one is happening, since one and two represent the same data with the same Type.

If fully materialized data for one and two were passed one at a time into a Numba JIT-compiled function, Numba would compile two specializations of the function, though a user wouldn't need to be aware of this. ~~If a VirtualArray that produces one is used to compile the function and then a VirtualArray that produces two is used in it,~~ Nevermind: I thought that would be a problem, but Numba would still compile two specializations of the function since those two VirtualArrays would have different Forms.

So what could matter if the Forms are different among the partitions of a dataset? I can't think of anything—the one example I thought would matter doesn't, in fact, matter.

jpivarski avatar Jul 06 '21 20:07 jpivarski

I think I'm more concerned about this setting

In [18]: a = ak.Array([{'jets': [{'eta': 1.0, 'phi': 2.0}], 'electrons': [{'e': 1.23, 'pid': 11}]}])

In [19]: b = ak.Array([{'jets': [{'phi': 1.0, 'eta': 2.0}], 'electrons': []}])

In [20]: ak.type(a)
Out[20]: 1 * {"jets": var * {"eta": float64, "phi": float64}, "electrons": var * {"e": float64, "pid": int64}}

In [21]: ak.type(b)
Out[21]: 1 * {"jets": var * {"phi": float64, "eta": float64}, "electrons": var * unknown}

the two arrays would both fit into the type/schema N * {"jets": var * {"eta": float64, "phi": float64}, "electrons": var * {"e": float64, "pid": int64}} but the arrow schemas are different

In [29]: ak.to_arrow(a).type
Out[29]: StructType(struct<jets: large_list<item: struct<eta: double not null, phi: double not null> not null> not null, electrons: large_list<item: struct<e: double not null, pid: int64 not null> not null> not null>)

In [30]: ak.to_arrow(b).type
Out[30]: StructType(struct<jets: large_list<item: struct<phi: double not null, eta: double not null> not null> not null, electrons: large_list<item: double not null> not null>)

being able to normalize the two is, I think, what we're after

lukasheinrich avatar Jul 06 '21 20:07 lukasheinrich

Your a and b have different Types: "unknown" is not the same thing as those records. Those are different Parquet schemas because they're different Types. But ak.from_iter applied to untyped Python data is not expected to give the same Types.

>>> a = ak.Array([{'jets': [{'eta': 1.0, 'phi': 2.0}], 'electrons': [{'e': 1.23, 'pid': 11}]}])
>>> b = ak.Array([{'jets': [{'phi': 1.0, 'eta': 2.0}], 'electrons': []}])
>>> a.type == b.type
False

If your data are coming from ROOT/Uproot, rather than ak.from_iter, then they'll have the same Types because the Types are derived from the C++ types. Even if a whole file has no electrons, Uproot will assign a record Type to the empty data. (It will be represented by a length-0 RecordArray, not an EmptyArray.)
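
As a small illustration of that last point, using the v1 layout classes (the field names here are just placeholders): a length-0 RecordArray keeps the record Type, while an EmptyArray only has type unknown.

import numpy as np
import awkward as ak

# A length-0 RecordArray still carries its field Types...
no_electrons = ak.Array(
    ak.layout.RecordArray(
        [ak.layout.NumpyArray(np.empty(0, np.float64)),
         ak.layout.NumpyArray(np.empty(0, np.int64))],
        ["e", "pid"],
    )
)
print(ak.type(no_electrons))                      # 0 * {"e": float64, "pid": int64}

# ...whereas an EmptyArray has the "unknown" type seen in the b example above.
print(ak.type(ak.Array(ak.layout.EmptyArray())))  # 0 * unknown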


However, @agoose77 found a counterexample that we've been looking at in Gitter: https://gitter.im/Scikit-HEP/awkward-array

The ak.to_buffers function complains if the partitions of a PartitionedArray have different Forms, but they should be allowed to have different Forms if they have the same Type.

>>> onetwo = ak.Array(ak.partition.IrregularlyPartitionedArray([one.layout, two.layout]))
>>> onetwo
<Array [1.1, 2.2, 3.3, 4.4, ... 3.3, 4.4, 5.5] type='10 * ?float64'>
>>> pickle.dumps(onetwo)
ValueError: the Form of partition 1:

    {
    "class": "UnmaskedArray",
    "content": {
        "class": "NumpyArray",
        "itemsize": 8,
        "format": "d",
        "primitive": "float64",
        "form_key": "node1"
    },
    "form_key": "node0"
}

differs from the first Form:

    {
    "class": "BitMaskedArray",
    "mask": "u8",
    "content": {
        "class": "NumpyArray",
        "itemsize": 8,
        "format": "d",
        "primitive": "float64",
        "form_key": "node1"
    },
    "valid_when": false,
    "lsb_order": true,
    "form_key": "node0"
}

jpivarski avatar Jul 06 '21 20:07 jpivarski

right, but say I have 1000 events of type {"jets": var * {"eta": float64, "phi": float64}, "electrons": var * {"e": float64, "pid": int64}}

some of those events could be like my b (i.e. events without electrons, just by chance). If I know what electrons look like, shouldn't I be able to change unknown to {"e": float64, "pid": int64}?

it seems like we somehow implicitly rely on files being large enough to "saturate" the type, i.e. to eventually encounter an event with electrons

lukasheinrich avatar Jul 06 '21 20:07 lukasheinrich

Uproot derives the Awkward Type from the C++ type. (I hope it's not "losing" this information along the way.)

If you're getting data from iterating over Python objects, this will be a performance bottleneck that you should know about. But along these lines, we have a LayoutBuilder as an alternative to the ArrayBuilder that ak.from_iter uses; it takes an explicit Form and only allows you to build arrays compatible with that Form.

Having a way to coerce the Type (as in the case of "unknowns") or Form (as in the case of BitMaskedArray vs UnmaskedArray) would also have good applications. They'd be generalizations of ak.values_astype.

jpivarski avatar Jul 06 '21 20:07 jpivarski

I should probably summarize here what I tried to do and what obstacles I found. The plan was to merge multiple input ROOT files from ATLAS DAOD_PHYSLITE into a single parquet file. To do that I essentially copied over the code from ak.to_parquet and modified it such that it doesn't close the ParquetWriter, but instead takes a writer as an argument to append to an open parquet file. That works fine as long as the schema of the chunks is the same.

I think I actually did not have problems where I had the same type but a different form/schema. It's just that I need to fill in missing fields, and then the second problem was that the ordering of fields matters (the record fields in the schema are numbered).

The way these ROOT files are written, it can happen that if not a single Electron fulfilled the criteria of the AnalysisElectrons collection, then the corresponding branches won't be written at all. Here the most natural thing would be to take the form of that collection from another file and just make empty lists (that is, I think, also what our custom merging routines for these ROOT files do). I could maybe do that with ak.from_buffers or manual layout building, using an offset array filled with 0s and lots of empty numpy arrays at the leaves. So I guess what we would need are convenience functions to do these things, e.g. if I have one array with Electrons and another one without, I would like to get a merged array where the second half has only empty lists of Electrons according to the form of the first array.
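
A rough sketch of that manual layout building, using the v1 layout classes and the a/b arrays from the earlier example in this thread; the exact construction is an assumption about one way to do it, not an existing convenience function:

import numpy as np
import awkward as ak

# Build an "electrons" field for the b events: every list empty, but typed
# with the same record fields as a's electrons.
n_events = len(b)
empty_electrons = ak.Array(
    ak.layout.ListOffsetArray64(
        ak.layout.Index64(np.zeros(n_events + 1, np.int64)),   # all offsets 0 -> empty lists
        ak.layout.RecordArray(
            [ak.layout.NumpyArray(np.empty(0, np.float64)),    # "e"
             ak.layout.NumpyArray(np.empty(0, np.int64))],     # "pid"
            ["e", "pid"],
        ),
    )
)
b_fixed = ak.zip({"jets": b.jets, "electrons": empty_electrons}, depth_limit=1)
merged = ak.concatenate([a, b_fixed])   # now both halves share the same record Type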

The second problem (the ordering of Record fields matters) is probably easier to solve; currently I just sort the record fields by name and call ak.zip with an OrderedDict.
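
A minimal sketch of that workaround (sort_fields is just an illustrative name):

from collections import OrderedDict

import awkward as ak


def sort_fields(array):
    """Rezip a record array with its fields in sorted order, so the schema is deterministic."""
    names = sorted(ak.fields(array))
    return ak.zip(OrderedDict((name, array[name]) for name in names), depth_limit=1)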

nikoladze avatar Jul 07 '21 11:07 nikoladze

The key ordering issue I mentioned above should be distinct from anything related to parquet; it follows from the ordering in the JSON form not being guaranteed. Python 3.7+ does guarantee insertion ordering of dicts, so you may well not need OrderedDict there (if you target modern Python).

agoose77 avatar Jul 07 '21 11:07 agoose77

Python 3.7+ does guarantee insertion ordering of dicts, so you may well not need OrderedDict there (if you target modern Python).

good to know, thanks!

nikoladze avatar Jul 07 '21 15:07 nikoladze

3.6+, actually. This looms large for me because it's often the reason that the Python 2.7 and 3.5 tests fail.

jpivarski avatar Jul 07 '21 15:07 jpivarski

I think the ordering was only guaranteed in 3.7 and above, although in practice it's already there in 3.6 as a CPython implementation detail!

agoose77 avatar Jul 07 '21 16:07 agoose77

I think this issue can be closed following the addition of #2365!

agoose77 avatar Jul 02 '23 17:07 agoose77