
Correct way to send AsyncReplayBuffers to new processes

MaxASchwarzer opened this issue on Feb 13 '20 · 9 comments

I'm trying to do multi-GPU DistributedDataParallel training using an AsyncPrioritizedSequenceReplayFrameBuffer, and I'm having trouble passing a buffer created in the parent to child processes. When I directly pass the buffer at initialization, I receive the error:

   File "/home/schwarzm/anaconda3/envs/pytorch/lib/python3.7/multiprocessing/spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
_pickle.UnpicklingError: NEWOBJ class argument isn't a type object

When I wrap the buffer in a cloudpickle wrapper before passing it, I instead see TypeError: can't pickle mmap.mmap objects.
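For reference, the mmap part of this reproduces outside rlpyt entirely (a minimal sketch):

```python
# Minimal reproduction of the mmap failure, independent of rlpyt: fork-ed
# children inherit memory mappings directly, but spawn must pickle everything
# it sends to the child, and mmap objects refuse to pickle.
import mmap
import pickle

m = mmap.mmap(-1, 1024)  # anonymous shared memory, as used for shared buffers
try:
    pickle.dumps(m)
except TypeError as e:
    print(e)  # can't pickle mmap.mmap objects
```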

From what I can tell, the function launch_optimizer_workers is directly passing such a buffer to the created worker processes via self.algo -- is there some trick necessary to make this work? Does each worker need to create a new copy of the buffer at process startup? Launching workers with fork avoids this issue, but that in turn makes it impossible to use CUDA in the child processes.

MaxASchwarzer · Feb 13 '20

Hi, sorry for the slow reply! This looks the same as #99, which talks about using spawn instead of fork, and it’s probably a problem with the namedarraytuple types. Currently working on an alternative for this!

astooke · Feb 21 '20

Hey @MaxASchwarzer, just pushed a bunch of work to master, which introduces NamedArrayTuple as an alternative to namedarraytuple that should work with spawn. Want to give that a try and see if it works? I think the simplest change will be to add the new kwarg wherever buffer_from_example(...) is called, i.e. buffer_from_example(..., use_NatSchema=True), such as inside the replay buffer and probably in the sampler where it builds the samples buffer... hopefully not too many edits for you.
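Something like this, I think (a sketch; the variable names are just illustrative):

```python
# Sketch of the suggested edit; `example`, `T`, `B` stand for whatever the
# call site already passes. The only new part is the use_NatSchema kwarg.
from rlpyt.utils.buffer import buffer_from_example

samples = buffer_from_example(example, (T, B),
    use_NatSchema=True)  # build spawn-picklable NamedArrayTuple storage
```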

astooke · Mar 03 '20

Hi @astooke, I've run into the same problem, and have already updated my code according to #99.

For example, in replays.py, should batch = SamplesFromReplay(all_observation=self.extract_observation(T_idxs, B_idxs, T + self.n_step_return), ...) be changed to batch = SamplesFromReplay(all_observation=buffer_from_example(self.extract_observation(T_idxs, B_idxs, T + self.n_step_return)), ...)?

And should BufferSamples = namedarraytuple("BufferSamples", field_names) become BufferSamples = NamedArrayTuple("BufferSamples", field_names)?

Am I doing this right? NamedArrayTuple() seems to take different parameters than namedarraytuple().

mxwells · Mar 18 '20

Almost! Instead of BufferSamples = NamedArrayTuple(...), use: BufferSamples = NamedArrayTupleSchema(...)

Then when you call x = BufferSamples(...), x will be an instance of NamedArrayTuple. I can see how that's kind of confusing...might be worth renaming so that you can use NamedArrayTuple in place of namedarraytuple....hmm..
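In code, the pattern would look like this:

```python
# Minimal sketch of the schema pattern (import path as of current master):
import numpy as np
from rlpyt.utils.collections import NamedArrayTupleSchema

field_names = ["observation", "action", "reward", "done"]  # example fields
BufferSamples = NamedArrayTupleSchema("BufferSamples", field_names)

x = BufferSamples(*(np.zeros(3) for _ in field_names))
# x is a NamedArrayTuple instance; unlike the classes namedarraytuple()
# generates at runtime, the schema and its instances pickle cleanly under
# the spawn start method.
```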

astooke · Mar 21 '20

@astooke Got it, it's working! One more question: my envs are fast to step but very slow to reset (around 30 seconds). Is there any way to make the collectors fully asynchronous, so that one env resetting doesn't block the other envs from sampling? Any advice?

mxwells · Mar 23 '20

@mxwells Oops, somehow I missed your last question for a bit. That is a very slow reset! The first thing to do would be to use any of the WaitReset collectors. These skip env instances which need to reset during sampling, and instead reset them in the background while the algorithm is training the agent. (The data in a sample batch after an env hits done will be invalid, and all that masking is in place in the algos if you look for valid_from_done().)
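For a quick picture of what the mask does:

```python
# Quick illustration of what valid_from_done() produces (it lives in
# rlpyt/algos/utils.py):
import torch
from rlpyt.algos.utils import valid_from_done

done = torch.zeros(5, 2)   # [T, B] done flags from a sample batch
done[2, 0] = 1.            # env 0 hits done at t=2
valid = valid_from_done(done)
print(valid[:, 0])         # tensor([1., 1., 1., 0., 0.]) -> t=3,4 masked out
# Algos then take masked means, e.g. (valid * loss).sum() / valid.sum()
```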

If even that seems too slow, maybe there is some scheme where you keep extra environment instances on hand and only reset once all of the extras are used up, so you do them together and there is less straggler effect. Could get kind of complicated (rough sketch of one possibility below).

Or, is there anything you can do to the env itself to speed up the reset? :D
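To make that extra-instances idea concrete, one possible shape:

```python
# Purely a sketch, not rlpyt API; env_fn and the pool sizes are hypothetical.
class EnvPool:
    """Keep pre-reset spare envs on hand and batch the slow resets."""

    def __init__(self, env_fn, n_active, n_spare):
        self.active = [env_fn() for _ in range(n_active)]
        self.spare = [env_fn() for _ in range(n_spare)]
        self.used = []  # finished envs awaiting reset
        for env in self.active + self.spare:
            env.reset()

    def swap_done(self, i):
        """Replace a finished env with a pre-reset spare, deferring resets."""
        self.used.append(self.active[i])
        if not self.spare:  # pool ran dry: pay the reset cost for all at once
            for env in self.used:
                env.reset()
            self.spare, self.used = self.used, []
        self.active[i] = self.spare.pop()
```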

astooke · Mar 30 '20

Cool! That was really helpful.

mxwells · Apr 01 '20

Great!

One thing to watch out for: if you're storing samples to a replay buffer and training from there, the usual uniform sampling from replay might return invalid samples when you use the WaitReset collectors. This could be fixed by adding a valid mask field to the replay buffer, storing into it using the result from valid_from_done() in the algo, and then using the stored valid values to mask the training batch.
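Roughly like this (a sketch of the idea, not existing code):

```python
# Hedged sketch: this helper and the `valid` buffer field are hypothetical,
# nothing like them exists in the repo yet.
import torch
from rlpyt.algos.utils import valid_from_done

def masked_replay_loss(per_sample_losses, stored_valid):
    """Zero out losses on invalid (post-done) samples and renormalize."""
    stored_valid = stored_valid.float()
    return ((stored_valid * per_sample_losses).sum()
            / torch.clamp(stored_valid.sum(), min=1.))

# Storage side: in the algo, compute valid = valid_from_done(samples.done)
# and write it into the new buffer field; replayed batches then carry
# batch.valid for use in masked_replay_loss().
```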

Come to think of it I should probably implement this, or if you'd like to give it a go let us know!

astooke · Apr 06 '20

Yeah, I'd like to. It might take some time, because I want to let this extra-environments idea land first :D

mxwells · Apr 09 '20