OmniGibson icon indicating copy to clipboard operation
OmniGibson copied to clipboard

Add basic Ray implementation

Open mjlbach opened this issue 1 year ago • 0 comments

Ray is a popular library for reinforcement learning and distributed computing.

One complication with using ray is that by default, it sets CUDA_VISIBLE_DEVICES if you allocate a process with a GPU. Unfortunately, there are two issues preventing use with omnigibson.

  1. Importing OG creates a SimulationApp context which allocates the omnigibson process to the first GPU available
  2. Ray sets CUDA_VISIBLE_DEVICES, which crashes Omniverse regardless of which active_gpu and physics_gpu are set in the constructor.

The solution to running ray/omnigibson is below, but requires some changes to how OG is initialized.

import ray
import os
import time
import json

import numpy as np

@ray.remote
def collect_episode_batch():
    """Run one OmniGibson environment on this Ray worker and time 1000 steps.

    OmniGibson is imported lazily inside the worker because importing it
    creates a SimulationApp context (which grabs a GPU), and that must happen
    only after the CUDA_VISIBLE_DEVICES workaround below.

    Returns:
        list[float]: wall-clock duration (seconds) of each of the 1000 steps.
    """
    print("Beginning collect episode batch", flush=True)
    import omnigibson as og

    # Work around an omni bug: read the GPU Ray assigned us, then unset
    # CUDA_VISIBLE_DEVICES (set by Ray), which crashes Omniverse.
    # Ray may set a comma-separated list (e.g. "0,1"); use the first entry.
    active_gpu = int(os.environ["CUDA_VISIBLE_DEVICES"].split(",")[0])
    del os.environ["CUDA_VISIBLE_DEVICES"]
    print(f"Initializing OG on {active_gpu}", flush=True)
    og.initialize({"active_gpu": active_gpu})

    cfg = dict()

    # Define scene
    cfg["scene"] = {
        "type": "InteractiveTraversableScene",
        "scene_model": "Rs_int",
    }
    # Define robots
    cfg["robots"] = [
        {
            "type": "Tiago",
            "name": "skynet_robot",
            "obs_modalities": ["rgb", "depth"],
        },
    ]

    # Define task
    cfg["task"] = {
        "type": "DummyTask",
        "termination_config": dict(),
        "reward_config": dict(),
    }

    # Create the environment
    env = og.Environment(cfg)
    print("Environment initialized", flush=True)

    # Step with random actions, recording per-step wall-clock time.
    times = []
    for idx in range(1000):
        start_step = time.time()
        env.step(env.action_space.sample())
        times.append(time.time() - start_step)
        print(idx, flush=True)

    return times

def benchmark(config):
    """Launch Ray workers for one configuration and gather step-time stats.

    Args:
        config: dict with keys "gpus" (number of GPUs to use) and
            "envs_per_gpu" (environments sharing each GPU).

    Returns:
        tuple[list, list]: per-worker mean step times and per-worker total
        step times, in worker-launch order.
    """
    processes = config["gpus"] * config["envs_per_gpu"]
    # Fractional num_gpus lets several workers share one physical GPU.
    gpu_fraction = 1 / config["envs_per_gpu"]

    futures = []
    for idx in range(processes):
        print(f"Initializing process for ray worker {idx} out of {processes}", flush=True)
        futures.append(collect_episode_batch.options(num_gpus=gpu_fraction).remote())

    # NOTE: the loop variable here previously shadowed this `benchmark`
    # function itself; renamed to `future` to remove the shadowing.
    mean_times = []
    total_times = []
    for future in futures:
        times = ray.get(future)
        mean_times.append(np.mean(times))
        total_times.append(np.sum(times))

    return mean_times, total_times

if __name__ == "__main__":
    # Configurations to sweep. Earlier single-GPU runs are preserved here,
    # commented out:
    #   {"gpus": 1, "envs_per_gpu": 1}
    #   {"gpus": 1, "envs_per_gpu": 3}
    configurations = [
        {"gpus": 3, "envs_per_gpu": 1},
        {"gpus": 3, "envs_per_gpu": 3},
    ]

    print("Starting ray cluster")
    ray.init(dashboard_host="0.0.0.0")
    print("Ray cluster initialized")

    result = {}
    for idx, config in enumerate(configurations):
        print(f"Starting benchmark for config {config}", flush=True)
        mean_times, total_times = benchmark(config)
        result[idx] = dict(
            gpus=config["gpus"],
            envs_per_gpus=config["envs_per_gpu"],
            mean_step_times=mean_times,
            total_step_times=total_times,
            mean_step_time=np.mean(mean_times),
            total_step_time=np.mean(total_times),
        )
        print(f"Ending benchmark for config {config}", flush=True)

        # Re-serialize after every configuration so partial results survive
        # a crash during a later run.
        with open("/benchmark/benchmark_result.json", "w") as f:
            json.dump(result, f)

mjlbach avatar Mar 08 '23 00:03 mjlbach