
SubProcVecEnv raises ConnectionResetError: [Errno 104] Connection reset by peer

Open · MakisKans opened this issue 5 years ago · 11 comments

I'm trying to run the following code to test PPO2 with Sonic the Hedgehog, running the environments in parallel with SubprocVecEnv. Unfortunately, I run into the following error:

Traceback (most recent call last):

  File "<ipython-input-2-9712559f6750>", line 3, in <module>
    env = SubprocVecEnv([foo(game,state) for _ in range(num_cpu)])

  File "/home/chryskan/anaconda3/lib/python3.7/site-packages/stable_baselines/common/vec_env/subproc_vec_env.py", line 96, in __init__
    observation_space, action_space = self.remotes[0].recv()

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)

ConnectionResetError: [Errno 104] Connection reset by peer

This is the code:

from stable_baselines.common.policies import MlpPolicy, CnnPolicy
from stable_baselines.common import make_vec_env
from stable_baselines import PPO2
from my_wrappers import wrap_sonic, make_sonic
from stable_baselines.common.env_checker import check_env
from stable_baselines.common.vec_env import SubprocVecEnv

def foo(game, state):
    def bar():
        return wrap_sonic(make_sonic(game, state), episode_life=False,
                          frame_stack=True, scale=True,
                          reward_scale=True, sonic_actions=True,
                          max_x_reward=True)
    return bar

game , state = "SonicTheHedgehog-Genesis", "GreenHillZone.Act1"
num_cpu = 4
env = SubprocVecEnv([foo(game,state) for _ in range(num_cpu)])

model = PPO2(CnnPolicy, env, verbose=1)
model.learn(total_timesteps=1_000_000)

And here is the my_wrappers module:

import numpy as np
from collections import deque
import gym
from gym import spaces
import cv2
from retro_contest.local import make

height, width = 84, 84

class ActionsDiscretizer(gym.ActionWrapper):
    def __init__(self, env):
        super(ActionsDiscretizer, self).__init__(env)
        buttons = ["B", "A", "MODE", "START", "UP", "DOWN", "LEFT", 
                   "RIGHT", "C", "Y", "X", "Z"]
        actions = [['LEFT'], ['RIGHT'], ['LEFT', 'DOWN'], 
                   ['RIGHT', 'DOWN'], ['DOWN'],
                   ['DOWN', 'B'], ['B']]
        self._actions = []

        for action in actions:
            arr = np.array([False] * 12)
            for button in action:
                arr[buttons.index(button)] = True
            self._actions.append(arr)
        self.action_space = gym.spaces.Discrete(len(self._actions))

    def action(self, a):
        return self._actions[a].copy()

class RewardScaler(gym.RewardWrapper):
    """
    Bring rewards to a reasonable scale for PPO.
    This is incredibly important and affects performance
    drastically.
    """
    def reward(self, reward):
        return reward * 0.01

class AllowBacktracking(gym.Wrapper):
    """
    Use deltas in max(X) as the reward, rather than deltas
    in X. This way, agents are not discouraged too heavily
    from exploring backwards if there is no way to advance
    head-on in the level.
    """
    def __init__(self, env):
        super(AllowBacktracking, self).__init__(env)
        self._cur_x = 0
        self._max_x = 0

    def reset(self, **kwargs):
        self._cur_x = 0
        self._max_x = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        self._cur_x += rew
        rew = max(0, self._cur_x - self._max_x)
        self._max_x = max(self._max_x, self._cur_x)
        return obs, rew, done, info

class NoopResetEnvSonic(gym.Wrapper):
    def __init__(self, env, noop_max=30):
        """Sample initial states by taking random number of no-ops on reset.
        No-op is assumed to be action 0.
        """
        gym.Wrapper.__init__(self,env)
        self.noop_max = noop_max
        self.override_num_noops = None
        self.noop_action = np.zeros(env.action_space.n)
    
    def reset(self, **kwargs):
        """Do no-op action for a number of steps in [1, noop_max]."""
        self.env.reset(**kwargs)
        if self.override_num_noops is not None:
            noops = self.override_num_noops
        else:
            noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
        assert noops > 0
        obs = None
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs
     
class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        """Return only every `skip`-th frame"""
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype='uint8')
        self._skip = skip
    
    def step(self, action):
        """Repeat action, sum reward, and max over last observations."""        
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i==self._skip - 2: self._obs_buffer[0] = obs
            if i==self._skip - 1: self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        # Note that the observation on the done=True frame
        # doesn't matter
        max_frame = self._obs_buffer.max(axis=0)

        return max_frame, total_reward, done, info
     
class FireResetEnv(gym.Wrapper):
    def __init__(self, env):
        """Take action on reset for environments that are fixed until firing."""
        gym.Wrapper.__init__(self, env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3
    
    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(1)
        if done:
            self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(2)
        if done:
            self.env.reset(**kwargs)
        return obs

class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        """Make end-of-life == end-of-episode, but only reset on true game over.
        Done by DeepMind for the DQN and co. since it helps value estimation.
        """
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done  = True
        
    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            # for Qbert, sometimes we stay in the lives == 0 condition for a few frames,
            # so it's important to keep lives > 0, so that we only reset once
            # the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, info
    
    def reset(self, **kwargs):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs

class ClipRewardEnv(gym.RewardWrapper):
    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        return np.sign(reward)

class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env):
        """Warp frames to 84x84 as done in the Nature paper and later work."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = width
        self.height = height
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1), dtype=np.uint8)

    def observation(self, frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
        return frame[:, :, None]

class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack k last frames.
        Returns lazy array, which is much more memory efficient.
        See Also
        --------
        baselines.common.atari_wrappers.LazyFrames
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], shp[2] * k), dtype=np.uint8)

    def reset(self):
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return np.reshape(self.frames, newshape=self.observation_space.shape)

class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, observation):
        # careful! This undoes the memory optimization, use
        # with smaller replay buffers only.
        return np.array(observation).astype(np.float32) / 255.0
    
class LazyFrames(object):
    def __init__(self, frames):
        """This object ensures that common frames between the observations are only stored once.
        It exists purely to optimize memory usage which can be huge for DQN's 1M frames replay
        buffers.
        This object should only be converted to numpy array before being passed to the model.
        You'd not believe how complex the previous solution was."""
        self._frames = frames

    def __array__(self, dtype=None):
        out = np.concatenate(self._frames, axis=2)
        if dtype is not None:
            out = out.astype(dtype)
        return out

def make_sonic(game, state):
    env = make(game=game, state=state)
    #env = retrowrapper.RetroWrapper(game=game, state=state)
    env = NoopResetEnvSonic(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    return env

def wrap_sonic(env, episode_life=True, clip_rewards=False, 
                  frame_stack=False, scale=False, pytorch_img=False,
                  reward_scale = True, sonic_actions=True, max_x_reward=True):
    if episode_life:
        env = EpisodicLifeEnv(env)
    
    key = 'get_action_meanings'
    if hasattr(env.unwrapped, key):
        meanings = env.unwrapped.get_action_meanings()
    else:
        meanings = [env.unwrapped.get_action_meaning(np.zeros(a)) 
                        for a in range(env.action_space.n)]
    if 'FIRE' in meanings:
        env = FireResetEnv(env)
    env = WarpFrame(env)
    if scale:
        env = ScaledFloatFrame(env)
    if max_x_reward:
        env = AllowBacktracking(env)
    if sonic_actions:
        env = ActionsDiscretizer(env)
    if clip_rewards:
        env = ClipRewardEnv(env)
    if reward_scale:
        env = RewardScaler(env)
    if frame_stack:
        env = FrameStack(env, 4)
    return env
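
For debugging: the ConnectionResetError itself only says that the worker process died before it could send the observation/action spaces back over the pipe; the real exception is raised inside the child and gets swallowed. Below is a sketch for surfacing it, assuming the foo, game, state and num_cpu definitions from the script above are in scope:

# Debugging sketch, not a fix: build one environment in the parent process so
# that anything failing inside make_sonic/wrap_sonic raises here directly
# instead of silently killing a SubprocVecEnv worker.
from stable_baselines.common.env_checker import check_env
from stable_baselines.common.vec_env import DummyVecEnv

env_fn = foo(game, state)   # the same thunk that gets passed to SubprocVecEnv
single_env = env_fn()       # any construction error surfaces here
check_env(single_env)

# If the single env is fine, the single-process DummyVecEnv is the next step;
# it keeps the full traceback in the main process.
vec_env = DummyVecEnv([foo(game, state) for _ in range(num_cpu)])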

MakisKans · Jan 28 '20

I was facing a similar issue and solved it by removing verbose. Try removing verbose=1 from model = PPO2(CnnPolicy, env, verbose=1).
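
That is, something like this (a minimal sketch of the suggested change):

# Same call as in the question, just without verbose=1.
model = PPO2(CnnPolicy, env)
model.learn(total_timesteps=1_000_000)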

fnuabhimanyu · Apr 08 '20

Still struggling with this issue. Can anyone help me out?

HongminWu · Apr 13 '20

Same problem. Does this have an answer now?

cmal · Sep 19 '20

Same

bamasa · Apr 07 '21

Facing the same problem. Does anyone have a solution to this?

harsh-ux · Aug 23 '21

/root/miniconda3/lib/python3.7/multiprocessing/connection.py:379: ConnectionResetError
----------------------------------------------------------------------------- Captured stderr call ------------------------------------------------------------------------------
Process SpawnProcess-2:
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/root/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/root/virtualenv/venv/baselines/baselines/common/vec_env/subproc_vec_env.py", line 15, in worker
    envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
  File "/root/virtualenv/venv/baselines/baselines/common/vec_env/subproc_vec_env.py", line 15, in <listcomp>
    envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
  File "/root/virtualenv/venv/baselines/baselines/common/tests/test_env_after_learn.py", line 15, in make_env
    env = gym.make('CartPole-v1' if algo == 'acktr' else 'PongNoFrameskip-v4')
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 156, in make
    return registry.make(id, **kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 101, in make
    env = spec.make(**kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 73, in make
    env = cls(**_kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/atari/atari_env.py", line 49, in __init__
    self.game_path = atari_py.get_game_path(game)
  File "/root/miniconda3/lib/python3.7/site-packages/atari_py/games.py", line 20, in get_game_path
    raise Exception('ROM is missing for %s, see https://github.com/openai/atari-py#roms for instructions' % (game_name,))
Exception: ROM is missing for pong, see https://github.com/openai/atari-py#roms for instructions
__________________________________________________________________________ test_env_after_learn[deepq] __________________________________________________________________________

algo = 'deepq'

    @pytest.mark.parametrize('algo', algos)
    def test_env_after_learn(algo):
        def make_env():
            # acktr requires too much RAM, fails on travis
            env = gym.make('CartPole-v1' if algo == 'acktr' else 'PongNoFrameskip-v4')
            return env
    
        make_session(make_default=True, graph=tf.Graph())
>       env = SubprocVecEnv([make_env])

baselines/common/tests/test_env_after_learn.py:19: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
baselines/common/vec_env/subproc_vec_env.py:71: in __init__
    observation_space, action_space, self.spec = self.remotes[0].recv().x
/root/miniconda3/lib/python3.7/multiprocessing/connection.py:250: in recv
    buf = self._recv_bytes()
/root/miniconda3/lib/python3.7/multiprocessing/connection.py:407: in _recv_bytes
    buf = self._recv(4)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <multiprocessing.connection.Connection object at 0x7f4be43769b0>, size = 4, read = <built-in function read>

    def _recv(self, size, read=_read):
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
>           chunk = read(handle, remaining)
E           ConnectionResetError: [Errno 104] Connection reset by peer

/root/miniconda3/lib/python3.7/multiprocessing/connection.py:379: ConnectionResetError
----------------------------------------------------------------------------- Captured stderr call 

I noticed this error:

 raise Exception('ROM is missing for %s, see https://github.com/openai/atari-py#roms for instructions' % (game_name,))
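
For reference, the usual fix (per the atari-py instructions linked in that message) is to download and extract the ROM archive and then import it; a rough sketch, with a hypothetical path to the extracted ROMs:

# Sketch of the usual ROM fix for atari-py >= 0.2.6 (0.2.9 is installed below).
# Equivalent shell command: python -m atari_py.import_roms <path to ROMS>
import subprocess
import sys

rom_dir = "/path/to/extracted/ROMS"  # hypothetical: wherever the archive was extracted
subprocess.run([sys.executable, "-m", "atari_py.import_roms", rom_dir], check=True)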

Once I fixed that error, the ConnectionResetError: [Errno 104] Connection reset by peer disappeared. Before the fix, the test summary was:

===================================================== 46 failed, 49 passed, 31 skipped, 1111 warnings in 110.05s (0:01:50) ======================================================

After the fix, only one failure is left:

====================================================== 1 failed, 94 passed, 31 skipped, 6368 warnings in 269.86s (0:04:29) ======================================================
baselines/common/tests/test_doc_examples.py:20: in <lambda>
    venv = DummyVecEnv([lambda: cmd_util.make_mujoco_env('Reacher-v2', seed=0)])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

env_id = 'Reacher-v2', seed = 0, reward_scale = 1.0

    def make_mujoco_env(env_id, seed, reward_scale=1.0):
        """
        Create a wrapped, monitored gym.Env for MuJoCo.
        """
>       rank = MPI.COMM_WORLD.Get_rank()
E       AttributeError: 'NoneType' object has no attribute 'COMM_WORLD'

baselines/common/cmd_util.py:112: AttributeError
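
For what it's worth, this remaining AttributeError usually means mpi4py could not be imported (and indeed mpi4py does not appear in the pip list below): baselines wraps "from mpi4py import MPI" in a try/except and falls back to MPI = None, which make_mujoco_env then trips over. A quick check, assuming that is the cause here:

# If this prints "mpi4py missing", baselines falls back to MPI = None, which is
# exactly what produces "'NoneType' object has no attribute 'COMM_WORLD'".
try:
    from mpi4py import MPI
    print("mpi4py OK, rank:", MPI.COMM_WORLD.Get_rank())
except ImportError:
    print("mpi4py missing - try: pip install mpi4py")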

My environment (for your information):

┌──(root💀agi)-[~]
└─# uname -a                   
Linux agi 5.14.0-kali4-amd64 #1 SMP Debian 5.14.16-1kali1 (2021-11-05) x86_64 GNU/Linux

                                                                                                                                                                                
┌──(venv)─(root💀agi)-[~/virtualenv/venv/baselines]
└─# pip list                                                                                                                                                               130 ⨯
Package                Version    Location                       
---------------------- ---------- -------------------------------
absl-py                1.0.0      
asn1crypto             0.24.0     
astor                  0.8.1      
atari-py               0.2.9      
attrs                  21.2.0     
baselines              0.1.6      /root/virtualenv/venv/baselines
box2d-py               2.3.8      
cached-property        1.5.2      
certifi                2021.10.8  
cffi                   1.12.2     
chardet                3.0.4      
click                  8.0.3      
cloudpickle            1.2.2      
conda                  4.10.3     
conda-package-handling 1.7.3      
cryptography           2.6.1      
cycler                 0.11.0     
Cython                 0.29.26    
filelock               3.4.0      
fonttools              4.28.5     
future                 0.18.2     
gast                   0.5.3      
glfw                   2.5.0      
google-pasta           0.2.0      
grpcio                 1.43.0     
gym                    0.15.7     
h5py                   3.6.0      
idna                   2.8        
imageio                2.13.4     
importlib-metadata     4.10.0     
iniconfig              1.1.1      
joblib                 1.1.0      
Keras-Applications     1.0.8      
Keras-Preprocessing    1.1.2      
kiwisolver             1.3.2      
lockfile               0.12.2     
Markdown               3.3.6      
matplotlib             3.5.1      
mujoco-py              1.50.1.68  
numpy                  1.21.5     
opencv-python          4.5.4.60   
packaging              21.3       
pandas                 1.3.5      
Pillow                 8.4.0      
pip                    19.0.3     
pluggy                 1.0.0      
protobuf               3.19.1     
py                     1.11.0     
pycosat                0.6.3      
pycparser              2.19       
pyglet                 1.5.0      
pyOpenSSL              19.0.0     
pyparsing              3.0.6      
PySocks                1.6.8      
pytest                 6.2.5      
python-dateutil        2.8.2      
pytz                   2021.3     
requests               2.21.0     
ruamel-yaml            0.15.46    
scipy                  1.7.3      
setuptools             41.0.0     
six                    1.12.0     
tensorboard            1.14.0     
tensorflow             1.14.0     
tensorflow-estimator   1.14.0     
termcolor              1.1.0      
toml                   0.10.2     
tqdm                   4.62.3     
typing-extensions      4.0.1      
urllib3                1.24.1     
Werkzeug               2.0.2      
wheel                  0.33.1     
wrapt                  1.13.3     
youtube-dl             2021.12.17 
zipp                   3.6.0      

Open-AGI · Dec 21 '21

same error...

Mickeyyyang · Jan 11 '22

same error

Vaillus · Feb 03 '23

Same issue when I try to use SubprocVecEnv on Colab.

kvrban · Apr 20 '23