卡在代码沙盒的地方
使用prime的代码校验逻辑的时候,走到以下分支之后会卡住不动,查看后台监控也没有任何进程在跑(没有rollout),想问一下怎么解决?
Hello, I think this is a common problem because neither coroutines nor threads can guarantee timeout handling when the CPU is busy. Only using processes can achieve that. Below is the minimal code to reproduce the issue; perhaps this requires a better sandbox.
async def infinite_loop():
while True:
pass
async def main():
try:
await asyncio.wait_for(infinite_loop(), timeout=1.0)
except asyncio.TimeoutError:
print("Timeout reached. The infinite loop did not finish in time.")
# timeout will not be triggered
asyncio.run(main())
Same issue
same here
same issue
same issue
same issue
simplify prime.py as test code below: compute_score: maybe in while True loop data_source count is 10, max_workers is 4. timeout is 12s
import asyncio
from concurrent.futures import ProcessPoolExecutor
import random
import time
from functools import partial
import psutil
import os
def compute_score(task_id):
sleep_time = random.uniform(1, 30)
print(f"pid: {os.getpid()} task {task_id} Sleeping for {sleep_time:.2f} seconds...")
if sleep_time > 15:
print(f"\tpid {os.getpid()} is in while true loop")
while True:
pass
time.sleep(sleep_time)
return sleep_time
async def single_compute_score(compute_score_fn, executor, task_id, timeout=12.0):
loop = asyncio.get_running_loop()
try:
future = loop.run_in_executor(executor, partial(compute_score_fn, task_id))
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError:
print(f"Timeout occurred for a solution")
return None
except Exception as e:
print(f"Error processing solution: Error: {e}")
return None
async def parallel_compute_score_async():
data_source = [1]*10
results = [None] * len(data_source)
# 循环外面申请ProcessPool,复用executor
with ProcessPoolExecutor(max_workers=4) as executor:
try:
tasks_async = [
single_compute_score(
compute_score,
executor,
task_id = i,
timeout=12.
)
for i in range(0, len(data_source))
]
results = await asyncio.gather(*tasks_async, return_exceptions=False)
except Exception as e:
print(f"[Exception] async gather failed: {e}")
raise
finally:
terminated_count = 0
for pid, proc in executor._processes.items():
try:
p = psutil.Process(pid)
p.terminate()
try:
p.wait(timeout=5)
except psutil.TimeoutExpired:
p.kill()
terminated_count += 1
except Exception:
pass
print(f"[Shutdown] {terminated_count} subprocess(es) terminated.")
print(f'======{results=}')
return results
start_at = time.time()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
parallel_compute_score_async()
)
finally:
loop.close()
# asyncio.run(parallel_compute_score_async())
print(f'=====finish {time.time() - start_at}')
after run test.py, it seems hang.
seems add os.kill(pid, signal.SIGTERM) after p.kill() can solve
- enable launch_reward_fn_async and give compute_reward_async more cpus in
@ray.remote(num_cpus=1)
def compute_reward_async
- use an external sandbox: https://verl.readthedocs.io/en/latest/examples/sandbox_fusion_example.html