ParallelPyEnvironment doesn't work when subprocess calling a tensorflow model
Hi, TF-Agents team!
I am training my agent with SAC. What differs from the usual setup is that the reward is given by a neural network (implemented in TensorFlow 2.0) at some stage, so my custom PyEnvironment contains something like this:
Platform: Python 3.7.5 / TensorFlow 2.0.0 / tf_agents 0.3.0
def _step(self, action):
    if self._episode_ended:
        return self.reset()
    state = make_state(action)
    reward = evaluate_model(state)  # get the reward from a neural network
    ...
If I run this environment in parallel, it complains:
...
File "/home/llouice/Projects/SE/Research/agents/tf_agents/environments/tf_py_environment.py", line 298, in _step_py
self._time_step = self._env.step(packed)
File "/home/llouice/Projects/SE/Research/agents/tf_agents/environments/py_environment.py", line 174, in step
self._current_time_step = self._step(action)
File "/home/llouice/Projects/SE/Research/agents/tf_agents/environments/parallel_py_environment.py", line 136, in _step
time_steps = [promise() for promise in time_steps]
File "/home/llouice/Projects/SE/Research/agents/tf_agents/environments/parallel_py_environment.py", line 136, in <listcomp>
time_steps = [promise() for promise in time_steps]
File "/home/llouice/Projects/SE/Research/agents/tf_agents/environments/parallel_py_environment.py", line 334, in _receive
message, payload = self._conn.recv()
File "/home/llouice/anaconda3/envs/tf/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/home/llouice/anaconda3/envs/tf/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/home/llouice/anaconda3/envs/tf/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError:
In call to configurable 'train_eval' (<function train_eval at 0x7ff80165ab90>)
I think this is an issue with TensorFlow and multiprocessing. I tested it and found the key is:
import multiprocessing as mp

def test_mp():
    pool = mp.Pool(4)
    for i in range(5):
        pool.apply_async(run_model)
    pool.close()
    pool.join()

if __name__ == "__main__":
    mp.set_start_method('spawn')  # this is the key
    test_mp()
It works. But I find that mp.get_start_method(allow_none=True) inside ParallelPyEnvironment returns fork, and when I set it to spawn, the program hangs in an infinite wait.
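For what it's worth, a spawn-based pool can also be requested without mutating the process-wide start method, via multiprocessing contexts. A minimal sketch (run_model here is a hypothetical placeholder for the TensorFlow model call; this only demonstrates the multiprocessing API, not a fix for ParallelPyEnvironment itself, which manages its own worker processes):

```python
import multiprocessing as mp

def run_model(i):
    # Hypothetical stand-in for evaluating the TF model on one input.
    return i * i

def test_mp_spawn():
    # get_context returns a context bound to one start method, leaving
    # the global default (which ParallelPyEnvironment reads) untouched.
    ctx = mp.get_context("spawn")
    with ctx.Pool(2) as pool:
        return pool.map(run_model, range(5))

if __name__ == "__main__":
    print(test_mp_spawn())  # [0, 1, 4, 9, 16]
```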
I would appreciate any suggestion that makes it run in parallel!
Since you're relying on tensorflow to perform the reward calculation, you get GIL-free multithreading for free. This means you should be able to get away with using the BatchedPyEnvironment - this uses python multithreading instead of multiprocessing. Since running TF calls releases the GIL, you won't get any advantage (and only overhead) from using the ParallelPyEnvironment.
The error is pretty opaque though. @oars, is it possible there's an error message from the other process that we are missing, and it gets hidden when this EOFError is raised?
@ebrevdo Is there an example on how to use the BatchedPyEnvironment?
Yes, the EOFError is caught and exceptions are logged. @LLouice, do you have the full logs? There should be a message like:
Error in environment process: ....
batched_py_environment.BatchedPyEnvironment(envs=[suite.load(...) for _ in range(n)])