[C++][Dataset] Preserve order when writing dataset

asfimport opened this issue 5 years ago · 6 comments

Currently, when writing a dataset, e.g. from a table consisting of a set of record batches, there is no guarantee that the row order is preserved when reading the dataset.

Small code example:


In [1]: import pyarrow as pa, pyarrow.dataset as ds

In [2]: table = pa.table({"a": range(10)})

In [3]: table.to_pandas()
Out[3]: 
   a
0  0
1  1
2  2
3  3
4  4
5  5
6  6
7  7
8  8
9  9

In [4]: batches = table.to_batches(max_chunksize=2)

In [5]: ds.write_dataset(batches, "test_dataset_order", format="parquet")

In [6]: ds.dataset("test_dataset_order").to_table().to_pandas()
Out[6]: 
   a
0  4
1  5
2  8
3  9
4  6
5  7
6  2
7  3
8  0
9  1

Although this might seem normal in the SQL world, typical dataframe users (R, pandas/dask, etc.) will expect row order to be preserved. Some applications may also rely on it, e.g. with dask you can have a sorted index column ("divisions" between the partitions) that would get lost this way (note: the dask parquet writer itself doesn't use pyarrow.dataset.write_dataset, so it isn't affected by this).

Some discussion about this started in https://github.com/apache/arrow/pull/8305 (ARROW-9782), which changed the writer to write all fragments to a single file instead of one file per fragment.

I am not fully sure what the best way to solve this is, but IMO at least having the option to preserve the order would be good.
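
Until such an option exists, one possible workaround (a sketch only, not something the dataset API provides; the "__row_idx" column name is arbitrary, and it assumes a pyarrow version with Table.sort_by) is to carry an explicit row-index column through the write and sort on it after reading:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": range(10)})

# Append a monotonically increasing index column before writing.
indexed = table.append_column("__row_idx", pa.array(range(table.num_rows), pa.int64()))
ds.write_dataset(indexed.to_batches(max_chunksize=2), "test_dataset_order_idx", format="parquet")

# Read back, restore the original order via the index, and drop the helper column again.
restored = (
    ds.dataset("test_dataset_order_idx")
    .to_table()
    .sort_by("__row_idx")
    .select(table.column_names)
)
assert restored.equals(table)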

cc @bkietz

Reporter: Joris Van den Bossche / @jorisvandenbossche
Watchers: Rok Mihevc / @rok

Related issues:

Note: This issue was originally created as ARROW-10883. Please see the migration documentation for further details.

asfimport · Dec 11 '20

Weston Pace / @westonpace: I deleted the link to ARROW-12873 because I don't know that "batch index" needs to rely on that arbitrary metadata mechanism (and, given that many nodes will need to manipulate it, I don't think it is arbitrary metadata)

asfimport · May 10 '22

Hi @westonpace, are there any updates on this? Currently "FileSystemDataset::Write" is implemented as a sequenced plan of scan, filter, project, write. As in the plan referenced in #32991, a batch_index has been added to the scanner and is used by "ordered_sink" to reorder exec batches. Should we consider implementing an "ordered" node that functions like ordered_sink but without sinking? This node could be injected anywhere between scan and project. I believe an "ordered" node would be a more effective way to directly order the output of the "scan" node, and it would allow more flexible planning.

hu6360567 · Apr 10 '24

Could someone please solve this issue? This is clearly a bug in Arrow, and there should at least be an option to preserve order.

jerryqhyu · Jun 13 '24

Just got burnt by the same issue while trying to re-encode a Parquet file with a different row group size. Even if there is no plan to fix the issue, it might be a good idea to add a warning to the documentation, which currently says nothing about order not being preserved: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#

douglas-raillard-arm · Jul 12 '24

Related: https://github.com/apache/arrow/issues/39030

user529481 · Jul 12 '24

Should we consider implementing an "ordered" node ...

Looks like we do not need to introduce a new node, because the "write" node can already sequence exec batches. For this to work, all we need is to tell the "scan" node to emit batches with an index (ImplicitOrdering), and the "write" node will then sequence the batches by default.

Here is the fix: #44470
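
For reference, a minimal sketch of what the original repro should look like once this lands, assuming a pyarrow release that contains the fix and that the write sequences batches by default (if the behaviour ends up behind an option instead, that option would need to be enabled when writing):

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": range(10)})
batches = table.to_batches(max_chunksize=2)

# With the write node sequencing batches from an ordered scan, the row order
# on disk should match the input order.
ds.write_dataset(batches, "test_dataset_order_after_fix", format="parquet")

roundtripped = ds.dataset("test_dataset_order_after_fix").to_table()
assert roundtripped.column("a").to_pylist() == list(range(10))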

EnricoMi · Oct 18 '24

Issue resolved by pull request 44470 https://github.com/apache/arrow/pull/44470

rok · May 14 '25

@rok Yayyyy =D

anjakefala · May 15 '25