mujoco icon indicating copy to clipboard operation
mujoco copied to clipboard

Camera rendering in MuJoCo - How to take a picture from a simulated camera and still keeping the GUI active

Open vmstavens opened this issue 1 year ago • 3 comments

Hello there,

I'm a research assistant and I'm trying to take pictures of my MuJoCo simulation using a simulated camera.

Currently, I have my main rendering loop running:

main render loop

def run(self) -> None:
  """
  Method for running the MuJoCo viewer main loop.
  
  Returns:
  - None.
  """
  with mj.viewer.launch_passive(model=self._model, data=self._data, key_callback=self.keyboard_callback) as viewer:
  
	  # load initial state
	  load_mj_state(self._model, self._data, "home", self._lock)
	  mj.mj_step(self._model, self._data)
  
	  # Toggle site frame visualization.
	  if self._args.show_site_frames:
		  viewer.opt.frame = mj.mjtFrame.mjFRAME_SITE
  
	  while viewer.is_running():
		  step_start = time.time()
  
		  # Step the simulation.
		  mj.mj_step(self._model, self._data)
  
		  viewer.sync()
		  time_until_next_step = self.dt - (time.time() - step_start)
		  if time_until_next_step > 0:
			  time.sleep(time_until_next_step)

while I have my keyboard_callback function defined as

`keyboard_callback`

def keyboard_callback(self, key) -> None:
  """
  Method for the keyboard callback. This method should not be called explicitly.
  
  Parameters:
  - key: Keyboard input.
  
  Returns:
  - None.
  """
  if key == glfw.KEY_COMMA:
	  print(self.cam.image)

When I call the cam.image attribute, the following function is run

get image attribute
@property
def image(self) -> np.ndarray:
  """Return the captured RGB image."""
  self._renderer.update_scene(self._data, camera=self.name)
  self._img = self._renderer.render()
  self._renderer.update_scene(self._data)
  return self._img

The self._renderer is here a mj.Renderer object defined by self._renderer = mj.Renderer(self._model, self._height, self._width) where self._model is the model object loaded from the scene file.

After I call the image attribute, the GUI becomes unresponsive while the main rendering loop still seems to be running just fine. Ideally, I would want to control the GUI after i have rendered a picture from the camera.

All feedback and/or ideas for how I could progress/solve this problem of mine are very much appreciated!

vmstavens avatar May 16 '24 16:05 vmstavens

Same issue.

qgallouedec avatar Jun 08 '24 11:06 qgallouedec

Hi @qgallouedec

I believe I found a solution, so to help out I have written the following that you hopefully can use to solve your problem :)

  1. define the camera in the scene file
Add camera to scene
	...
		<camera
		  	name="cam"
		  	pos="1 1 1"
			mode="targetbody"
			target="<insert name of a body>"
		/>
	</worldbody>
</mujoco>
  1. Define camera class, here you can just name mine. In this class, I am using two utility functions: One for making a spatialmath SE3 homogeneous transformation matrices called make_tf and one for writing point clouds pcwrite
Here is my camera class
class Camera:
    def __init__(self, args, model, data, cam_name: str = "", save_dir="data/img/"):
        """Initialize Camera instance.

        Args:
        - args: Arguments containing camera width and height.
        - model: Mujoco model.
        - data: Mujoco data.
        - cam_name: Name of the camera.
        - save_dir: Directory to save captured images.
        """
        self._args = args
        self._cam_name = cam_name
        self._model = model
        self._data = data
        self._save_dir = save_dir + self._cam_name + "/"

        self._width = self._args.cam_width
        self._height = self._args.cam_height
        self._cam_id = self._data.cam(self._cam_name).id

        self._renderer = mj.Renderer(self._model, self._height, self._width)
        self._camera = mj.MjvCamera()
        self._scene = mj.MjvScene(self._model, maxgeom=10_000)

        self._image = np.zeros((self._height, self._width, 3), dtype=np.uint8)
        self._depth_image = np.zeros((self._height, self._width, 1), dtype=np.float32)
        self._seg_id_image = np.zeros((self._height, self._width, 3), dtype=np.float32)
        self._point_cloud = np.zeros((self._height, self._width, 1), dtype=np.float32)

        if not os.path.exists(self._save_dir):
            os.makedirs(self._save_dir)

    @property
    def height(self) -> int:
        """
        Get the height of the camera.

        Returns:
                int: The height of the camera.
        """
        return self._height

    @property
    def width(self) -> int:
        """
        Get the width of the camera.

        Returns:
                int: The width of the camera.
        """
        return self._width

    @property
    def save_dir(self) -> str:
        """
        Get the directory where images captured by the camera are saved.

        Returns:
                str: The directory where images captured by the camera are saved.
        """
        return self._save_dir

    @property
    def name(self) -> str:
        """
        Get the name of the camera.

        Returns:
                str: The name of the camera.s
        """
        return self._cam_name

    @property
    def K(self) -> np.ndarray:
        """
        Compute the intrinsic camera matrix (K) based on the camera's field of view (fov),
        width (_width), and height (_height) parameters, following the pinhole camera model.

        Returns:
        np.ndarray: The intrinsic camera matrix (K), a 3x3 array representing the camera's intrinsic parameters.
        """
        # Convert the field of view from degrees to radians
        theta = np.deg2rad(self.fov)

        # Focal length calculation (f in terms of sensor width and height)
        f_x = (self._width / 2) / np.tan(theta / 2)
        f_y = (self._height / 2) / np.tan(theta / 2)

        # Pixel resolution (assumed to be focal length per pixel unit)
        alpha_u = f_x  # focal length in terms of pixel width
        alpha_v = f_y  # focal length in terms of pixel height

        # Optical center offsets (assuming they are at the center of the sensor)
        u_0 = (self._width - 1) / 2.0
        v_0 = (self._height - 1) / 2.0

        # Intrinsic camera matrix K
        K = np.array([[alpha_u, 0, u_0], [0, alpha_v, v_0], [0, 0, 1]])

        return K

    @property
    def T_world_cam(self) -> np.ndarray:
        """
        Compute the homogeneous transformation matrix for the camera.

        The transformation matrix is computed from the camera's position and orientation.
        The position and orientation are retrieved from the camera data.

        Returns:
        np.ndarray: The 4x4 homogeneous transformation matrix representing the camera's pose.
        """
        pos = self._data.cam(self._cam_id).xpos
        rot = self._data.cam(self._cam_id).xmat.reshape(3, 3).T
        T = make_tf(pos=pos, ori=rot).A
        return T

    @property
    def P(self) -> np.ndarray:
        """
        Compute the projection matrix for the camera.

        The projection matrix is computed as the product of the camera's intrinsic matrix (K)
        and the homogeneous transformation matrix (T_world_cam).

        Returns:
        np.ndarray: The 3x4 projection matrix.
        """
        return self.K @ self.T_world_cam

    @property
    def image(self) -> np.ndarray:
        """Return the captured RGB image."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._image = self._renderer.render()
        return self._image

    @property
    def depth_image(self) -> np.ndarray:
        """Return the captured depth image."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._renderer.enable_depth_rendering()
        self._depth_image = self._renderer.render()
        self._renderer.disable_depth_rendering()
        return self._depth_image

    @property
    def seg_image(self) -> np.ndarray:
        """Return the captured segmentation image based on object's id."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._renderer.enable_segmentation_rendering()

        self._seg_id_image = self._renderer.render()[:, :, 0].reshape(
            (self.height, self.width)
        )
        self._renderer.disable_segmentation_rendering()
        return self._seg_id_image

    @property
    def point_cloud(self) -> np.ndarray:
        """Return the captured point cloud."""
        self._point_cloud = self._depth_to_point_cloud(self.depth_image)
        return self._point_cloud

    @property
    def fov(self) -> float:
        """Get the field of view (FOV) of the camera.

        Returns:
        - float: The field of view angle in degrees.
        """
        return self._model.cam(self._cam_id).fovy[0]

    @property
    def id(self) -> int:
        """Get the identifier of the camera.

        Returns:
        - int: The identifier of the camera.
        """
        return self._cam_id

    def _depth_to_point_cloud(self, depth_image: np.ndarray) -> np.ndarray:
        """
        Method to convert depth image to a point cloud in camera coordinates.

        Args:
        - depth_image: The depth image we want to convert to a point cloud.

        Returns:
        - np.ndarray: 3D points in camera coordinates.
        """
        # Get image dimensions
        dimg_shape = depth_image.shape
        height = dimg_shape[0]
        width = dimg_shape[1]

        # Create pixel grid
        y, x = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")

        # Flatten arrays for vectorized computation
        x_flat = x.flatten()
        y_flat = y.flatten()
        depth_flat = depth_image.flatten()

        # Negate depth values because z-axis goes into the camera
        depth_flat = -depth_flat

        # Stack flattened arrays to form homogeneous coordinates
        homogeneous_coords = np.vstack((x_flat, y_flat, np.ones_like(x_flat)))

        # Compute inverse of the intrinsic matrix K
        K_inv = np.linalg.inv(self.K)

        # Calculate 3D points in camera coordinates
        points_camera = np.dot(K_inv, homogeneous_coords) * depth_flat

        # Homogeneous coordinates to 3D points
        points_camera = np.vstack((points_camera, np.ones_like(x_flat)))

        points_camera = points_camera.T

        # dehomogenize
        points_camera = points_camera[:, :3] / points_camera[:, 3][:, np.newaxis]

        return points_camera

    def shoot(self, autosave: bool = True) -> None:
        """
        Captures a new rgb image, depth image and point cloud from the camera.
        Args:
        - autosave: If the camera rgb image, depth image and point cloud should be saved.

        Returns:
        - None.
        """
        self._image = self.image
        self._depth_image = self.depth_image
        self._point_cloud = self.point_cloud
        self._seg_image = self.seg_image
        if autosave:
            self.save()

    def save(self, img_name: str = "") -> None:
        """Saves the captured image and depth information.

        Args:
        - img_name: Name for the saved image file.
        """
        print(f"saving rgb image, depth image and point cloud to {self.save_dir}")

        if img_name == "":
            timestamp = datetime.now()
            cv2.imwrite(
                self._save_dir + f"{timestamp}_rgb.png",
                cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
            )
            cv2.imwrite(self._save_dir + f"{timestamp}_seg.png", self.seg_image)
            np.save(self._save_dir + f"{timestamp}_depth.npy", self.depth_image)
            pcwrite(self._save_dir + f"{timestamp}.pcd", self.point_cloud)

        else:
            cv2.imwrite(
                self._save_dir + f"{img_name}_rgb.png",
                cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
            )
            cv2.imwrite(self._save_dir + f"{img_name}_seg.png", self.seg_image)
            np.save(self._save_dir + f"{img_name}_depth.npy", self.depth_image)
            pcwrite(self._save_dir + f"{img_name}.pcd", self.point_cloud)
Here are my utility functions
from typing import List, Union

import numpy as np
import pandas as pd
import spatialmath as sm
import spatialmath.base as smb


def make_tf(
    pos: Union[np.ndarray, list] = [0, 0, 0],
    ori: Union[np.ndarray, sm.SE3, sm.SO3] = [1, 0, 0, 0],
) -> sm.SE3:
    """
    Create a SE3 transformation matrix.

    Parameters:
    - pos (np.ndarray): Translation vector [x, y, z].
    - ori (Union[np.ndarray, SE3]): Orientation, can be a rotation matrix, quaternion, or SE3 object.

    Returns:
    - SE3: Transformation matrix.
    """

    if isinstance(ori, list):
        ori = np.array(ori)

    if isinstance(ori, sm.SO3):
        ori = ori.R

    if isinstance(pos, sm.SE3):
        pose = pos
        pos = pose.t
        ori = pose.R

    if len(ori) == 9:
        ori = np.reshape(ori, (3, 3))

    # Convert ori to SE3 if it's already a rotation matrix or a quaternion
    if isinstance(ori, np.ndarray):
        if ori.shape == (3, 3):  # Assuming ori is a rotation matrix
            ori = ori
        elif ori.shape == (4,):  # Assuming ori is a quaternion
            ori = sm.UnitQuaternion(s=ori[0], v=ori[1:]).R
        elif ori.shape == (3,):  # Assuming ori is rpy
            ori = sm.SE3.Eul(ori, unit="rad").R

    T_R = smb.r2t(ori) if is_R_valid(ori) else smb.r2t(make_R_valid(ori))
    R = sm.SE3(T_R, check=False).R

    # Combine translation and orientation
    T = sm.SE3.Rt(R=R, t=pos, check=False)

    return T


def is_R_valid(R: np.ndarray, tol: float = 1e-8) -> bool:
    """
    Checks if the input matrix R is a valid 3x3 rotation matrix.

    Parameters:
            R (np.ndarray): The input matrix to be checked.
            tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-8.

    Returns:
            bool: True if R is a valid rotation matrix, False otherwise.

    Raises:
            ValueError: If R is not a 3x3 matrix.
    """
    # Check if R is a 3x3 matrix
    if not isinstance(R, np.ndarray) or R.shape != (3, 3):
        raise ValueError(f"Input is not a 3x3 matrix. R is \n{R}")

    # Check if R is orthogonal
    is_orthogonal = np.allclose(np.dot(R.T, R), np.eye(3), atol=tol)

    # Check if the determinant is 1
    det = np.linalg.det(R)

    return is_orthogonal and np.isclose(det, 1.0, atol=tol)

def make_R_valid(R: np.ndarray, tol: float = 1e-6) -> np.ndarray:
    """
    Makes the input matrix R a valid 3x3 rotation matrix.

    Parameters:
            R (np.ndarray): The input matrix to be made valid.
            tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-6.

    Returns:
            np.ndarray: A valid 3x3 rotation matrix derived from the input matrix R.

    Raises:
            ValueError: If the input is not a 3x3 matrix or if it cannot be made a valid rotation matrix.
    """
    if not isinstance(R, np.ndarray):
        R = np.array(R)

    # Check if R is a 3x3 matrix
    if R.shape != (3, 3):
        raise ValueError("Input is not a 3x3 matrix")

    # Step 1: Gram-Schmidt Orthogonalization
    Q, _ = np.linalg.qr(R)

    # Step 2: Ensure determinant is 1
    det = np.linalg.det(Q)
    if np.isclose(det, 0.0, atol=tol):
        raise ValueError("Invalid rotation matrix (determinant is zero)")

    # Step 3: Ensure determinant is positive
    if det < 0:
        Q[:, 2] *= -1

    return Q

  1. Finally what you need is to call the renderer from the main loop. Unfortunately, this means you cannot call it from your keyboard callback. Instead what I do is to append the event to a queue which is then popped whenever the main loop ticks again. I tend to package my simulations into classes so here is a rough sketch of what code you might need.
Here is my run

import queue
import mujoco as mj
import mujoco.viewer

class MySimulation:

	...

	self.cam = Camera(self._args, self._model, self._data, "cam")
	...

	def keyboard_callback(self, key) -> None:
		"""
		Method for the keyboard callback. This method should not be called explicitly.

		Parameters:
		- key: Keyboard input.

		Returns:
		- None.
		"""
		if key == glfw.KEY_SPACE:
			pass
		elif key == glfw.KEY_PERIOD:
			print("Shoot! I just took a picture")
			self.cam.shoot()

	def run(self) -> None:
		"""
		Method for running the MuJoCo viewer main loop.

		Returns:
		- None.
		"""
		# in order to enable camera rendering in main thread, queue the key events
		key_queue = queue.Queue()

		with mj.viewer.launch_passive(
			model=self._model,
			data=self._data,
			key_callback=lambda key: key_queue.put(key),
		) as viewer:
			self.cam._renderer.render()

			while viewer.is_running():
				step_start = time.time()

				while not key_queue.empty():
					self.keyboard_callback(key_queue.get())

				# step the simulation.
				mj.mj_step(self._model, self._data)
				viewer.sync()

				time_until_next_step = self.dt - (time.time() - step_start)
				if time_until_next_step > 0:
					time.sleep(time_until_next_step)

I hope this can assist you in solving your problem :)

vmstavens avatar Jun 15 '24 08:06 vmstavens

Thank you!

qgallouedec avatar Jun 15 '24 09:06 qgallouedec

Hey @vmstavens where do you take pcwrite function? I do not see it in your utils

Fima2003 avatar Jan 06 '25 10:01 Fima2003

Oh yeah, my bad, thanks for pointing that out @Fima2003! I hope this helps

pcwrite ``` python
from pathlib import Path
from typing import Union

import numpy as np
import open3d as o3d

def pcwrite(
    filename: str, point_cloud: Union[np.ndarray, list, o3d.geometry.PointCloud]
) -> None:
    """
    Write a point cloud to a PCD file.

    Parameters
    ----------
    filename : str
        File path where the PCD file will be saved.
    point_cloud : Union[np.ndarray, list, o3d.geometry.PointCloud]
        The point cloud data to write. It can be:
        - A numpy array representing the point cloud (nx3).
        - A list of points where each point is a list or tuple of 3 coordinates.
        - An Open3D point cloud object.

    Returns
    ----------
    None

    Raises
    ----------
    ValueError
        If the provided point cloud type is unsupported or the file cannot be written.
    """
    try:
        if isinstance(point_cloud, np.ndarray):
            # Convert numpy array to Open3D point cloud
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(point_cloud)
        elif isinstance(point_cloud, list):
            # Convert list to Open3D point cloud
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(np.array(point_cloud))
        elif isinstance(point_cloud, o3d.geometry.PointCloud):
            # Point cloud is already an Open3D point cloud object
            pcd = point_cloud
        else:
            raise ValueError("Unsupported point cloud type")

        path = Path(filename)

        # Create all parent directories if they don't exist
        path.parent.mkdir(parents=True, exist_ok=True)

        # Check if the file exists
        if path.exists():
            print(f"Open3D: Overwriting {filename}.")
        else:
            print(f"Open3D: Creating {filename}.")

        # Write the point cloud to a PCD file
        o3d.io.write_point_cloud(filename, pcd)
    except Exception as e:
        raise ValueError(f"Error writing point cloud to '{filename}': {e}")
```

vmstavens avatar Jan 12 '25 18:01 vmstavens