OmniGibson
OmniGibson copied to clipboard
Add basic Ray implementation
Ray is a popular library for reinforcement learning and distributed computing.
One complication with using Ray is that, by default, it sets CUDA_VISIBLE_DEVICES for any process that is allocated a GPU. Unfortunately, two issues prevent this from working with OmniGibson:
- Importing OG creates a SimulationApp context which allocates the omnigibson process to the first GPU available
- Ray sets CUDA_VISIBLE_DEVICES, which crashes Omniverse regardless of which `active_gpu` and `physics_gpu` are set in the constructor.
The solution to running ray/omnigibson is below, but requires some changes to how OG is initialized.
import ray
import os
import time
import json
import numpy as np
@ray.remote
def collect_episode_batch():
    """Step a freshly created OmniGibson environment 1000 times, timing each step.

    Runs as a Ray remote task. The GPU index handed to ``og.initialize`` is
    taken from ``CUDA_VISIBLE_DEVICES``, and the variable is then removed from
    the process environment before OmniGibson is initialized.

    Returns:
        list[float]: Elapsed time in seconds of every sample-and-step call,
        in the order the steps were executed.

    Raises:
        RuntimeError: If ``CUDA_VISIBLE_DEVICES`` is unset or empty, i.e. the
            task was scheduled without a GPU.
        ValueError: If the first visible device is not an integer index.
    """
    print("Beginning collect episode batch", flush=True)
    import omnigibson as og

    # Work around an omni bug: manually set the GPU but don't leave
    # CUDA_VISIBLE_DEVICES (which is set by ray) in the environment.
    visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "")
    gpu_ids = [entry.strip() for entry in visible_devices.split(",") if entry.strip()]
    if not gpu_ids:
        raise RuntimeError(
            "CUDA_VISIBLE_DEVICES is not set; schedule this task with num_gpus > 0"
        )
    # The variable may hold a comma-separated list when more than one GPU is
    # visible; use the first entry as the active GPU.
    active_gpu = int(gpu_ids[0])
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)

    print(f"Initializing OG on {active_gpu}", flush=True)
    og.initialize({"active_gpu": active_gpu})

    cfg = dict()

    # Define scene
    cfg["scene"] = {
        "type": "InteractiveTraversableScene",
        "scene_model": "Rs_int",
    }

    # Define robots
    cfg["robots"] = [
        {
            "type": "Tiago",
            "name": "skynet_robot",
            "obs_modalities": ["rgb", "depth"],
        },
    ]

    # Define task
    cfg["task"] = {
        "type": "DummyTask",
        "termination_config": dict(),
        "reward_config": dict(),
    }

    # Create the environment
    env = og.Environment(cfg)
    print("Environment initialized", flush=True)

    # Step! perf_counter is monotonic, so the measured durations are not
    # affected by system clock adjustments.
    times = []
    for idx in range(1000):
        start_step = time.perf_counter()
        env.step(env.action_space.sample())
        times.append(time.perf_counter() - start_step)
        print(idx, flush=True)
    return times
def benchmark(config):
    """Launch the Ray workers for one configuration and summarize their step times.

    Args:
        config: Mapping providing ``"gpus"`` and ``"envs_per_gpu"``. One
            ``collect_episode_batch`` task is submitted per environment, each
            requesting ``1 / envs_per_gpu`` of a GPU.

    Returns:
        tuple: ``(mean_times, total_times)`` -- two lists with one entry per
        worker holding the mean and the sum of that worker's step times.
    """
    gpus = config["gpus"]
    envs_per_gpu = config["envs_per_gpu"]
    processes = gpus * envs_per_gpu

    # Submit every task first so the workers run concurrently.
    pending = []
    for idx in range(processes):
        print(f"Initializing process for ray worker {idx} out of {processes}", flush=True)
        worker = collect_episode_batch.options(num_gpus=1 / envs_per_gpu)
        pending.append(worker.remote())

    # Block on each worker in submission order and reduce its timings.
    mean_times, total_times = [], []
    for handle in pending:
        step_times = ray.get(handle)
        mean_times.append(np.mean(step_times))
        total_times.append(np.sum(step_times))
    return mean_times, total_times
if __name__ == "__main__":
    # Benchmark configurations to run, in order. The commented-out entries
    # are alternative setups that are currently disabled.
    configurations = [
        # {"gpus": 1, "envs_per_gpu": 1},
        # {"gpus": 1, "envs_per_gpu": 3},
        {"gpus": 3, "envs_per_gpu": 1},
        {"gpus": 3, "envs_per_gpu": 3},
    ]

    print("Starting ray cluster")
    ray.init(dashboard_host="0.0.0.0")
    print("Ray cluster initialized")

    # Results are keyed by the position of the configuration in the list.
    result = {}
    for idx, config in enumerate(configurations):
        print(f"Starting benchmark for config {config}", flush=True)
        mean_times, total_times = benchmark(config)
        summary = {
            "gpus": config["gpus"],
            "envs_per_gpus": config["envs_per_gpu"],
            "mean_step_times": mean_times,
            "total_step_times": total_times,
            # Both aggregates are averages taken across the workers.
            "mean_step_time": np.mean(mean_times),
            "total_step_time": np.mean(total_times),
        }
        result[idx] = summary
        print(f"Ending benchmark for config {config}", flush=True)

    # Persist the collected results as a single JSON document.
    with open("/benchmark/benchmark_result.json", "w") as f:
        json.dump(result, f)