
How to modify acme.adders.reverb.base.Adder for adding several environments' transitions in Acme?

Open fmxFranky opened this issue 5 years ago • 8 comments

I want to use Acme's algorithms with vectorized environments (like baselines' VecEnv). However, maintaining N adders for N environments hurts efficiency when collecting samples and sending them to the Reverb server: with my VecEnv(num_envs=64), sending transitions to the replay buffer takes about 60% of the total sampling time. How can I modify acme.adders.reverb.base.Adder and its subclasses so that a single adder can add transitions from several environments? Thank you very much~

fmxFranky avatar Jul 14 '20 02:07 fmxFranky

Hi @fmxFranky ,

Did you happen to try out parallelizing the environments (as in baselines SubprocVec) using the Acme codebase? If so, could you share how you did it?

jaejaywoo avatar Jul 31 '20 21:07 jaejaywoo

Hi @jaejaywoo, my approach was to use the dm_env interfaces to wrap each transition coming out of the SubprocVec into a dm_env.TimeStep instance, and to create N adders that add them to the replay server. However, this is also slow because the adds happen serially, so it costs more time. I'm sorry, but I have no idea how to achieve this efficiently...
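
For context, the wrapping described above looks roughly like this (a sketch; vec_env is assumed to be a baselines-style vectorized environment, actions a batch of actions, and adders a list of N Acme adders):

import dm_env

# One step of the vectorized env yields N transitions; each is wrapped into a
# dm_env.TimeStep and pushed through its own adder, one call at a time.
observations, rewards, dones, _ = vec_env.step(actions)
for i, adder in enumerate(adders):
  if dones[i]:
    timestep = dm_env.termination(reward=rewards[i], observation=observations[i])
  else:
    timestep = dm_env.transition(reward=rewards[i], observation=observations[i])
  adder.add(actions[i], timestep)  # serial: one add per environment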

fmxFranky avatar Aug 01 '20 12:08 fmxFranky

Thanks for your reply @fmxFranky :)

jaejaywoo avatar Aug 01 '20 19:08 jaejaywoo

Hi @fmxFranky, I would recommend maintaining one Adder per environment. Each adder is linked to a Reverb client, which maintains a gRPC stream that the Reverb server uses for keeping track of episodes.
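
For example, a minimal per-environment setup could look like the sketch below; it assumes a Reverb server is already reachable at address, and uses NStepTransitionAdder with placeholder n_step and discount values purely as an illustrative adder:

import reverb
from acme.adders import reverb as adders

num_envs = 64
address = 'localhost:8000'  # assumed Reverb server address

# One adder (and Reverb client) per environment, so each gRPC stream tracks
# a single environment's episode boundaries.
env_adders = [
    adders.NStepTransitionAdder(
        client=reverb.Client(address), n_step=1, discount=0.99)
    for _ in range(num_envs)
]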

Also, I just did a quick estimate of how long it takes to add data to Reverb using the tutorial colab, and it appears to take ~0.37 ms per call to Adder.add. This is using the gym MountainCar environment, which may have a smaller state than your environments, but I don't expect that to change much unless you're using images. This means it'd take approximately 23.68 ms to add the state from all 64 environments; is this what you're seeing?
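
For reference, that per-call cost can be estimated with a simple timing loop like the sketch below; adder, action, first_timestep, next_timestep, and num_steps stand in for whatever setup is being measured:

import time

adder.add_first(first_timestep)  # start an episode before timing the adds
start = time.perf_counter()
for _ in range(num_steps):
  adder.add(action, next_timestep)  # assumes next_timestep is a mid-episode step
per_call_ms = (time.perf_counter() - start) / num_steps * 1e3
print(f'~{per_call_ms:.2f} ms per Adder.add call')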

You can use a thread pool to parallelize the calls to Adder.add without having to change the internal code at all. You'll need to be careful about propagating errors, but this will be similar to any multi-threaded code.

For example:

import concurrent.futures

pool = concurrent.futures.ThreadPoolExecutor(max_workers=...)
futures = []
for timestep, action, adder in zip(timesteps, actions, adders):
  # Pass the callable and its arguments separately so the call runs in the pool.
  f = pool.submit(adder.add, action, timestep)
  futures.append(f)

# Do some error checking on futures
...

fastturtle avatar Aug 03 '20 10:08 fastturtle

Thanks a lot, @fastturtle, I will add a thread pool to parallelize the calls~

fmxFranky avatar Aug 03 '20 11:08 fmxFranky

Great, let us know how it goes :)

fastturtle avatar Aug 03 '20 11:08 fastturtle

I find that using a thread pool doesn't give me any time savings:

with concurrent.futures.ThreadPoolExecutor(max_workers=len(self._adder)) as pool:
  futures = []
  for i, next_timestep in enumerate(next_timestep_list):
    f = pool.submit(self._adder[i].add, action[i], next_timestep)
    futures.append(f)
  concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)

I ran the above code with env_num = 3, 30, 300, and the time still scales with the number of environments.

feidieufo avatar Mar 28 '22 07:03 feidieufo

Thread pools are subject to the GIL, which means only one Python thread can run at a time (it's concurrent but not parallel), so you won't see a speedup beyond roughly 1x-1.2x. Multi-processing is the way to go; Acme uses Launchpad to run parallel actors (see the distributed agent examples).
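
For illustration, a process-per-environment pattern could look like the rough sketch below (this is not the Launchpad-based setup Acme's distributed agents use; make_environment, make_adder, and select_action are hypothetical helpers, and 'localhost:8000' is an assumed Reverb server address):

import multiprocessing as mp

def actor_process(reverb_address, seed):
  # Each process owns its own environment, Reverb client, and adder, so adds
  # run in parallel across processes instead of being serialized by the GIL.
  env = make_environment(seed)        # hypothetical helper
  adder = make_adder(reverb_address)  # hypothetical helper
  timestep = env.reset()
  adder.add_first(timestep)
  while not timestep.last():
    action = select_action(timestep.observation)  # hypothetical helper
    timestep = env.step(action)
    adder.add(action, timestep)

if __name__ == '__main__':
  procs = [mp.Process(target=actor_process, args=('localhost:8000', seed))
           for seed in range(64)]
  for p in procs:
    p.start()
  for p in procs:
    p.join()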

wookayin avatar Apr 19 '22 19:04 wookayin