
SAME optimized dataset for DIFFERENT num_workers

Open · Phimos opened this issue 1 week ago · 1 comment

🚀 Feature

Currently, the optimized dataset seems to differ when I use different num_workers (the chunk files are named chunk-$worker-$chunkid). Can I make the final optimized dataset always the same regardless of num_workers? Or can I reorganize the optimized dataset afterwards?

— Phimos, Nov 22 '25

Hi @Phimos,

During the optimization process, the total items are distributed across the workers, and each worker writes its own chunk files (chunk-$worker-$chunkid). Because of this parallelization, the data in the final optimized dataset is always the same, but the number and naming of the chunk files can vary depending on num_workers, the chunk size, and how the items are partitioned.

ref: https://github.com/Lightning-AI/litData/blob/07705955e698f18a5921e173710a3c726c10b6d2/src/litdata/streaming/writer.py#L307-L312
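
To illustrate, here is a minimal, hypothetical sketch (the toy fn, inputs, and directory names are made up): running optimize twice on the same inputs with different num_workers stores the same items, but produces a different chunk file layout.

import os

import litdata as ld


def fn(index: int):
    # Each sample is just a dict holding its own index.
    return {"index": index}


if __name__ == "__main__":
    for workers in (1, 2):
        out = f"toy_workers_{workers}"
        ld.optimize(fn=fn, inputs=list(range(100)), chunk_size=10, output_dir=out, num_workers=workers)
        # Chunk files follow the chunk-$worker-$chunkid naming mentioned above.
        chunks = sorted(f for f in os.listdir(out) if f.startswith("chunk-"))
        print(out, "->", chunks)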

Also, during streaming, the reading process does not depend on how many workers were used during optimization.

So, to better understand how we can help, could you clarify and elaborate a bit more on your use case?

Thanks!

— bhimrazy, Nov 23 '25

Hi @bhimrazy, it is about the sampling behavior of the streaming dataset and dataloader. Here is an example (I'm using litdata==0.2.58).

Here is the generator:

import litdata as ld
import torch


def getter(index: int):
    # Each sample is a (4096, 128) tensor filled with its own index,
    # so the originating index can be recovered from any element.
    return torch.full((4096, 128), index, dtype=torch.long)


if __name__ == "__main__":
    # 7 * 64 = 448 items written in chunks of 64, once with 1 writer and once with 2.
    ld.optimize(fn=getter, inputs=list(range(7 * 64)), chunk_size=64, output_dir="worker_size_1", num_workers=1)
    ld.optimize(fn=getter, inputs=list(range(7 * 64)), chunk_size=64, output_dir="worker_size_2", num_workers=2)

Here is the loader:

import litdata as ld

if __name__ == "__main__":
    print(" Worker num = 1 ".center(60, "-"))
    dataset = ld.StreamingDataset("worker_size_1/", shuffle=True)
    dataloader = ld.StreamingDataLoader(dataset, batch_size=64, num_workers=1)
    for i, batch in enumerate(dataloader):
        index = batch[:, 0, 0]  # every element of a sample equals its original index
        print(f"{i} ~ max-min-diff:", (index.max() - index.min()).item())
    print(" Worker num = 2 ".center(60, "-"))
    dataset = ld.StreamingDataset("worker_size_2/", shuffle=True)
    dataloader = ld.StreamingDataLoader(dataset, batch_size=64, num_workers=1)
    for i, batch in enumerate(dataloader):
        index = batch[:, 0, 0]
        print(f"{i} ~ max-min-diff:", (index.max() - index.min()).item())

If num_workers is 1, every batch comes from a single chunk, so the max-min-diff should be less than 64. But if num_workers is 2, there exist two chunks with only 32 items each, so a shuffled batch can mix samples from two different chunks and the max-min-diff may be greater than 64. Here is the output:

---------------------- Worker num = 1 ----------------------
0 ~ max-min-diff: 63
1 ~ max-min-diff: 63
2 ~ max-min-diff: 63
3 ~ max-min-diff: 63
4 ~ max-min-diff: 63
5 ~ max-min-diff: 63
6 ~ max-min-diff: 63
---------------------- Worker num = 2 ----------------------
0 ~ max-min-diff: 63
1 ~ max-min-diff: 63
2 ~ max-min-diff: 63
3 ~ max-min-diff: 63
4 ~ max-min-diff: 63
5 ~ max-min-diff: 92
6 ~ max-min-diff: 223
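
A rough back-of-the-envelope of the chunk layout (assuming, as the behavior above suggests, that the 448 inputs are split contiguously between the two optimize workers):

# Hypothetical layout calculation for the num_workers=2 run of optimize.
items, workers, chunk_size = 7 * 64, 2, 64
per_worker = items // workers                # 224 items per writer
full, tail = divmod(per_worker, chunk_size)  # 3 full chunks of 64, 32 items left over
print(f"each of {workers} workers writes {full} chunks of {chunk_size} plus 1 chunk of {tail}")
# A shuffled batch of 64 can therefore span two of the 32-item chunks,
# which is why max-min-diff can exceed 63.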

Here I set the batch size to be a multiple of the chunk size, and I want every batch to contain samples from a single chunk. So I hope the optimized dataset can be generated as if num_workers=1, or that there is a function I can use to reorganize the optimized dataset.

— Phimos, Nov 28 '25

I think if the user specifies chunk_size instead of chunk_bytes, or specifies chunk_bytes but the sample size is fixed, we can determine which chunk each sample belongs to before the optimize operation. Otherwise, we could provide an interface to reorganize the generated chunks after the dump is complete.
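
For illustration, a rough sketch of what such a reorganization could look like today, done manually (hypothetical, not an existing litdata API: it simply reads the already-optimized dataset back and re-optimizes it with a single writer so every chunk holds exactly chunk_size items):

import litdata as ld

# Hypothetical manual "reorganize": rewrite an existing optimized dataset with
# num_workers=1 so the chunk layout matches a single-writer run.
source = ld.StreamingDataset("worker_size_2/", shuffle=False)


def copy_item(index: int):
    # Read the sample back from the already-optimized dataset.
    return source[index]


if __name__ == "__main__":
    ld.optimize(fn=copy_item, inputs=list(range(len(source))), chunk_size=64, output_dir="worker_size_2_rewritten", num_workers=1)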

— Phimos, Nov 29 '25