rlpyt
Correct way to send AsyncReplayBuffers to new processes
I'm trying to do multi-GPU DistributedDataParallel training using an AsyncPrioritizedSequenceReplayFrameBuffer, and I'm having trouble passing a buffer created in the parent process to child processes. When I directly pass the buffer at initialization, I receive the error:
File "/home/schwarzm/anaconda3/envs/pytorch/lib/python3.7/multiprocessing/spawn.py", line 115, in _main
self = reduction.pickle.load(from_parent)
_pickle.UnpicklingError: NEWOBJ class argument isn't a type object
When I wrap the buffer in a cloudpickle wrapper before passing it, I instead see TypeError: can't pickle mmap.mmap objects.
From what I can tell, launch_optimizer_workers passes such a buffer directly to the created worker processes via self.algo -- is there some trick necessary to make this work? Does each worker need to create a new copy of the buffer at process startup? Launching workers with fork appears to avoid this issue, but it in turn makes it impossible to use CUDA in the child processes.
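For context, here is a minimal, rlpyt-agnostic sketch of the constraint: the spawn start method pickles every argument sent to the child, so an object backed by an mmap (as the shared-memory replay arrays are) fails at start-up, whereas fork inherits the memory directly but doesn't play well with CUDA initialized in the parent.

```python
import mmap
import multiprocessing as mp

def worker(buf):
    # Never reached under spawn: pickling the args fails in the parent first.
    pass

if __name__ == "__main__":
    m = mmap.mmap(-1, 1024)  # anonymous shared memory, like the replay arrays
    ctx = mp.get_context("spawn")  # spawn pickles all process arguments
    p = ctx.Process(target=worker, args=(m,))
    p.start()  # raises TypeError: can't pickle mmap.mmap objects
    p.join()
```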
Hi, sorry for the slow reply! This looks the same as #99, which talks about using spawn instead of fork, and it’s probably a problem with the namedarraytuple types. Currently working on an alternative for this!
Hey @MaxASchwarzer, I just pushed a bunch of work to master which introduces NamedArrayTuple, which should work with spawn as an alternative to namedarraytuple. Want to give that a try and see if it works? I think the simplest place to make the change will be to add the new kwarg wherever buffer_from_example(...) is called, like buffer_from_example(..., use_NatSchema=True), such as inside the replay buffer and probably in the sampler where it builds the samples buffer... hopefully not too many edits for you.
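A minimal sketch of the kind of edit being suggested here; the use_NatSchema kwarg name is taken from the comment above, and the import path and surrounding variables are illustrative assumptions:

```python
import numpy as np
from rlpyt.utils.buffer import buffer_from_example  # assumed import path

example_sample = np.zeros(4, dtype=np.float32)  # stand-in for the real example
leading_dims = (1000,)                          # stand-in for the replay size

# Wherever the replay buffer (or the sampler) builds its samples via
# buffer_from_example, pass the new kwarg so the spawn-friendly NamedArrayTuple
# type is used instead of the dynamically created namedarraytuple class.
samples = buffer_from_example(
    example_sample,
    leading_dims,
    share_memory=True,     # as in the async replay buffers
    use_NatSchema=True,    # new kwarg suggested above
)
```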
Hi @astooke, I've run into the same problem and have already updated my code according to #99. For example, in replays.py, should

batch = SamplesFromReplay(all_observation=self.extract_observation(T_idxs, B_idxs, T + self.n_step_return), ...)

be edited to

batch = SamplesFromReplay(all_observation=buffer_from_example(self.extract_observation(T_idxs, B_idxs, T + self.n_step_return)), ...)

and should

BufferSamples = namedarraytuple("BufferSamples", field_names)

be edited to

BufferSamples = NamedArrayTuple("BufferSamples", field_names)

Am I doing this right? NamedArrayTuple() seems to take different parameters than namedarraytuple().
Almost! Instead of BufferSamples = NamedArrayTuple(...), use BufferSamples = NamedArrayTupleSchema(...). Then when you call x = BufferSamples(...), x will be an instance of NamedArrayTuple. I can see how that's kind of confusing... it might be worth renaming so that you can use NamedArrayTuple in place of namedarraytuple... hmm.
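A short sketch of the schema/instance split described here, assuming NamedArrayTupleSchema is importable from rlpyt.utils.collections; the field names are made up:

```python
import numpy as np
from rlpyt.utils.collections import NamedArrayTupleSchema  # assumed import path

# The schema replaces the class returned by namedarraytuple(); unlike that
# dynamically created class, it is a plain picklable object, so it can be sent
# to spawn-started worker processes.
BufferSamples = NamedArrayTupleSchema("BufferSamples", ["observation", "action", "reward"])

# Calling the schema returns a NamedArrayTuple instance holding the arrays,
# which supports the same field access and slicing as namedarraytuple instances.
x = BufferSamples(
    observation=np.zeros((10, 4)),
    action=np.zeros((10, 1)),
    reward=np.zeros(10),
)
sliced = x[:5]  # a slice applies to every field at once
```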
@astooke Got it, it's working! One more question: my envs are fast to step but very slow to reset (like 30 seconds). Is there any way to make the collectors fully asynchronous, so that one env resetting does not block sampling from the other envs? Any advice?
@mxwells Oops, somehow I missed your last question for a bit. That is a very slow reset! The first thing to do would be to use any of the WaitReset collectors. These skip env instances which need to reset during sampling, and instead reset them in the background while the algorithm is training the agent. (The data in a sample batch after the env hits done will be invalid, and all that masking is in place in the algos if you look for valid_from_done().)
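A small sketch of what that masking looks like, assuming valid_from_done comes from rlpyt.algos.utils and operates on tensors with a leading time dimension [T, B]:

```python
import torch
from rlpyt.algos.utils import valid_from_done  # assumed import path

# done flags for one env over T=5 steps; the env hits done at t=2.
done = torch.tensor([[0.], [0.], [1.], [0.], [0.]])
valid = valid_from_done(done)  # 1s up through the done step, 0s afterwards

per_step_loss = torch.rand(5, 1)  # stand-in for whatever the algo computes
masked_loss = (per_step_loss * valid).sum() / valid.sum()  # masked mean
```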
If even that seems too slow, maybe there is some scheme where you keep extra environment instances on hand, and only reset once all of the extra ones are used up, so you do them together and there is less straggler effect. Could get kinda complicated.
Or if you can do anything to the env to speed up the reset :D ?
cool! it was really helpful.
Great!
One thing to watch out for: if you're storing samples to a replay buffer and training from there, the usual uniform sampling from replay might return invalid samples if you use the WaitReset collectors. This could be fixed by adding a valid mask field to the replay buffer, storing into it the result from valid_from_done() in the algo, and then using the stored valid value to mask the training batch.
Come to think of it I should probably implement this, or if you'd like to give it a go let us know!
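A rough sketch of that idea, with hypothetical class and field names (this is not an existing rlpyt API): add a valid field next to whatever the replay buffer stores, write the valid_from_done() result into it alongside the samples, and weight the training loss by the stored mask when sampling.

```python
import numpy as np

class ValidMaskedReplay:
    """Hypothetical minimal replay that stores a valid mask next to each sample."""

    def __init__(self, size, obs_shape):
        self.observation = np.zeros((size,) + obs_shape, dtype=np.float32)
        self.valid = np.zeros(size, dtype=np.float32)  # new mask field
        self.t = 0

    def append(self, obs, valid):
        # `valid` here would be the value computed by valid_from_done() in the algo.
        self.observation[self.t] = obs
        self.valid[self.t] = valid
        self.t += 1

    def sample_batch(self, batch_size, rng=np.random):
        idxs = rng.randint(0, self.t, size=batch_size)
        return self.observation[idxs], self.valid[idxs]

# At training time, weight the per-sample loss by the stored mask, e.g.:
#   loss = (per_sample_loss * valid_batch).sum() / max(valid_batch.sum(), 1.0)
```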
Yeah, I'd like to give it a go. It might take some time though, since I want to let this extra-environments idea land first :D