
Stuck at the code sandbox

Open no-execution opened this issue 10 months ago • 4 comments

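When using prime's code verification logic, execution hangs after reaching the following branch. The background monitor shows no processes running (no rollout). How can this be solved?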

Image

no-execution avatar Feb 16 '25 11:02 no-execution

Hello, I think this is a common problem: neither coroutines nor threads can guarantee that a timeout fires while the CPU is busy; only processes can. Below is a minimal example that reproduces the issue. Perhaps this requires a better sandbox.

import asyncio


async def infinite_loop():
    # Busy loop with no await, so control never returns to the event loop.
    while True:
        pass


async def main():
    try:
        await asyncio.wait_for(infinite_loop(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timeout reached. The infinite loop did not finish in time.")

# The timeout is never triggered: the event loop cannot run while the coroutine spins.
asyncio.run(main())
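
For contrast, here is a minimal sketch of the process-based approach mentioned above, using only the standard library. It is not how prime.py is implemented; `run_with_hard_timeout` and `BUSY_LOOP_SRC` are hypothetical names chosen for illustration. The busy loop runs in a separate interpreter, so the parent can still enforce the deadline and kill the child.

```python
import subprocess
import sys

# The same busy loop as above, expressed as source text so that it can
# run in a separate Python interpreter (hypothetical example payload).
BUSY_LOOP_SRC = "while True:\n    pass\n"


def run_with_hard_timeout(source, timeout):
    """Run Python source in a child process.

    Returns True if the child exited within `timeout` seconds,
    False if it had to be killed.
    """
    try:
        subprocess.run([sys.executable, "-c", source], timeout=timeout, check=False)
        return True
    except subprocess.TimeoutExpired:
        # subprocess.run kills and waits for the child before re-raising,
        # so no busy process is left behind at this point.
        return False


if __name__ == "__main__":
    print(run_with_hard_timeout(BUSY_LOOP_SRC, timeout=1.0))
    print(run_with_hard_timeout("print('ok')", timeout=30.0))
```

Unlike `asyncio.wait_for`, the deadline here is enforced by the parent process, which stays responsive regardless of what the child is doing.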

huiyeruzhou avatar Feb 25 '25 06:02 huiyeruzhou

Same issue

c-box avatar Feb 25 '25 08:02 c-box

same here

edchengg avatar Feb 26 '25 00:02 edchengg

same issue

StevenZHB avatar Feb 26 '25 19:02 StevenZHB

same issue

lihaoling avatar Mar 07 '25 05:03 lihaoling

same issue

JieWu02 avatar Mar 07 '25 05:03 JieWu02

I simplified prime.py into the test code below. compute_score may end up in a while True loop; data_source has 10 items, max_workers is 4, and the timeout is 12 s.

import asyncio
from concurrent.futures import ProcessPoolExecutor
import random
import time
from functools import partial
import psutil
import os

def compute_score(task_id):
    sleep_time = random.uniform(1, 30)
    print(f"pid: {os.getpid()} task {task_id} Sleeping for {sleep_time:.2f} seconds...")
    if sleep_time > 15:
        print(f"\tpid {os.getpid()} is in while true loop")
        while True:
            pass
    time.sleep(sleep_time)
    return sleep_time

async def single_compute_score(compute_score_fn, executor, task_id, timeout=12.0):
    loop = asyncio.get_running_loop()
    try:
        future = loop.run_in_executor(executor, partial(compute_score_fn, task_id))
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Timeout occurred for a solution")
        return None
    except Exception as e:
        print(f"Error processing solution: Error: {e}")
        return None

async def parallel_compute_score_async():
    data_source = [1]*10
    results = [None] * len(data_source)

    # Create the ProcessPool outside the loop and reuse the executor
    with ProcessPoolExecutor(max_workers=4) as executor:
        try:
            tasks_async = [
                single_compute_score(
                    compute_score, 
                    executor, 
                    task_id = i,
                    timeout=12.
                )
                for i in range(0, len(data_source))
            ]
            results = await asyncio.gather(*tasks_async, return_exceptions=False)
        except Exception as e:
            print(f"[Exception] async gather failed: {e}")
            raise
        finally:
            terminated_count = 0
            for pid, proc in executor._processes.items():
                try:
                    p = psutil.Process(pid)
                    p.terminate()
                    try:
                        p.wait(timeout=5)
                    except psutil.TimeoutExpired:
                        p.kill()
                    terminated_count += 1
                except Exception:
                    pass
            print(f"[Shutdown] {terminated_count} subprocess(es) terminated.")

    print(f'======{results=}')
    return results

start_at = time.time()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(
        parallel_compute_score_async()
    )
finally:
    loop.close()
    
# asyncio.run(parallel_compute_score_async())
print(f'=====finish {time.time() - start_at}')

After running test.py, it seems to hang. Image

yang-ybb avatar Jul 22 '25 07:07 yang-ybb

Adding os.kill(pid, signal.SIGTERM) after p.kill() seems to solve it.

Image Image
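
The terminate-then-kill escalation from the finally block of the test code can be sketched with the standard library alone. This is an illustration under assumptions, not the verl code: `stop_process` and `grace` are hypothetical names, and `subprocess.Popen` stands in for the pool worker processes. The sketch also waits after `kill()` so that the child is reaped before moving on.

```python
import subprocess
import sys


def stop_process(proc, grace=5.0):
    """Stop a child process, escalating from terminate() to kill().

    Returns the child's exit code once it has been reaped.
    """
    proc.terminate()  # polite request first (SIGTERM on POSIX)
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        proc.kill()  # forceful stop if the child ignored the request
        proc.wait()  # reap the child so that no zombie is left behind
    return proc.returncode


if __name__ == "__main__":
    # A child stuck in a busy loop, like the hanging compute_score above.
    child = subprocess.Popen([sys.executable, "-c", "while True:\n    pass\n"])
    print("exit code:", stop_process(child, grace=5.0))
```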

yang-ybb avatar Jul 22 '25 08:07 yang-ybb

  1. Enable launch_reward_fn_async and give compute_reward_async more CPUs in:
@ray.remote(num_cpus=1)
def compute_reward_async
  2. Use an external sandbox: https://verl.readthedocs.io/en/latest/examples/sandbox_fusion_example.html

chenhaiq avatar Jul 22 '25 08:07 chenhaiq