
add kwarg chunksize for default data partitioning for write

svilupp opened this issue 2 years ago • 4 comments

This PR proposes automatic partitioning of the provided tables when writing. It follows my findings from benchmarking against PyArrow.

Nowadays, most machines are multithreaded, and Arrow.write() provides multithreaded writing for partitioned data. However, a user must explicitly partition their data, and most users do not realize that neither their writes nor their subsequent reads will be multithreaded without such partitioning (there is an open issue to improve the docs). The explicit partitioning required today looks roughly like the sketch below.
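For concreteness, a sketch of the status quo (the file name, table, and chunk size of 64_000 here are illustrative; Tables.partitioner is the usual way to hand Arrow.write a partitioned input):

```julia
using Arrow, DataFrames, Tables

df = DataFrame(a = 1:1_000_000, b = rand(1_000_000))

# Without explicit partitioning, Arrow.write emits a single record batch,
# so neither the write nor later reads can use multiple threads.
parts = Tables.partitioner(view(df, r, :) for r in Iterators.partition(1:nrow(df), 64_000))
Arrow.write("out.arrow", parts)
```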

This PR defaults to partitioning data larger than 64K rows (which should be beneficial on most systems) to enable better Arrow.jl performance on both read and write.

Implementation:

  • the new kwarg is called chunksize (it maps to PyArrow's naming and should be broadly understood)
  • uses a default chunksize of 64,000 rows, as per PyArrow.write_feather
  • allows users to opt out by providing chunksize=nothing
  • ~~partitioning is done via Iterators.partition(Tables.rows(tbl), chunksize) for all Tables.jl-compatible sources (checks Tables.istable)~~ changed to Iterators.partition(tbl, chunksize) to avoid missingness getting lost (eg, for DataFrames); see the sketch after this list
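A minimal sketch of the intended logic (illustrative only; maybe_partition is not the PR's actual code):

```julia
using Tables

# Lazily split a row-accessible table into chunks of `chunksize` rows so
# that Arrow.write sees partitioned input and can write one record batch
# per chunk, potentially on multiple threads.
function maybe_partition(tbl; chunksize::Union{Int,Nothing}=64_000)
    chunksize === nothing && return tbl    # chunksize=nothing opts out
    Tables.rowaccess(tbl) || return tbl    # leave column-only tables unchunked
    return Iterators.partition(Tables.rows(tbl), chunksize)
end
```

From the caller's perspective nothing changes: Arrow.write(file, tbl) partitions by default, and Arrow.write(file, tbl; chunksize=nothing) opts out.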

Some resources:

svilupp · Mar 12 '23 15:03

I've changed the condition for automatic partitioning to also require Tables.rowaccess(tbl) == true, to avoid accepting column tables that don't support row iteration.

In addition, I've changed Iterators.partition(Tables.rows(tbl), chunksize) to Iterators.partition(tbl, chunksize) to avoid the missingness type getting lost (eg, for DataFrames).
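A small repro of the narrowing, using plain NamedTuple rows (the same effect bites DataFrames):

```julia
using Tables

rows = [(x = 1,), (x = 2,), (x = missing,), (x = 4,)]  # x is Union{Missing, Int}
chunk = first(Iterators.partition(rows, 2))            # rows 1-2 contain no missings
Tables.columntable(chunk)
# returns (x = [1, 2],) with eltype Int: materializing each chunk on its own
# narrows the first batch's schema to x::Int, which then conflicts with a
# later chunk that does contain `missing`.
```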

svilupp · Mar 13 '23 08:03

> In addition, I've changed Iterators.partition(Tables.rows(tbl), chunksize) to Iterators.partition(tbl, chunksize) to avoid the missingness type getting lost (eg, for DataFrames).

Okay, I was wrong. I misunderstood the rowaccess requirements: Iterators.partition() still needs to be defined separately for the source type.

I've moved back to Tables.rows to ensure we get rows out.

I'm not sure what the best solution is here. By far the simplest option would be to pass the schema down, because we have access to it before Tables.columns is called in the arrow construction (that's how we lose the schema: we "materialize" the chunk as is). A sketch of this option follows the edit below.

EDIT:

  • The second simplest option would be to add a compat entry for DataFrames > 1.5.0 and use Iterators.partition directly (with some safety check that it indeed chunks rows, not columns... in case some unknown type defines it over columns)
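A minimal sketch of the schema-passing option, assuming a hypothetical helper materialize_chunk (not Arrow.jl code):

```julia
using Tables

# Build each chunk's columns with eltypes taken from the full table's
# schema, so a chunk without `missing` values still gets a
# Union{Missing,T} column instead of a narrowed one.
function materialize_chunk(sch::Tables.Schema{names}, chunk) where {names}
    cols = map(T -> Tables.allocatecolumn(T, 0), sch.types)
    for row in chunk
        Tables.eachcolumn(sch, row) do val, i, _
            push!(cols[i], val)
        end
    end
    return NamedTuple{names}(cols)
end

tbl = (x = [1, 2, missing, 4],)
sch = Tables.schema(tbl)                  # captured before partitioning
chunk = first(Iterators.partition(Tables.rows(tbl), 2))
materialize_chunk(sch, chunk)             # (x = Union{Missing, Int64}[1, 2],)
```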

svilupp · Mar 13 '23 09:03

Added compat for DataFrames via Extras.

svilupp · Mar 13 '23 09:03

> By far the simplest option would be to pass the schema down, because we have access to it before Tables.columns is called in the arrow construction (that's how we lose the schema: we "materialize" the chunk as is).

We could allow users to optionally provide the Schema in the Base.open constructor of the Writer object. If a user makes use of this, then we should validate that the actual schema of each chunk matches the expected schema.
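That could look roughly like this; check_chunk_schema is an illustrative name, and the real check would live inside the Writer:

```julia
using Tables

# Compare each incoming chunk's schema against the user-provided one
# before the chunk is written as a record batch.
function check_chunk_schema(expected::Union{Tables.Schema,Nothing}, chunk)
    expected === nothing && return nothing
    actual = Tables.schema(Tables.columns(chunk))
    if actual.names != expected.names || actual.types != expected.types
        throw(ArgumentError("chunk schema $actual does not match expected $expected"))
    end
    return nothing
end
```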

baumgold · Mar 13 '23 23:03