stable-baselines3-contrib
SubProcVecEnv with MaskablePPO
First of all, thank you for creating this repo! I had been trying to implement masking for a couple of weeks before I found you already had it going.
Anyway, I was wondering whether MaskablePPO is meant to work with vectorised environments. I have tried using SubprocVecEnv with CartPole. Minimal code:
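import gym
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.vec_env import SubprocVecEnv
from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker

def make_env(env_id, rank, seed=0):
    def _init():
        env = gym.make(env_id)
        env.seed(seed + rank)
        return env
    set_random_seed(seed)
    return _init

env_id = 'CartPole-v1'
nproc = 2
env = SubprocVecEnv([make_env(env_id, i) for i in range(nproc)], start_method='fork')

def mask_fn(env):
    return [[0, 1]]  ## simple mask

# ActionMasker applied on top of the whole VecEnv
env = ActionMasker(env, mask_fn)
model = MaskablePPO(MaskableActorCriticPolicy, env).learn(1e5)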
Error that I get:
File "/Users/ak2135_admin/Dropbox/gymEOEnv/learn_maskable.py", line 82, in main
    model.learn(time_steps)
File "/Users/ak2135_admin/opt/anaconda3/envs/Hybrid_Framework/lib/python3.7/site-packages/sb3_contrib/ppo_mask/ppo_mask.py", line 528, in learn
    use_masking,
File "/Users/ak2135_admin/opt/anaconda3/envs/Hybrid_Framework/lib/python3.7/site-packages/sb3_contrib/ppo_mask/ppo_mask.py", line 251, in _setup_learn
    self._last_obs = self.env.reset()
File "/Users/ak2135_admin/opt/anaconda3/envs/Hybrid_Framework/lib/python3.7/site-packages/stable_baselines3/common/vec_env/dummy_vec_env.py", line 62, in reset
    self._save_obs(env_idx, obs)
File "/Users/ak2135_admin/opt/anaconda3/envs/Hybrid_Framework/lib/python3.7/site-packages/stable_baselines3/common/vec_env/dummy_vec_env.py", line 92, in _save_obs
    self.buf_obs[key][env_idx] = obs
ValueError: could not broadcast input array from shape (2,4) into shape (4)
I figured it would be straightforward to vectorise the environment, as it is in SB3. I'm not sure whether this is on my end or whether parallel processing is not yet implemented for MaskablePPO. I would love to help (given some pointers) if it's something that can be added.
All packages should be up to date.
Hello,
Your issue is the same as @vwxyzjn's in https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/pull/25#issuecomment-922592839: you can use a VecEnv, but for now the action masker must wrap each single env, not the VecEnv.
We would appreciate a PR that adds support for vectorized action masks.
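The layering described here (mask wrapper on each single env, vectorization on top) can be illustrated without SB3. The sketch below is a dependency-free toy: `ToyEnv`, `ToyActionMasker`, `ToyVecEnv` and the masking rule are all hypothetical; only the `env_method("action_masks")` call pattern mirrors how masks are gathered from a VecEnv in the `get_action_masks` code quoted later in this thread.

```python
from typing import Callable, List


class ToyEnv:
    """Hypothetical single env with 2 discrete actions (a stand-in for CartPole)."""

    n_actions = 2

    def __init__(self, rank: int):
        self.rank = rank


class ToyActionMasker:
    """Hypothetical per-env wrapper, mirroring the idea behind sb3_contrib's ActionMasker."""

    def __init__(self, env: ToyEnv, mask_fn: Callable[[ToyEnv], List[bool]]):
        self.env = env
        self._mask_fn = mask_fn

    def action_masks(self) -> List[bool]:
        return self._mask_fn(self.env)


class ToyVecEnv:
    """Hypothetical vectorized container: builds every sub-env and can call a method on each."""

    def __init__(self, env_fns: List[Callable[[], ToyActionMasker]]):
        self.envs = [fn() for fn in env_fns]

    def env_method(self, name: str) -> list:
        # One result per sub-env, in env order
        return [getattr(env, name)() for env in self.envs]


def mask_fn(env: ToyEnv) -> List[bool]:
    # Hypothetical rule: env 0 forbids action 0, every other env allows both actions
    return [env.rank != 0, True]


def make_env(rank: int) -> Callable[[], ToyActionMasker]:
    def _init() -> ToyActionMasker:
        # The mask wrapper goes on the single env, BEFORE vectorizing
        return ToyActionMasker(ToyEnv(rank), mask_fn)
    return _init


vec_env = ToyVecEnv([make_env(i) for i in range(2)])
masks = vec_env.env_method("action_masks")
print(masks)  # [[False, True], [True, True]]
```

Each sub-env contributes its own mask row, so the batch has one row per env, in env order.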
Hi @araffin, just to clarify: so we must apply a mask wrapper to the environment before applying VecEnv to it?
You have two options:
- Implement the action_masks method at the single-env level: https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/blob/master/sb3_contrib/common/envs/invalid_actions_env.py#L35-L36
- Wrap each env with ActionMasker (this can be done using the wrapper_class argument of make_vec_env): https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/blob/master/sb3_contrib/common/wrappers/action_masker.py#L7
The last option is to implement proper support for vectorized action masks and make a PR to SB3 contrib ;)
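Both options work because detection is by method name: MaskablePPO looks for something called `action_masks` (the `EXPECTED_METHOD_NAME` that appears in the tracebacks further down). A stdlib-only sketch of that duck typing follows; every class and the `supports_masking` helper are hypothetical and only approximate the library's check.

```python
from typing import Callable, List

EXPECTED_METHOD_NAME = "action_masks"  # the method name MaskablePPO looks for


class EnvWithMethod:
    """Option 1 (hypothetical env): implements action_masks() itself."""

    def action_masks(self) -> List[bool]:
        return [True, False, True]


class PlainEnv:
    """Hypothetical env with no masking support of its own."""


class MaskWrapper:
    """Option 2 (hypothetical stand-in for ActionMasker): adds the method from outside."""

    def __init__(self, env, mask_fn: Callable[[object], List[bool]]):
        self.env = env
        self._mask_fn = mask_fn

    def action_masks(self) -> List[bool]:
        return self._mask_fn(self.env)


def all_valid(env) -> List[bool]:
    return [True, True, True]


def supports_masking(env) -> bool:
    # Duck typing: only the presence of a callable with the expected name matters
    return callable(getattr(env, EXPECTED_METHOD_NAME, None))


print(supports_masking(EnvWithMethod()))                     # True
print(supports_masking(PlainEnv()))                          # False
print(supports_masking(MaskWrapper(PlainEnv(), all_valid)))  # True
```

The class of the env never matters in this sketch, only whether a callable named `action_masks` can be found on it.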
I am still having this issue. I think I am doing the second option above.
This code works.
env = PortfolioEnv()
env = ActionMasker(env,action_mask_fn=PortfolioEnv.action_masks)
env = DummyVecEnv([lambda: env])
But replacing the above with this line doesn't respect the action masks; I can see they are not being called.
env = make_vec_env(PortfolioEnv, wrapper_class=ActionMasker,wrapper_kwargs={'action_mask_fn':PortfolioEnv.action_masks} ,n_envs=2)
@araffin Can you please confirm whether this is supposed to work, or whether we need the PR even for this? In my case above, I see that the action masks function is not invoked at runtime.
If it is implemented at the single-env level (that is, your env has an action_masks method), you don't need the ActionMasker wrapper.
The way you do it looks a bit weird too: you pass PortfolioEnv.action_masks as if it were a static method.
Please check with built-in envs first; we do have tests for multi-env support here: https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/blob/master/tests/test_invalid_actions.py#L153-L172
Ah, that helps a ton. Looks like it's all automatic as long as a method with the right name is there! Fantastic.
env = make_vec_env(PortfolioEnv,n_envs=2) works perfectly.
As for passing PortfolioEnv.action_masks as if it were a static method: this is true. I wish I knew better what I am doing; relative Python noob here.
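The "static method" remark comes down to how Python attribute access works: looking up `action_masks` on the class gives a plain function that still needs the env passed in explicitly, while looking it up on an instance gives a bound method with the env already attached. A minimal sketch with a hypothetical `PortfolioLikeEnv` standing in for `PortfolioEnv`:

```python
from typing import List


class PortfolioLikeEnv:
    """Hypothetical stand-in for PortfolioEnv, with action_masks as an ordinary instance method."""

    def __init__(self, n_assets: int):
        self.n_assets = n_assets

    def action_masks(self) -> List[bool]:
        return [True] * self.n_assets


env = PortfolioLikeEnv(3)

unbound = PortfolioLikeEnv.action_masks  # accessed on the class: a plain function
bound = env.action_masks                 # accessed on the instance: self is already attached

print(unbound(env))  # [True, True, True]  (the env must be passed explicitly)
print(bound())       # [True, True, True]

try:
    unbound()  # no instance supplied
except TypeError as exc:
    print("TypeError:", exc)
```

Since the ActionMasker docstring describes `action_mask_fn` as a function that takes a Gym environment, the class-level form can presumably work when the wrapped env is an instance of that class; but when the env already defines `action_masks`, the wrapper is not needed at all.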
Hello!
Indeed the MaskablePPO does not work with SubprocVecEnv although it works with DummyVecEnv.
SubprocVecEnv throws a pickle exception when reading the action_masks attribute.
To reproduce:
The following code works when the VecEnv class is DummyVecEnv (the default) but fails when it is SubprocVecEnv.
This is the working code
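import gym
import numpy as np
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker

def mask_fn(env: gym.Env) -> np.ndarray:
    return np.ones((1, 2))

def make_env(env_id: str):
    def _init() -> gym.Env:
        env = gym.make(env_id)
        env = ActionMasker(env, mask_fn)
        return env
    return _init

num_envs = 2
env = make_vec_env(make_env("CartPole-v1"), num_envs, vec_env_cls=DummyVecEnv)
env.reset()

model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=1)
model.learn(1000)
When replacing the make_vec_env line with:
env = make_vec_env(make_env("CartPole-v1"), num_envs, vec_env_cls=SubprocVecEnv)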
It throws the following exception.
Process ForkServerProcess-12:
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/lib/python3.7/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 53, in _worker
remote.send(getattr(env, data))
File "/opt/conda/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/opt/conda/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function mask_fn at 0x7f3d94dd7710>: attribute lookup mask_fn on __main__ failed
Another experiment: replacing the action wrapper with a simplified custom gym wrapper also works with DummyVecEnv but not with SubprocVecEnv.
class Custom(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)

    def action_masks(self):
        return np.ones((1, 2))

def make_env(env_id: str):
    def _init() -> gym.Env:
        env = gym.make(env_id)
        env = Custom(env)
        return env
    return _init

env = make_vec_env(make_env("CartPole-v1"), num_envs, vec_env_cls=SubprocVecEnv)
Full output:
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/lib/python3.7/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 53, in _worker
remote.send(getattr(env, data))
File "/opt/conda/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/opt/conda/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.Custom'>: attribute lookup Custom on __main__ failed
---------------------------------------------------------------------------
EOFError Traceback (most recent call last)
/tmp/ipykernel_259/3574518961.py in <module>
32
33 model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=1)
---> 34 model.learn(1000)
/opt/conda/lib/python3.7/site-packages/sb3_contrib/ppo_mask/ppo_mask.py in learn(self, total_timesteps, callback, log_interval, tb_log_name, reset_num_timesteps, use_masking, progress_bar)
519
520 while self.num_timesteps < total_timesteps:
--> 521 continue_training = self.collect_rollouts(self.env, callback, self.rollout_buffer, self.n_steps, use_masking)
522
523 if continue_training is False:
/opt/conda/lib/python3.7/site-packages/sb3_contrib/ppo_mask/ppo_mask.py in collect_rollouts(self, env, callback, rollout_buffer, n_rollout_steps, use_masking)
282 rollout_buffer.reset()
283
--> 284 if use_masking and not is_masking_supported(env):
285 raise ValueError("Environment does not support action masking. Consider using ActionMasker wrapper")
286
/opt/conda/lib/python3.7/site-packages/sb3_contrib/common/maskable/utils.py in is_masking_supported(env)
31 try:
32 # TODO: add VecEnv.has_attr()
---> 33 env.get_attr(EXPECTED_METHOD_NAME)
34 return True
35 except AttributeError:
/opt/conda/lib/python3.7/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py in get_attr(self, attr_name, indices)
162 for remote in target_remotes:
163 remote.send(("get_attr", attr_name))
--> 164 return [remote.recv() for remote in target_remotes]
165
166 def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None:
/opt/conda/lib/python3.7/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py in <listcomp>(.0)
162 for remote in target_remotes:
163 remote.send(("get_attr", attr_name))
--> 164 return [remote.recv() for remote in target_remotes]
165
166 def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None:
/opt/conda/lib/python3.7/multiprocessing/connection.py in recv(self)
248 self._check_closed()
249 self._check_readable()
--> 250 buf = self._recv_bytes()
251 return _ForkingPickler.loads(buf.getbuffer())
252
/opt/conda/lib/python3.7/multiprocessing/connection.py in _recv_bytes(self, maxsize)
405
406 def _recv_bytes(self, maxsize=None):
--> 407 buf = self._recv(4)
408 size, = struct.unpack("!i", buf.getvalue())
409 if maxsize is not None and size > maxsize:
/opt/conda/lib/python3.7/multiprocessing/connection.py in _recv(self, size, read)
381 if n == 0:
382 if remaining == size:
--> 383 raise EOFError
384 else:
385 raise OSError("got end of file during message")
EOFError:
The problem is related to the multiprocessing library that can only transfer Python objects to worker processes which can be pickled. Possible solutions can be found here.
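The pickling constraint matters here because pickle stores functions and classes by reference (their module plus qualified name) rather than by value, so the receiving side must be able to look them up under that name. Both errors above (`attribute lookup mask_fn on __main__ failed`, `attribute lookup Custom on __main__ failed`) are lookups of this kind failing. A small stdlib-only sketch of what plain `pickle` accepts; the function names are hypothetical:

```python
import pickle


def module_level_mask_fn(env):
    return [True, True]


def make_nested_mask_fn():
    def nested_mask_fn(env):
        return [True, True]
    return nested_mask_fn


def can_pickle(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type varies across Python versions
        return False


results = {
    "module-level function": can_pickle(module_level_mask_fn),  # stored as module + name
    "lambda": can_pickle(lambda env: [True, True]),              # no importable name
    "nested function": can_pickle(make_nested_mask_fn()),        # local object
    "plain list of bools": can_pickle([True, False]),            # plain data
}
print(results)
```

Even a module-level function only round-trips if the other process can import that same module and find the name in it, which is exactly what fails when the definition lives in a notebook's `__main__`.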
I can help with this issue. I've found a potential fix.
The idea is (i) to replace the action_masks method with an action_masks attribute (which is picklable) and (ii) to update that attribute at every step by overriding the gym.Wrapper.step method (and reset).
This greatly simplifies the action-mask update process and avoids computing the update (via the action_masks method) outside the environment. Recall that the action_masks method is currently called inside get_action_masks, from within the MaskablePPO instance.
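The attribute-based idea can be sketched without gym: a hypothetical `CachedMaskWrapper` recomputes the mask inside `reset()`/`step()` and stores it as a plain list, so what would have to cross the process boundary is ordinary data rather than a method. `ToyEnv`, its dynamics and `mask_fn` are invented for illustration.

```python
import pickle
from typing import Callable, List, Optional


class ToyEnv:
    """Hypothetical env with 3 actions; the action equal to the current state is invalid."""

    n_actions = 3

    def __init__(self):
        self.state = 0

    def reset(self) -> int:
        self.state = 0
        return self.state

    def step(self, action: int) -> int:
        # Toy dynamics: ignore the action and advance the state
        self.state = (self.state + 1) % self.n_actions
        return self.state


def mask_fn(env: ToyEnv) -> List[bool]:
    return [a != env.state for a in range(env.n_actions)]


class CachedMaskWrapper:
    """Hypothetical wrapper: recomputes the mask in reset()/step() and stores it as plain data."""

    def __init__(self, env: ToyEnv, mask_fn: Callable[[ToyEnv], List[bool]]):
        self.env = env
        self._mask_fn = mask_fn
        self.action_masks: Optional[List[bool]] = None

    def reset(self) -> int:
        obs = self.env.reset()
        self.action_masks = self._mask_fn(self.env)
        return obs

    def step(self, action: int) -> int:
        obs = self.env.step(action)
        self.action_masks = self._mask_fn(self.env)
        return obs


env = CachedMaskWrapper(ToyEnv(), mask_fn)
env.reset()
after_reset = env.action_masks  # [False, True, True]
env.step(1)
after_step = env.action_masks   # [True, False, True]
# What a get_attr("action_masks") call would have to send is now just a list of bools
roundtrip = pickle.loads(pickle.dumps(after_step))
print(after_reset, after_step, roundtrip)
```

The trade-off is that the mask is computed on every step even when nobody reads it, in exchange for never having to ship a callable.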
In this approach:
- The ActionMasker wrapper should be changed:
from typing import Callable, Union

import gym
import numpy as np


class ActionMasker(gym.Wrapper):
    """
    Env wrapper providing the attribute required to support masking.
    Exposes an attribute called action_masks, which holds the masks for the wrapped env
    and is refreshed on every reset() and step().
    This wrapper is not needed if the env exposes the expected attribute itself.
    :param env: the Gym environment to wrap
    :param action_mask_fn: A function that takes a Gym environment and returns an action mask,
        or the name of such a method provided by the environment.
    """

    def __init__(self, env: gym.Env, action_mask_fn: Union[str, Callable[[gym.Env], np.ndarray]]):
        super().__init__(env)
        # Plain (picklable) data, refreshed in reset() and step()
        self.action_masks: np.ndarray = None
        if isinstance(action_mask_fn, str):
            found_method = getattr(self.env, action_mask_fn)
            if not callable(found_method):
                raise ValueError(f"Environment attribute {action_mask_fn} is not a method")
            # found_method is already bound to the env, so it takes no argument
            self._action_mask_fn = lambda _env: found_method()
        else:
            self._action_mask_fn = action_mask_fn

    def step(self, action):
        obs, rewards, dones, info = self.env.step(action)
        # Call the mask function on the wrapped env
        self.action_masks = self._action_mask_fn(self.env)
        return obs, rewards, dones, info

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        self.action_masks = self._action_mask_fn(self.env)
        return obs
- Then get_action_masks from sb3_contrib.common.maskable.utils should be changed:
import numpy as np
from stable_baselines3.common.type_aliases import GymEnv
from stable_baselines3.common.vec_env import VecEnv

EXPECTED_METHOD_NAME = "action_masks"


def get_action_masks(env: GymEnv) -> np.ndarray:
    """
    Returns the invalid action masks that a gym env (or VecEnv) exposes as an attribute
    :param env: the Gym environment to get masks from
    :return: A numpy array of the masks
    """
    if isinstance(env, VecEnv):
        # previous --> return np.stack(env.env_method(EXPECTED_METHOD_NAME))
        return np.stack(env.get_attr(EXPECTED_METHOD_NAME))
    else:
        return getattr(env, EXPECTED_METHOD_NAME)
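Then training with multiple processes gives no error:
import gym
import numpy as np
from stable_baselines3.common.vec_env import SubprocVecEnv
from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy

def action_masks_fn(env: gym.Env) -> np.ndarray:
    return np.ones((1, 2))

def make_env(env_id):
    def _init():
        env = gym.make(env_id)
        env = ActionMasker(env, action_masks_fn)  # the modified ActionMasker above
        return env
    return _init

num_envs = 2
if __name__ == "__main__":
    env = SubprocVecEnv([make_env("CartPole-v1") for i in range(num_envs)])
    obs = env.reset()
    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=1)
    model.learn(1000)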
Note: InvalidActionEnvDiscrete must also be changed to account for these changes.
Please let me know if you find this approach useful and we can continue with the PR.
Thank you!
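The distinction this proposal touches on, fetching the method itself versus calling it in the worker, can be sketched with the stdlib alone. The tracebacks above fail inside `is_masking_supported`, which calls `env.get_attr(EXPECTED_METHOD_NAME)`, and the worker answers with `remote.send(getattr(env, data))`, so the attribute object itself has to be pickled; the previous `get_action_masks` used `env_method`, which (as I understand SB3's worker loop) runs the method in the worker and sends back only its return value. `EnvWithUnpicklableState`, `send_attr` and `send_method_result` are hypothetical stand-ins, not SB3 code; a lock plays the role of anything that cannot be pickled.

```python
import pickle
import threading
from typing import List


class EnvWithUnpicklableState:
    """Hypothetical env holding a lock, standing in for any env instance
    that cannot be pickled and sent to another process."""

    def __init__(self):
        self._lock = threading.Lock()

    def action_masks(self) -> List[bool]:
        return [True, False]


def send_attr(env, name: str) -> bytes:
    # get_attr-style: the attribute object itself must be pickled
    # (for a bound method, that means pickling the whole env)
    return pickle.dumps(getattr(env, name))


def send_method_result(env, name: str) -> bytes:
    # env_method-style: the method runs on the env's side; only its return value is pickled
    return pickle.dumps(getattr(env, name)())


env = EnvWithUnpicklableState()
masks = pickle.loads(send_method_result(env, "action_masks"))
print(masks)  # [True, False]

try:
    send_attr(env, "action_masks")
    attr_ok = True
except (TypeError, pickle.PicklingError):
    attr_ok = False
print(attr_ok)  # False
```

Both the attribute approach and an `env_method`-style call end up sending only the mask, which is why either avoids pickling the env.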
I didn't bump into this one - maybe because I am returning a python list instead of a numpy one in my action_masks function?
I have also run into the same issue: if I use MaskablePPO, I cannot use SubprocVecEnv with it; the same error shows up.
Quick remark (I won't have time to look into the details of this one for a few days): the recommended way to use MaskablePPO is not with the wrapper but by defining the method directly in the env (as done in https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/blob/master/sb3_contrib/common/envs/invalid_actions_env.py); this should work.
The wrapper is there in case you cannot modify/access the underlying env.
It still does not work with SubprocVecEnv, as methods cannot be pickled.
SubprocVecEnv uses the multiprocessing library to launch separate processes to speed up RL training. When the action_masks method is requested (get_attr('action_masks')), the multiprocessing library cannot send it through the pipe and throws an error, as class methods cannot be pickled. So the cause lies in the multiprocessing library.
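On the claim that methods cannot be pickled: in current Python 3 versions a bound method is pickled as "this object plus this method name", so it round-trips whenever the object, and its class looked up by module and name, can be pickled. The errors earlier in the thread complain about looking up `Custom` on `__main__`, that is, about the class rather than about methods as such. A stdlib-only sketch with a hypothetical `MaskedEnv`:

```python
import pickle
from typing import List


class MaskedEnv:
    """Hypothetical env defined at module level, so pickle can find the class by name."""

    def __init__(self, n_actions: int):
        self.n_actions = n_actions

    def action_masks(self) -> List[bool]:
        return [True] * self.n_actions


env = MaskedEnv(3)
restored = pickle.loads(pickle.dumps(env.action_masks))

print(restored())                # [True, True, True]
print(restored.__self__ is env)  # False: the env was copied along with the method
```

Note that the receiving side gets a copy of the whole env, so even when this works it ships far more than the mask itself.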
This does run for me:
from sb3_contrib import MaskablePPO
from sb3_contrib.common.envs import InvalidActionEnvDiscrete
from sb3_contrib.common.maskable.evaluation import evaluate_policy
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv

def make_env():
    return InvalidActionEnvDiscrete(dim=20, n_invalid_actions=10)

if __name__ == "__main__":
    env = make_vec_env(make_env, n_envs=2, vec_env_cls=SubprocVecEnv)
    model = MaskablePPO("MlpPolicy", env, n_steps=256, gamma=0.4, seed=32, verbose=1)
    model.learn(1000, progress_bar=True)
    evaluate_policy(model, env, warn=False)
but it fails if you replace make_env with:
from typing import List

from sb3_contrib.common.wrappers import ActionMasker

def action_masks(env) -> List[bool]:
    return env.action_masks()

def make_env():
    env = InvalidActionEnvDiscrete(dim=20, n_invalid_actions=10)
    env = ActionMasker(env, action_masks)
    return env
For me the same error happens; with @CristoJV's method it should theoretically work.
Error message:
ens5 | Process ForkServerProcess-1:
ens5 | Process ForkServerProcess-2:
ens5 | Traceback (most recent call last):
ens5 | Traceback (most recent call last):
ens5 | File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
ens5 | self.run()
ens5 | File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
ens5 | self._target(*self._args, **self._kwargs)
ens5 | File "/home/ftuser/.local/lib/python3.10/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 53, in _worker
ens5 | remote.send(getattr(env, data))
ens5 | File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
ens5 | self.run()
ens5 | File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
ens5 | self._send_bytes(_ForkingPickler.dumps(obj))
ens5 | File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
ens5 | self._target(*self._args, **self._kwargs)
ens5 | File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
ens5 | cls(buf, protocol).dump(obj)
ens5 | File "/home/ftuser/.local/lib/python3.10/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 53, in _worker
ens5 | remote.send(getattr(env, data))
ens5 | File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
ens5 | self._send_bytes(_ForkingPickler.dumps(obj))
ens5 | File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
ens5 | cls(buf, protocol).dump(obj)
ens5 | _pickle.PicklingError: Can't pickle <class 'ppo_test.MyRLEnv'>: import of module 'ppo_test' failed
I think that changing action_masks from a method to an attribute would fix these issues without compromising or impacting the rest of the code, allowing multiprocessing (SubprocVecEnv) instead of the DummyVecEnv that currently works, which steps every env sequentially in a single process. Recall the Python GIL.
Also, the action_masks method of InvalidActionEnvDiscrete just returns a list, as seen in:
def action_masks(self) -> List[bool]:
    return [action not in self.invalid_actions for action in self.possible_actions]
and the invalid actions are updated within:
def _choose_next_state(self) -> None:
    self.state = self.action_space.sample()
    # Randomly choose invalid actions that are not the current state
    potential_invalid_actions = [i for i in self.possible_actions if i != self.state]
    self.invalid_actions = np.random.choice(potential_invalid_actions, self.n_invalid_actions, replace=False)
Why not update an action_masks attribute directly, instead of using a method that just returns the mask list?
A potential fix for this class could be:
Solution 1
from typing import List, Optional

import numpy as np
from gym import spaces
from stable_baselines3.common.envs import IdentityEnv


class InvalidActionEnvDiscrete(IdentityEnv):
    """
    Identity env with a discrete action space. Supports action masking.
    """

    def __init__(
        self,
        dim: Optional[int] = None,
        ep_length: int = 100,
        n_invalid_actions: int = 0,
    ):
        if dim is None:
            dim = 1
        assert n_invalid_actions < dim, f"Too many invalid actions: {n_invalid_actions} < {dim}"
        space = spaces.Discrete(dim)
        self.n_invalid_actions = n_invalid_actions
        self.possible_actions = np.arange(space.n)
        self.invalid_actions: List[int] = []
        # Policy checks this attribute
        self.action_masks: List[bool] = []
        super().__init__(space=space, ep_length=ep_length)

    def _choose_next_state(self) -> None:  # Keep this method as it is in IdentityEnv
        self.state = self.action_space.sample()

    def _update_action_masks(self) -> List[bool]:
        # Updates the action masks according to the environment. Equivalent to action_masks()
        # Randomly choose invalid actions that are not the current state
        potential_invalid_actions = [i for i in self.possible_actions if i != self.state]
        self.invalid_actions = np.random.choice(potential_invalid_actions, self.n_invalid_actions, replace=False)
        return [action not in self.invalid_actions for action in self.possible_actions]

    def step(self, action):  # Updates the action masks at each step
        obs, rewards, dones, info = super().step(action)
        self.action_masks = self._update_action_masks()
        return obs, rewards, dones, info

    def reset(self, **kwargs):  # Updates the action masks at reset
        obs = super().reset(**kwargs)
        self.action_masks = self._update_action_masks()
        return obs
Another, less elegant but simpler, solution could be:
Solution 2
from typing import List, Optional

import numpy as np
from gym import spaces
from stable_baselines3.common.envs import IdentityEnv


class InvalidActionEnvDiscrete(IdentityEnv):
    """
    Identity env with a discrete action space. Supports action masking.
    """

    def __init__(
        self,
        dim: Optional[int] = None,
        ep_length: int = 100,
        n_invalid_actions: int = 0,
    ):
        if dim is None:
            dim = 1
        assert n_invalid_actions < dim, f"Too many invalid actions: {n_invalid_actions} < {dim}"
        space = spaces.Discrete(dim)
        self.n_invalid_actions = n_invalid_actions
        self.possible_actions = np.arange(space.n)
        self.invalid_actions: List[int] = []
        # Policy checks this attribute
        self.action_masks: List[bool] = []
        super().__init__(space=space, ep_length=ep_length)

    def _choose_next_state(self) -> None:
        self.state = self.action_space.sample()
        # Randomly choose invalid actions that are not the current state
        potential_invalid_actions = [i for i in self.possible_actions if i != self.state]
        self.invalid_actions = np.random.choice(potential_invalid_actions, self.n_invalid_actions, replace=False)
        self.action_masks = [action not in self.invalid_actions for action in self.possible_actions]
I think the second solution is less suitable, as it adds extra functionality to the private _choose_next_state method, which "theoretically" is not meant to be accessed.
@CristoJV, could you patch it up somehow, in a PR or something? I just want to confirm that it also works for me.
Hi @CristoJV, I've tried your method of changing action_masks from a function to a list, and it works well! However, it only works when the list has the same length in all parallel envs. I get "TypeError: can't convert np.ndarray of type numpy.object_. The only supported types are: float64, float32, float16, complex64, complex128......" when the lists have different lengths in different envs. Is there any way to fix this?
Maybe it could work with a bool list that has the same length as the Discrete action space. How about passing the bool mask to "action = env.action_space.sample(mask=mask)" (gym>=0.25.0)?
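Following the suggestion of a boolean list as long as the Discrete action space: if every env reports a mask of length `action_space.n` instead of a variable-length list of valid actions, all per-env rows have the same shape and can be stacked into a regular array; rows of different lengths cannot, which is consistent with the `numpy.object_` TypeError above. A stdlib-only sketch with a hypothetical helper `to_fixed_mask`:

```python
from typing import List, Sequence


def to_fixed_mask(valid_actions: Sequence[int], n_actions: int) -> List[bool]:
    """Hypothetical helper: turn a variable-length list of valid action indices
    into a boolean mask covering the whole Discrete(n_actions) space."""
    mask = [False] * n_actions
    for a in valid_actions:
        if not 0 <= a < n_actions:
            raise ValueError(f"action {a} is outside Discrete({n_actions})")
        mask[a] = True
    return mask


n_actions = 5
per_env_valid = [[0, 2], [1, 2, 3, 4]]  # different lengths in different envs
batch = [to_fixed_mask(valid, n_actions) for valid in per_env_valid]
print(batch)
# [[True, False, True, False, False], [False, True, True, True, True]]
```

Every row now has length `n_actions`, so the batch is rectangular regardless of how many actions are valid in each env.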
Bumping this up.... anyone got a PR or something to try? I am struggling to get MPPO to work with subproc :/
https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/issues/49#issuecomment-1422869188
https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/issues/49#issuecomment-1436841759
Bumping this up again: I believe there is still a problem with multiprocessing/connection.py. My code is:
class Custom(gym.Env):
    def __init__(self, *args, **kwargs):
        ...  # (elided)
        self.reset()

    def _STEP(self, action):
        ...  # (elided)
        return self.state, self.reward, done, self.info, {}

    def _RESET(self):
        ...  # (elided)
        return self.state

    def action_masks(self) -> List[bool]:
        return ...

    def step(self, action):
        return self._STEP(action)

    def reset(self, seed: Optional[int] = None, options: Optional[Dict] = None):
        if seed is not None:
            super().reset(seed=seed)
        return self._RESET(), {}

def make_env():
    return Custom()

if __name__ == "__main__":
    env = make_vec_env(make_env, n_envs=2, vec_env_cls=SubprocVecEnv)
    model = MaskablePPO(MaskableActorCriticPolicy, env, n_steps=256, gamma=0.4, seed=32, verbose=1)
    model.learn(1000)
and the error is:
/usr/lib/python3.10/multiprocessing/connection.py in _recv_bytes(self, maxsize)
412
413 def _recv_bytes(self, maxsize=None):
--> 414 buf = self._recv(4)
415 size, = struct.unpack("!i", buf.getvalue())
416 if size == -1:
/usr/lib/python3.10/multiprocessing/connection.py in _recv(self, size, read)
381 if n == 0:
382 if remaining == size:
--> 383 raise EOFError
384 else:
385 raise OSError("got end of file during message")
EOFError: