[C++][Dataset] Preserve order when writing dataset
Currently, when writing a dataset, e.g. from a table consisting of a set of record batches, there is no guarantee that the row order is preserved when reading the dataset.
Small code example:
In [1]: import pyarrow as pa; import pyarrow.dataset as ds
In [2]: table = pa.table({"a": range(10)})
In [3]: table.to_pandas()
Out[3]:
   a
0  0
1  1
2  2
3  3
4  4
5  5
6  6
7  7
8  8
9  9
In [4]: batches = table.to_batches(max_chunksize=2)
In [5]: ds.write_dataset(batches, "test_dataset_order", format="parquet")
In [6]: ds.dataset("test_dataset_order").to_table().to_pandas()
Out[6]:
   a
0  4
1  5
2  8
3  9
4  6
5  7
6  2
7  3
8  0
9  1
Although this might seem normal in the SQL world, typical dataframe users (R, pandas/dask, etc.) will expect the row order to be preserved.
Some applications might also rely on this, e.g. with dask you can have a sorted index column ("divisions" between the partitions) that would get lost this way (note: the dask parquet writer itself doesn't use pyarrow.dataset.write_dataset, so it isn't impacted by this).
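For reference, a minimal sketch of the kind of workaround users need today (not from the issue itself; the output directory name is hypothetical): carry an explicit row-index column through the write and sort on it after reading to restore the original order.

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Sketch of a workaround: add an explicit row index before writing so the
# original order can be restored after reading, regardless of how the
# writer interleaves the incoming batches.
table = pa.table({"a": range(10)})
table = table.append_column("row_idx", pa.array(range(len(table))))

ds.write_dataset(table.to_batches(max_chunksize=2),
                 "test_dataset_order_idx", format="parquet")

restored = (
    ds.dataset("test_dataset_order_idx")
    .to_table()
    .sort_by("row_idx")
    .drop(["row_idx"])
)
```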
Some discussion about this started in https://github.com/apache/arrow/pull/8305 (ARROW-9782), which changed the behaviour to write all fragments to a single file instead of one file per fragment.
I am not fully sure what the best way to solve this is, but IMO at least having the option to preserve the order would be good.
cc @bkietz
Reporter: Joris Van den Bossche / @jorisvandenbossche
Watchers: Rok Mihevc / @rok
Related issues:
- Pyarrow 8.0.0 write_dataset writes data in different order with use_threads=True (is duplicated by)
- [C++] Add ordering information to exec batches (requires)
Note: This issue was originally created as ARROW-10883. Please see the migration documentation for further details.
Weston Pace / @westonpace: I deleted the link to ARROW-12873 because I don't know that "batch index" needs to rely on that arbitrary metadata mechanism (and, given that many nodes will need to manipulate it, I don't think it is arbitrary metadata).
Hi @westonpace,
Are there any updates on this?
Current "FileSystemDataset::Write" is implemented by a sequenced plan of scan, filter, project, write.
As referenced plan in #32991 , a batch_index has been added in the scanner and is used by "ordered_sink" to reorder exec_batches.
Should we consider implementing an "ordered" node that functions similar to ordered_sink without sinking? This node could be injected any place between scan and project.
I believe that the "ordered" node would be a more effective way to directly order the output of the "scan" node, providing a more flexible planning approach.
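For readers less familiar with these internals, here is a rough Python sketch of what a sequenced plan looks like with pyarrow.acero. It is illustrative only: the dataset write node is not exposed in Python, so a table source and a plain to_table() sink stand in for the scan and write stages of the plan described above.

```python
import pyarrow as pa
import pyarrow.acero as acero
import pyarrow.compute as pc

# Illustrative sketch of a sequenced plan (source -> filter -> project),
# analogous in shape to the scan/filter/project/write plan that
# FileSystemDataset::Write builds internally.
table = pa.table({"a": range(10)})
plan = acero.Declaration.from_sequence([
    acero.Declaration("table_source", acero.TableSourceNodeOptions(table)),
    acero.Declaration("filter", acero.FilterNodeOptions(pc.field("a") >= 0)),
    acero.Declaration("project", acero.ProjectNodeOptions([pc.field("a")], ["a"])),
])
result = plan.to_table()
```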
Could someone please solve this issue? This is clearly a bug in Arrow, and there should at least be an option to preserve order.
Just got burnt by the same issue while trying to re-encode a Parquet file with a different row group size. Even if there is no plan to fix this issue, it might be a good idea to add a warning in the documentation, which currently says nothing about order not being preserved: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#
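For that re-encoding use case specifically, here is a small sketch of one way to change the row group size without going through write_dataset (file names are hypothetical): reading the file into a single table and writing it back with pyarrow.parquet keeps the row order.

```python
import pyarrow.parquet as pq

# Re-encode a Parquet file with a different row group size; reading into a
# single Table and writing it back preserves the row order.
table = pq.read_table("input.parquet")
pq.write_table(table, "output.parquet", row_group_size=128 * 1024)
```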
Related: https://github.com/apache/arrow/issues/39030
Should we consider implementing an "ordered" node ...
Looks like we do not need to introduce a new node, because the "write" node can already sequence exec batches. For this to work, all we need is to tell the "scan" node to emit batches with an index (ImplicitOrdering), and the "write" node will sequence the batches by default.
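A small sketch of how the original reproducer could be re-checked once this is in place (directory name hypothetical): with the scan emitting an implicit batch index and the write sequencing by default, the read-back order is expected to match the input order.

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Re-run the reproducer from the issue description; with the write node
# sequencing batches by their implicit ordering, the row order should be
# preserved.
table = pa.table({"a": range(10)})
ds.write_dataset(table.to_batches(max_chunksize=2),
                 "test_dataset_order_fixed", format="parquet")
result = ds.dataset("test_dataset_order_fixed").to_table()
assert result["a"].to_pylist() == list(range(10))
```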
Here is the fix: #44470
Issue resolved by pull request 44470 https://github.com/apache/arrow/pull/44470
@rok Yayyyy =D