
[Shuffle] Support num_mappers != num_files


Right now, the number of shuffle mappers is equal to the number of input files, with each mapper reading a single input file. However:

  1. For very large files, we may want multiple mappers to read segments of a single file in parallel, in order to bound per-mapper memory usage and/or maximize read throughput.
  2. For many small files, we may want a single mapper to read multiple files, in order to reduce per-task overhead and possibly improve read throughput.

(1) has already been brought up by a user as a concrete need. We should therefore support num_mappers != num_files, letting the user set num_mappers explicitly and falling back to a best-effort default when it isn't specified.
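
A minimal sketch of what that knob could look like (the helper name resolve_num_mappers and its default are hypothetical, not the loader's current API; the fallback simply preserves today's one-mapper-per-file behavior):

from typing import List, Optional

def resolve_num_mappers(filenames: List[str], num_mappers: Optional[int] = None) -> int:
    # Hypothetical helper: honor an explicit user-provided num_mappers,
    # otherwise fall back to the current one-mapper-per-file behavior.
    return num_mappers if num_mappers is not None else len(filenames)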

Implementation Notes

(2) (i.e. num_files > num_mappers) won't involve much more than the typical np.array_split() pattern, e.g.

import numpy as np

# One mapper task per chunk of files; each task still returns num_reducers partitions.
for files in np.array_split(filenames, num_mappers):
    reducer_parts = shuffle_map.options(num_returns=num_reducers).remote(files, ...)
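
Note that np.array_split tolerates num_files not being evenly divisible by num_mappers: the resulting chunks differ in size by at most one file.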

(1) (i.e. num_files < num_mappers), however, will require reading a file starting at a given offset. Since the pandas read_parquet() API doesn't expose that functionality, the most likely course of action is to use the pyarrow Parquet reading facilities directly. Here is an example of such usage, although for the MVP we will most likely want to use a ParquetDataset with split_row_groups=True and rely on row-group granularity.
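
A minimal sketch of that row-group-based approach (assuming the input Parquet files contain multiple row groups; the helper names assign_row_groups and read_file_segment are illustrative only, and pq.ParquetFile with read_row_groups() is used here in place of the ParquetDataset API mentioned above):

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

def assign_row_groups(filename, mappers_for_file):
    # Split the file's row-group indices evenly across the mappers assigned to it.
    num_row_groups = pq.ParquetFile(filename).num_row_groups
    return np.array_split(np.arange(num_row_groups), mappers_for_file)

def read_file_segment(filename, row_group_indices) -> pd.DataFrame:
    # A mapper reads only its assigned row groups, yielding a segment of the
    # file without needing a byte-offset read API.
    table = pq.ParquetFile(filename).read_row_groups([int(i) for i in row_group_indices])
    return table.to_pandas()

Reading at row-group granularity sidesteps byte-offset bookkeeping entirely, since row groups are the natural unit of parallelism within a Parquet file.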

clarkzinzow, Jul 13 '21