
PostprocessingDataset with multi-processing

Open · albertz opened this issue on Mar 20 '25

This could be an alternative to MultiProcDataset. In most cases (OggZipDataset etc.), the data loading part of the dataset is not really the bottleneck; rather, the postprocessing is, and that is the reason to use MultiProcDataset.

So, the idea is to have a single source dataset (e.g. OggZipDataset), but then only do the post-processing with multi-processing.

For that, we can add such functionality to PostprocessingDataset, e.g. a num_workers: int option. If not used, it would do everything in the process itself; otherwise, it would spawn that number of worker procs. This should be fairly straightforward, at least for the map_seq case. For map_seq_stream probably not really, but we don't need that.

My use case would be a standard OggZipDataset, but then doing the speed perturbation with PostprocessingDataset with multi-processing.
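
A minimal config sketch of how this could look. Note that num_workers on PostprocessingDataset is only the proposal here (it does not exist yet), and the map_seq signature and the OggZipDataset options are simplified placeholders:

```python
# Sketch of the proposed usage; num_workers is the proposed option, not an existing one.
def speed_pert_map_seq(seq, *, rng, **_other):
    # user-defined speed perturbation on the audio features, using the provided rng
    return seq


train = {
    "class": "PostprocessingDataset",
    "dataset": {
        "class": "OggZipDataset",
        "path": "data/train.zip",
        # ... audio/targets options ...
    },
    "map_seq": speed_pert_map_seq,
    "num_workers": 4,  # proposed: unset/0 = everything in-process as before
}
```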

(cc @NeoLegends @dorian-K)

albertz avatar Mar 20 '25 13:03 albertz

One small problem might be the rng arg to map_seq, and how this should behave. I don't see a good way to make this consistent with how it behaved before. Even making it deterministic is already tricky and would probably cost some speed (e.g. we could deterministically map each seq to worker_idx = seq_idx % num_workers). Making it nondeterministic is the simplest approach, but we probably would not want it to be nondeterministic.

albertz avatar Mar 20 '25 22:03 albertz

  1. I agree this should be deterministic from run to run as long as the parameters stay the same.
  2. I think even map_seq_stream is technically not a problem: we just need to shard the underlying seq iter that we feed into the stream processors on a per-worker basis (i.e. create one iter per worker that receives the items for worker_idx = seq_idx % num_workers). A multi-process map_seq_stream would, however, not work well with LaplaceOrdering, and you would need a second, non-multi-proc dataset for that.
  3. I don't think that multi-proc PPDataset necessarily needs to be transparent w.r.t. single-proc PPDataset, i.e. it might be fine that the randomness behavior changes slightly when you go from a single process to multiple processes. If we accept that, then we can just create one RNG per worker and seed it from the main-proc RNG plus a worker offset (see the sketch below).
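
A conceptual sketch of that seeding plus the round-robin sharding (not the actual PPDataset code; the seed value is a placeholder):

```python
import numpy

# Conceptual sketch: round-robin sharding of seqs to workers, plus one RNG per worker
# seeded from a main-proc seed + worker index.
num_workers = 4
main_seed = 42  # placeholder; in practice derived from the dataset's epoch-dependent seed

# one independent RNG per worker; deterministic for a fixed seed and fixed num_workers
worker_rngs = [numpy.random.RandomState(main_seed + worker_idx) for worker_idx in range(num_workers)]


def worker_for_seq(seq_idx):
    """Deterministic seq -> worker assignment."""
    return seq_idx % num_workers


for seq_idx in range(10):
    rng = worker_rngs[worker_for_seq(seq_idx)]
    # map_seq(seq, rng=rng, ...) would then run inside that worker with this rng
```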

NeoLegends avatar Mar 21 '25 11:03 NeoLegends

Another data point: I have a setup where I use MultiProcDataset + DistributeFilesDataset around PostprocessingDatasets that postprocess data from an HDFDataset. Since DFD prefetches one subepoch of data, this ends up doubling memory consumption (e.g. 4 workers + DFD = 8x memory consumption vs. a single-process dataset). With sufficiently large source datasets this can sometimes approach or exhaust the total memory of the GPU nodes (or at least the per-GPU share). I think multi-processing only the postprocessing here would help greatly, as it avoids the memory overhead of duplicating the source datasets.

Order: DistributeFilesDataset(MultiProcDataset(PostprocessingDataset(HDFDataset))).
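
Roughly like this as a config sketch (option names and values are illustrative placeholders). Every MultiProcDataset worker holds its own copy of the inner PostprocessingDataset(HDFDataset), and DFD prefetches the next subepoch on top of that:

```python
# Config sketch of the nesting above; functions and file lists are placeholders.
def my_map_seq(seq, **_other):
    return seq  # placeholder postprocessing


def get_sub_epoch_dataset(files):
    return {
        "class": "MultiProcDataset",
        "num_workers": 4,  # 4 copies of the inner dataset ...
        "buffer_size": 10,
        "dataset": {
            "class": "PostprocessingDataset",
            "map_seq": my_map_seq,
            "dataset": {"class": "HDFDataset", "files": files},
        },
    }


train = {
    "class": "DistributeFilesDataset",
    "files": ["data/part-0.hdf", "data/part-1.hdf"],  # placeholder file list
    "partition_epoch": 2,
    "get_sub_epoch_dataset": get_sub_epoch_dataset,
    # ... DFD prefetches the next subepoch, i.e. another full copy of the above -> ~8x memory
}
```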

NeoLegends avatar Jul 01 '25 12:07 NeoLegends

Just to clarify: Which is the inner and which is the outer dataset? (Just edit your post.)

albertz avatar Jul 01 '25 12:07 albertz

We just had another case at AppTek where the memory consumption of the workers became a bottleneck in combination with DistributeFilesDataset. I think this is due for an implementation in the coming weeks if AppTek goes on to train more CTC models. I might work on this.

NeoLegends avatar Sep 18 '25 13:09 NeoLegends

Wrt. implementation, I'm currently thinking about the following:

  1. Spawn a number of worker processes. Each worker process gets a (separate) connection to the main proc and a (separate) queue of incoming seqs. The workers pull from their incoming-seq queue and postprocess each item from it. Every worker gets its own RNG, initialized from the PPDataset RNG with the worker index mixed in.
  2. I spawn an additional thread inside the main proc whose only responsibility is driving the wrapped dataset and (evenly) forwarding the seqs of that dataset into worker_queue[seq_idx % num_workers]. By using a bounded queue instead of a simple pipe, there is backpressure, which eventually stops the dataset thread from driving the dataset too far ahead/consuming 100% CPU.
  3. Whenever the trainer wants to load a seq, it fetches it from worker_processes[seq_idx % num_workers] via the main proc <-> worker connection. To produce that seq, the worker either fetches one from its cache, or "pulls" via the queue on the dataset thread until that has produced enough seqs for the worker process itself to produce a seq. The dataset thread can work during that time, because the main thread is blocked on the pipe.recv() and has released the GIL. Alternatively, the worker process runs out of data. (A rough sketch of this follows below.)
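
A rough standalone sketch of this scheme (not the actual RETURNN/PPDataset code; the wrapped dataset, the postprocessing function and the seed handling are just placeholders):

```python
import multiprocessing as mp
import threading

# Sketch: a feeder thread in the main proc drives the wrapped dataset and shards seqs
# round-robin into bounded per-worker queues (-> backpressure); worker procs postprocess
# with their own RNG and send results back over per-worker pipes.

NUM_WORKERS = 2
QUEUE_SIZE = 4  # bounded queue -> backpressure on the feeder thread


def wrapped_dataset():
    """Stand-in for the wrapped (source) dataset: yields (seq_idx, seq) pairs."""
    for seq_idx in range(10):
        yield seq_idx, {"data": float(seq_idx)}


def postprocess(seq, rng):
    """Stand-in for map_seq, using the per-worker RNG."""
    seq = dict(seq)
    seq["data"] += rng.random()
    return seq


def worker_main(worker_idx, in_queue, out_conn, base_seed):
    import random

    rng = random.Random(base_seed + worker_idx)  # per-worker RNG: main seed + worker index
    while True:
        item = in_queue.get()
        if item is None:  # sentinel: wrapped dataset is exhausted
            out_conn.send(None)
            return
        seq_idx, seq = item
        out_conn.send((seq_idx, postprocess(seq, rng)))


def feeder(queues):
    """Runs as a thread in the main proc: drives the dataset, shards seqs round-robin."""
    for seq_idx, seq in wrapped_dataset():
        queues[seq_idx % NUM_WORKERS].put((seq_idx, seq))  # blocks when the queue is full
    for q in queues:
        q.put(None)


if __name__ == "__main__":
    queues = [mp.Queue(maxsize=QUEUE_SIZE) for _ in range(NUM_WORKERS)]
    pipes = [mp.Pipe(duplex=False) for _ in range(NUM_WORKERS)]
    workers = [
        mp.Process(target=worker_main, args=(i, queues[i], pipes[i][1], 42))
        for i in range(NUM_WORKERS)
    ]
    for w in workers:
        w.start()
    threading.Thread(target=feeder, args=(queues,), daemon=True).start()

    # Main proc / trainer side: fetch seqs in order, always from the worker owning seq_idx.
    seq_idx = 0
    while True:
        item = pipes[seq_idx % NUM_WORKERS][0].recv()
        if item is None:
            break
        print(item)
        seq_idx += 1

    for w in workers:
        w.join()
```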

I think this would work, and would provide deterministic randomness for our purposes. Of course the randomness is not the same as without any workers.

However, it also requires a separate thread to drive the wrapped dataset. This is because I don't see any other way to get the pull-based dataset interface to distribute its segments to the subprocesses whenever they have spare capacity.

Certainly this is more memory-efficient than including a copy of the dataset in every worker process, but it's still a separate control flow and additional complexity. Maybe this would even warrant a full/dedicated process. In any case I'm not a big fan of the separate control flow just for the dataset, but I currently don't see a good alternative way of implementing this.

@albertz Do you think the implementation would make sense this way?

NeoLegends avatar Sep 18 '25 15:09 NeoLegends

Every process gets its own RNG initialized from the PPDataset RNG

How exactly? This is basically #1762. We don't really have a solution for that yet.

I spawn an additional thread inside the main proc

~~Why is this needed? MultiProcDataset does it without. I don't really understand why you need it.~~ Ah sorry, this is for feeding the workers, which is different in MultiProcDataset, where they all use their own sub dataset.

What you describe after that is very similar to MultiProcDataset. Or where/how is it different? (Except the feeding.)

Maybe we should also share some code then?

albertz avatar Sep 19 '25 12:09 albertz

Ah sorry, this is for feeding the workers, which is different in MultiProcDataset, where they all use their own sub dataset.

Yes! The point here is to avoid duplicating the wrapped dataset into every worker process.

Besides that it's not very different. But we have noticed that the memory consumption when using MultiProcDataset can be quite large and with this implementation I want to reduce the memory usage again.

NeoLegends avatar Sep 19 '25 12:09 NeoLegends

Besides that it's not very different. But we have noticed that the memory consumption when using MultiProcDataset can be quite large and with this implementation I want to reduce the memory usage again.

Yes of course it is. That's not what I'm saying. You can anyway share code (maybe), as the logic is very similar.

albertz avatar Sep 19 '25 12:09 albertz

How exactly? This is basically https://github.com/rwth-i6/returnn/issues/1762. We don't really have a solution for that yet.

In this PR (https://github.com/rwth-i6/returnn/pull/1765) I've used (conceptually) rng_seed_for_worker=self.get_random_seed_for_epoch(epoch=epoch * num_workers + worker_idx). I think the function simply uses the provided epoch number as a seed. This makes every worker get its own random seed.

I have not thought about this much yet, but here it might be fine, since the workers don't communicate with each other; they just each need a different random seed, so that e.g. speed perturbation, sampling, etc. differ between the workers. Right?

You can anyway share code (maybe), as the logic is very similar.

Yes, I can try to share more of the code.

NeoLegends avatar Sep 19 '25 12:09 NeoLegends

In this PR (#1765) I've used (conceptually) rng_seed_for_worker=self.get_random_seed_for_epoch(epoch=epoch * num_workers + worker_idx). I think the function simply uses the provided epoch number as a seed. This makes every worker get its own random seed.

This function can potentially do more. I don't think it's a good idea to pass a wrong epoch here just to include the worker_idx in the seed. There should be a better way for that. You could use get_random_seed_for_epoch(epoch=epoch) * BIG_CONST1 + worker_idx * BIG_CONST2 or so.
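
Conceptually, something like the following (the constants are arbitrary placeholders, not anything that exists in RETURNN):

```python
# Sketch of the suggested seeding scheme; BIG_CONST1/BIG_CONST2 are arbitrary placeholders.
BIG_CONST1 = 1048583
BIG_CONST2 = 13513


def worker_seed(base_epoch_seed, worker_idx):
    # base_epoch_seed would come from self.get_random_seed_for_epoch(epoch=epoch)
    return base_epoch_seed * BIG_CONST1 + worker_idx * BIG_CONST2

# each worker then builds its own RNG, e.g. numpy.random.RandomState(worker_seed(...) % 2**32)
```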

But you are right, https://github.com/rwth-i6/returnn/issues/1762 is actually not so relevant, as #1762 is specifically about how to get the right seeds in sub datasets in each worker, while here we directly control and pass the RNG, so it is easy to do.

I can try to share more of the code.

Well, at least see if it makes sense. Specifically, look at the details of the multi-processing logic, the queues, etc. If it just makes it more complicated, then don't do it.

albertz avatar Sep 19 '25 12:09 albertz