Camera rendering in MuJoCo - How to take a picture from a simulated camera and still keeping the GUI active
Hello there,
I'm a research assistant and I'm trying to take pictures of my MuJoCo simulation using a simulated camera.
Currently, I have my main rendering loop running:
main render loop
def run(self) -> None:
"""
Method for running the MuJoCo viewer main loop.
Returns:
- None.
"""
with mj.viewer.launch_passive(model=self._model, data=self._data, key_callback=self.keyboard_callback) as viewer:
# load initial state
load_mj_state(self._model, self._data, "home", self._lock)
mj.mj_step(self._model, self._data)
# Toggle site frame visualization.
if self._args.show_site_frames:
viewer.opt.frame = mj.mjtFrame.mjFRAME_SITE
while viewer.is_running():
step_start = time.time()
# Step the simulation.
mj.mj_step(self._model, self._data)
viewer.sync()
time_until_next_step = self.dt - (time.time() - step_start)
if time_until_next_step > 0:
time.sleep(time_until_next_step)
while I have my keyboard_callback function defined as
`keyboard_callback`
def keyboard_callback(self, key) -> None:
"""
Method for the keyboard callback. This method should not be called explicitly.
Parameters:
- key: Keyboard input.
Returns:
- None.
"""
if key == glfw.KEY_COMMA:
print(self.cam.image)
When I call the cam.image attribute, the following function is run
get image attribute
@property
def image(self) -> np.ndarray:
"""Return the captured RGB image."""
self._renderer.update_scene(self._data, camera=self.name)
self._img = self._renderer.render()
self._renderer.update_scene(self._data)
return self._img
The self._renderer is here a mj.Renderer object defined by self._renderer = mj.Renderer(self._model, self._height, self._width) where self._model is the model object loaded from the scene file.
After I call the image attribute, the GUI becomes unresponsive while the main rendering loop still seems to be running just fine. Ideally, I would want to control the GUI after i have rendered a picture from the camera.
All feedback and/or ideas for how I could progress/solve this problem of mine are very much appreciated!
Same issue.
Hi @qgallouedec
I believe I found a solution, so to help out I have written the following that you hopefully can use to solve your problem :)
- define the camera in the scene file
Add camera to scene
...
<camera
name="cam"
pos="1 1 1"
mode="targetbody"
target="<insert name of a body>"
/>
</worldbody>
</mujoco>
- Define camera class, here you can just name mine. In this class, I am using two utility functions: One for making a spatialmath SE3 homogeneous transformation matrices called
make_tfand one for writing point cloudspcwrite
Here is my camera class
class Camera:
def __init__(self, args, model, data, cam_name: str = "", save_dir="data/img/"):
"""Initialize Camera instance.
Args:
- args: Arguments containing camera width and height.
- model: Mujoco model.
- data: Mujoco data.
- cam_name: Name of the camera.
- save_dir: Directory to save captured images.
"""
self._args = args
self._cam_name = cam_name
self._model = model
self._data = data
self._save_dir = save_dir + self._cam_name + "/"
self._width = self._args.cam_width
self._height = self._args.cam_height
self._cam_id = self._data.cam(self._cam_name).id
self._renderer = mj.Renderer(self._model, self._height, self._width)
self._camera = mj.MjvCamera()
self._scene = mj.MjvScene(self._model, maxgeom=10_000)
self._image = np.zeros((self._height, self._width, 3), dtype=np.uint8)
self._depth_image = np.zeros((self._height, self._width, 1), dtype=np.float32)
self._seg_id_image = np.zeros((self._height, self._width, 3), dtype=np.float32)
self._point_cloud = np.zeros((self._height, self._width, 1), dtype=np.float32)
if not os.path.exists(self._save_dir):
os.makedirs(self._save_dir)
@property
def height(self) -> int:
"""
Get the height of the camera.
Returns:
int: The height of the camera.
"""
return self._height
@property
def width(self) -> int:
"""
Get the width of the camera.
Returns:
int: The width of the camera.
"""
return self._width
@property
def save_dir(self) -> str:
"""
Get the directory where images captured by the camera are saved.
Returns:
str: The directory where images captured by the camera are saved.
"""
return self._save_dir
@property
def name(self) -> str:
"""
Get the name of the camera.
Returns:
str: The name of the camera.s
"""
return self._cam_name
@property
def K(self) -> np.ndarray:
"""
Compute the intrinsic camera matrix (K) based on the camera's field of view (fov),
width (_width), and height (_height) parameters, following the pinhole camera model.
Returns:
np.ndarray: The intrinsic camera matrix (K), a 3x3 array representing the camera's intrinsic parameters.
"""
# Convert the field of view from degrees to radians
theta = np.deg2rad(self.fov)
# Focal length calculation (f in terms of sensor width and height)
f_x = (self._width / 2) / np.tan(theta / 2)
f_y = (self._height / 2) / np.tan(theta / 2)
# Pixel resolution (assumed to be focal length per pixel unit)
alpha_u = f_x # focal length in terms of pixel width
alpha_v = f_y # focal length in terms of pixel height
# Optical center offsets (assuming they are at the center of the sensor)
u_0 = (self._width - 1) / 2.0
v_0 = (self._height - 1) / 2.0
# Intrinsic camera matrix K
K = np.array([[alpha_u, 0, u_0], [0, alpha_v, v_0], [0, 0, 1]])
return K
@property
def T_world_cam(self) -> np.ndarray:
"""
Compute the homogeneous transformation matrix for the camera.
The transformation matrix is computed from the camera's position and orientation.
The position and orientation are retrieved from the camera data.
Returns:
np.ndarray: The 4x4 homogeneous transformation matrix representing the camera's pose.
"""
pos = self._data.cam(self._cam_id).xpos
rot = self._data.cam(self._cam_id).xmat.reshape(3, 3).T
T = make_tf(pos=pos, ori=rot).A
return T
@property
def P(self) -> np.ndarray:
"""
Compute the projection matrix for the camera.
The projection matrix is computed as the product of the camera's intrinsic matrix (K)
and the homogeneous transformation matrix (T_world_cam).
Returns:
np.ndarray: The 3x4 projection matrix.
"""
return self.K @ self.T_world_cam
@property
def image(self) -> np.ndarray:
"""Return the captured RGB image."""
self._renderer.update_scene(self._data, camera=self.name)
self._image = self._renderer.render()
return self._image
@property
def depth_image(self) -> np.ndarray:
"""Return the captured depth image."""
self._renderer.update_scene(self._data, camera=self.name)
self._renderer.enable_depth_rendering()
self._depth_image = self._renderer.render()
self._renderer.disable_depth_rendering()
return self._depth_image
@property
def seg_image(self) -> np.ndarray:
"""Return the captured segmentation image based on object's id."""
self._renderer.update_scene(self._data, camera=self.name)
self._renderer.enable_segmentation_rendering()
self._seg_id_image = self._renderer.render()[:, :, 0].reshape(
(self.height, self.width)
)
self._renderer.disable_segmentation_rendering()
return self._seg_id_image
@property
def point_cloud(self) -> np.ndarray:
"""Return the captured point cloud."""
self._point_cloud = self._depth_to_point_cloud(self.depth_image)
return self._point_cloud
@property
def fov(self) -> float:
"""Get the field of view (FOV) of the camera.
Returns:
- float: The field of view angle in degrees.
"""
return self._model.cam(self._cam_id).fovy[0]
@property
def id(self) -> int:
"""Get the identifier of the camera.
Returns:
- int: The identifier of the camera.
"""
return self._cam_id
def _depth_to_point_cloud(self, depth_image: np.ndarray) -> np.ndarray:
"""
Method to convert depth image to a point cloud in camera coordinates.
Args:
- depth_image: The depth image we want to convert to a point cloud.
Returns:
- np.ndarray: 3D points in camera coordinates.
"""
# Get image dimensions
dimg_shape = depth_image.shape
height = dimg_shape[0]
width = dimg_shape[1]
# Create pixel grid
y, x = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")
# Flatten arrays for vectorized computation
x_flat = x.flatten()
y_flat = y.flatten()
depth_flat = depth_image.flatten()
# Negate depth values because z-axis goes into the camera
depth_flat = -depth_flat
# Stack flattened arrays to form homogeneous coordinates
homogeneous_coords = np.vstack((x_flat, y_flat, np.ones_like(x_flat)))
# Compute inverse of the intrinsic matrix K
K_inv = np.linalg.inv(self.K)
# Calculate 3D points in camera coordinates
points_camera = np.dot(K_inv, homogeneous_coords) * depth_flat
# Homogeneous coordinates to 3D points
points_camera = np.vstack((points_camera, np.ones_like(x_flat)))
points_camera = points_camera.T
# dehomogenize
points_camera = points_camera[:, :3] / points_camera[:, 3][:, np.newaxis]
return points_camera
def shoot(self, autosave: bool = True) -> None:
"""
Captures a new rgb image, depth image and point cloud from the camera.
Args:
- autosave: If the camera rgb image, depth image and point cloud should be saved.
Returns:
- None.
"""
self._image = self.image
self._depth_image = self.depth_image
self._point_cloud = self.point_cloud
self._seg_image = self.seg_image
if autosave:
self.save()
def save(self, img_name: str = "") -> None:
"""Saves the captured image and depth information.
Args:
- img_name: Name for the saved image file.
"""
print(f"saving rgb image, depth image and point cloud to {self.save_dir}")
if img_name == "":
timestamp = datetime.now()
cv2.imwrite(
self._save_dir + f"{timestamp}_rgb.png",
cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
)
cv2.imwrite(self._save_dir + f"{timestamp}_seg.png", self.seg_image)
np.save(self._save_dir + f"{timestamp}_depth.npy", self.depth_image)
pcwrite(self._save_dir + f"{timestamp}.pcd", self.point_cloud)
else:
cv2.imwrite(
self._save_dir + f"{img_name}_rgb.png",
cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
)
cv2.imwrite(self._save_dir + f"{img_name}_seg.png", self.seg_image)
np.save(self._save_dir + f"{img_name}_depth.npy", self.depth_image)
pcwrite(self._save_dir + f"{img_name}.pcd", self.point_cloud)
Here are my utility functions
from typing import List, Union
import numpy as np
import pandas as pd
import spatialmath as sm
import spatialmath.base as smb
def make_tf(
pos: Union[np.ndarray, list] = [0, 0, 0],
ori: Union[np.ndarray, sm.SE3, sm.SO3] = [1, 0, 0, 0],
) -> sm.SE3:
"""
Create a SE3 transformation matrix.
Parameters:
- pos (np.ndarray): Translation vector [x, y, z].
- ori (Union[np.ndarray, SE3]): Orientation, can be a rotation matrix, quaternion, or SE3 object.
Returns:
- SE3: Transformation matrix.
"""
if isinstance(ori, list):
ori = np.array(ori)
if isinstance(ori, sm.SO3):
ori = ori.R
if isinstance(pos, sm.SE3):
pose = pos
pos = pose.t
ori = pose.R
if len(ori) == 9:
ori = np.reshape(ori, (3, 3))
# Convert ori to SE3 if it's already a rotation matrix or a quaternion
if isinstance(ori, np.ndarray):
if ori.shape == (3, 3): # Assuming ori is a rotation matrix
ori = ori
elif ori.shape == (4,): # Assuming ori is a quaternion
ori = sm.UnitQuaternion(s=ori[0], v=ori[1:]).R
elif ori.shape == (3,): # Assuming ori is rpy
ori = sm.SE3.Eul(ori, unit="rad").R
T_R = smb.r2t(ori) if is_R_valid(ori) else smb.r2t(make_R_valid(ori))
R = sm.SE3(T_R, check=False).R
# Combine translation and orientation
T = sm.SE3.Rt(R=R, t=pos, check=False)
return T
def is_R_valid(R: np.ndarray, tol: float = 1e-8) -> bool:
"""
Checks if the input matrix R is a valid 3x3 rotation matrix.
Parameters:
R (np.ndarray): The input matrix to be checked.
tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-8.
Returns:
bool: True if R is a valid rotation matrix, False otherwise.
Raises:
ValueError: If R is not a 3x3 matrix.
"""
# Check if R is a 3x3 matrix
if not isinstance(R, np.ndarray) or R.shape != (3, 3):
raise ValueError(f"Input is not a 3x3 matrix. R is \n{R}")
# Check if R is orthogonal
is_orthogonal = np.allclose(np.dot(R.T, R), np.eye(3), atol=tol)
# Check if the determinant is 1
det = np.linalg.det(R)
return is_orthogonal and np.isclose(det, 1.0, atol=tol)
def make_R_valid(R: np.ndarray, tol: float = 1e-6) -> np.ndarray:
"""
Makes the input matrix R a valid 3x3 rotation matrix.
Parameters:
R (np.ndarray): The input matrix to be made valid.
tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-6.
Returns:
np.ndarray: A valid 3x3 rotation matrix derived from the input matrix R.
Raises:
ValueError: If the input is not a 3x3 matrix or if it cannot be made a valid rotation matrix.
"""
if not isinstance(R, np.ndarray):
R = np.array(R)
# Check if R is a 3x3 matrix
if R.shape != (3, 3):
raise ValueError("Input is not a 3x3 matrix")
# Step 1: Gram-Schmidt Orthogonalization
Q, _ = np.linalg.qr(R)
# Step 2: Ensure determinant is 1
det = np.linalg.det(Q)
if np.isclose(det, 0.0, atol=tol):
raise ValueError("Invalid rotation matrix (determinant is zero)")
# Step 3: Ensure determinant is positive
if det < 0:
Q[:, 2] *= -1
return Q
- Finally what you need is to call the renderer from the main loop. Unfortunately, this means you cannot call it from your keyboard callback. Instead what I do is to append the event to a queue which is then popped whenever the main loop ticks again. I tend to package my simulations into classes so here is a rough sketch of what code you might need.
Here is my run
import queue
import mujoco as mj
import mujoco.viewer
class MySimulation:
...
self.cam = Camera(self._args, self._model, self._data, "cam")
...
def keyboard_callback(self, key) -> None:
"""
Method for the keyboard callback. This method should not be called explicitly.
Parameters:
- key: Keyboard input.
Returns:
- None.
"""
if key == glfw.KEY_SPACE:
pass
elif key == glfw.KEY_PERIOD:
print("Shoot! I just took a picture")
self.cam.shoot()
def run(self) -> None:
"""
Method for running the MuJoCo viewer main loop.
Returns:
- None.
"""
# in order to enable camera rendering in main thread, queue the key events
key_queue = queue.Queue()
with mj.viewer.launch_passive(
model=self._model,
data=self._data,
key_callback=lambda key: key_queue.put(key),
) as viewer:
self.cam._renderer.render()
while viewer.is_running():
step_start = time.time()
while not key_queue.empty():
self.keyboard_callback(key_queue.get())
# step the simulation.
mj.mj_step(self._model, self._data)
viewer.sync()
time_until_next_step = self.dt - (time.time() - step_start)
if time_until_next_step > 0:
time.sleep(time_until_next_step)
I hope this can assist you in solving your problem :)
Thank you!
Hey @vmstavens where do you take pcwrite function? I do not see it in your utils
Oh yeah, my bad, thanks for pointing that out @Fima2003! I hope this helps
pcwrite
``` pythonfrom pathlib import Path
from typing import Union
import numpy as np
import open3d as o3d
def pcwrite(
filename: str, point_cloud: Union[np.ndarray, list, o3d.geometry.PointCloud]
) -> None:
"""
Write a point cloud to a PCD file.
Parameters
----------
filename : str
File path where the PCD file will be saved.
point_cloud : Union[np.ndarray, list, o3d.geometry.PointCloud]
The point cloud data to write. It can be:
- A numpy array representing the point cloud (nx3).
- A list of points where each point is a list or tuple of 3 coordinates.
- An Open3D point cloud object.
Returns
----------
None
Raises
----------
ValueError
If the provided point cloud type is unsupported or the file cannot be written.
"""
try:
if isinstance(point_cloud, np.ndarray):
# Convert numpy array to Open3D point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(point_cloud)
elif isinstance(point_cloud, list):
# Convert list to Open3D point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np.array(point_cloud))
elif isinstance(point_cloud, o3d.geometry.PointCloud):
# Point cloud is already an Open3D point cloud object
pcd = point_cloud
else:
raise ValueError("Unsupported point cloud type")
path = Path(filename)
# Create all parent directories if they don't exist
path.parent.mkdir(parents=True, exist_ok=True)
# Check if the file exists
if path.exists():
print(f"Open3D: Overwriting {filename}.")
else:
print(f"Open3D: Creating {filename}.")
# Write the point cloud to a PCD file
o3d.io.write_point_cloud(filename, pcd)
except Exception as e:
raise ValueError(f"Error writing point cloud to '{filename}': {e}")
```