Feature Request: LaunchService wrapper for easy non-blocking launch and shutdown

Open KKSTB opened this issue 2 years ago • 5 comments

Feature request

Feature description

Many people have been puzzled about how to launch a LaunchDescription with LaunchService without blocking the main thread: https://github.com/ros2/launch/pull/210 https://answers.ros.org/question/321118/ros2-nodes-occasionally-dying-using-launchservice-in-a-subprocess/ https://github.com/ros2/launch/issues/126

So I propose either adding a start() function to LaunchService, or a new wrapper class, that spawns a daemon process to run the async launch loop, like this:

import asyncio
import multiprocessing

from launch import LaunchDescription, LaunchService


class Ros2LaunchParent:
    def start(self, launch_description: LaunchDescription):
        self._stop_event = multiprocessing.Event()
        self._process = multiprocessing.Process(target=self._run_process, args=(self._stop_event, launch_description), daemon=True)
        self._process.start()

    def shutdown(self):
        self._stop_event.set()
        self._process.join()

    def _run_process(self, stop_event, launch_description):
        loop = asyncio.get_event_loop()
        launch_service = LaunchService()
        launch_service.include_launch_description(launch_description)
        # Start the launch loop as a task so this process can also watch stop_event.
        launch_task = loop.create_task(launch_service.run_async())
        # Wait in an executor thread until the parent calls shutdown().
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not launch_task.done():
            # Ask the launch service to shut down, then let run_async() finish.
            asyncio.ensure_future(launch_service.shutdown(), loop=loop)
            loop.run_until_complete(launch_task)

Implementation considerations

Besides launching a LaunchDescription, it would be helpful to have another mode that launches an individual node and returns its PID, so that the process can be managed directly, as in ROS 1.
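
To make the request concrete, here is a minimal standard-library sketch of the behaviour being asked for: start one process, keep its PID, and stop it with SIGINT. It does not use the launch API. `CHILD_CODE`, `start_child` and `stop_child` are hypothetical names, the child is a plain Python script standing in for a node, and POSIX signals are assumed.

```python
import os
import signal
import subprocess
import sys

# Hypothetical stand-in for a node: announces readiness, then idles until SIGINT.
CHILD_CODE = """
import signal, time
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    print("ready", flush=True)
    time.sleep(60)
except KeyboardInterrupt:
    print("cleaned up", flush=True)
"""


def start_child() -> subprocess.Popen:
    """Start the stand-in process and return once it reports that it is ready."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE], stdout=subprocess.PIPE, text=True
    )
    proc.stdout.readline()  # blocks until the child prints "ready"
    return proc


def stop_child(proc: subprocess.Popen, timeout: float = 10.0):
    """Send SIGINT to the process by PID and return (exit code, remaining output)."""
    os.kill(proc.pid, signal.SIGINT)
    proc.wait(timeout=timeout)
    output = proc.stdout.read()
    proc.stdout.close()
    return proc.returncode, output
```

The caller gets control back as soon as `start_child()` returns, can read `proc.pid` at any time, and decides when to call `stop_child(proc)`. A real implementation inside launch would presumably need to expose the PID of the process it starts, which this sketch does not attempt.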

KKSTB avatar Aug 15 '23 07:08 KKSTB

Hi @KKSTB, I am trying to replicate what you described in this issue, but for me this code is still blocking. Can you help me out?

This is what I have so far:

    def start_node_process(self, launch_description: LaunchDescription):
        self._stop_event = multiprocessing.Event()
        self._process = multiprocessing.Process(
            target=self._run_process,
            args=(self._stop_event, launch_description),
            daemon=True
        )
        self._process.start()

    def _run_process(self, stop_event, launch_description):
        loop = asyncio.get_event_loop()
        launch_service = LaunchService()
        launch_service.include_launch_description(launch_description)
        launch_task = loop.create_task(launch_service.run_async())
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not launch_task.done():
            asyncio.ensure_future(launch_service.shutdown(), loop=loop)
            loop.run_until_complete(launch_task)

    def start_ros_node(self, node_dict):
        node = launch_ros.actions.Node(**node_dict)
        self.start_node_process(LaunchDescription([node]))

However, when I call start_ros_node, it still blocks and never returns.

Thanks in advance!

Rezenders avatar Nov 13 '23 17:11 Rezenders

Hi @Rezenders. I have used it in a large project to launch many other Python launch files without problems. You could try the following to see what is wrong:

  1. As I do, use a launch description that includes a Python launch file which launches something, e.g. a talker node.
  2. As you do, use a launch description to launch a node directly, but launch a talker node.

KKSTB avatar Nov 14 '23 15:11 KKSTB

I did run into a problem once my project grew to use several event loops, so a slight change to the code in the OP is needed:

From:

loop = asyncio.get_event_loop()

To:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
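
The shape of this change can be tried without launch installed. The sketch below is a stand-in, not launch code: `fake_run_async` is a hypothetical replacement for `LaunchService.run_async()`, `run_loop` mirrors `_run_process` from the OP, and a thread is used instead of a process to keep it short. Each worker creates and installs its own loop rather than relying on an implicit current loop.

```python
import asyncio
import threading


async def fake_run_async(shutdown_requested: asyncio.Event) -> str:
    """Hypothetical stand-in for LaunchService.run_async(): runs until asked to stop."""
    await shutdown_requested.wait()
    return "stopped"


def run_loop(stop_event: threading.Event, results: list) -> None:
    # Create a loop owned by this worker instead of fetching an implicit one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        shutdown_requested = asyncio.Event()
        task = loop.create_task(fake_run_async(shutdown_requested))
        # Keep the loop running until the caller sets stop_event.
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not task.done():
            shutdown_requested.set()  # stand-in for launch_service.shutdown()
            results.append(loop.run_until_complete(task))
    finally:
        loop.close()
```

Because every call to `run_loop` builds a fresh loop, several workers can run side by side without sharing loop state.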

KKSTB avatar Nov 28 '23 04:11 KKSTB

I would have expected something like this to work, but it doesn't:

from launch import LaunchService, LaunchDescription
from launch.actions import IncludeLaunchDescription
from ament_index_python.packages import get_package_share_directory
from launch.launch_description_sources import AnyLaunchDescriptionSource
import pathlib
from multiprocessing import Process
import time

def get_launch_file(package_name, launch_file_name):
    path = pathlib.Path(get_package_share_directory(package_name), "launch", launch_file_name)
    assert path.exists()
    return path

def app_srv_via_launch_xml(app):
    """Provide a launch service for an application in the foreground.

    Assumes it has an XML launch file.

    Args:
        app (str): The application name, e.g. "demo_pkg".
    """
    launch_file = get_launch_file(app, f"{app}.launch.xml")

    ld = LaunchDescription(
        [
            IncludeLaunchDescription(AnyLaunchDescriptionSource([str(launch_file)])),
        ]
    )

    service = LaunchService()
    service.include_launch_description(ld)
    return service


def main():
    app_name = "demo_pkg"
    service = app_srv_via_launch_xml(app_name)
    demo_pkg_process = Process(target=service.run, daemon=True)
    demo_pkg_process.start()
    time.sleep(10)
    demo_pkg_process.terminate()
    print("Done")


if __name__=="__main__":
    main()

The process exits cleanly, but the demo_pkg node is still running.

Ryanf55 avatar Feb 27 '24 07:02 Ryanf55

The process exits cleanly, but the demo_pkg node is still running.

This is because demo_pkg_process.terminate() sends SIGTERM, and using SIGTERM can result in orphaned processes (https://github.com/ros2/launch/blob/d9ffd805e3d9ca42fe4dd0019ae095e9eb0d4d72/launch/launch/launch_service.py#L209C53-L209C85).

To shut down child processes, LaunchService.shutdown() should be called. Another method is to send SIGINT.

Since there is no way to send SIGINT to the LaunchService in your example, this run method probably cannot shut down child processes once started.
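
The difference between the two signals can be seen in a rough standard-library model. This is not launch code, and launch's own signal handling is more involved. The hypothetical `PARENT_CODE` script cleans up its child when it receives SIGINT but has no SIGTERM handler, so SIGTERM kills it before any cleanup runs. `is_alive` and `grandchild_survives` are illustrative helpers, and POSIX signals are assumed.

```python
import os
import signal
import subprocess
import sys

# Hypothetical parent: starts a child, reports its PID, cleans up only on SIGINT.
PARENT_CODE = """
import signal, subprocess, sys, time
signal.signal(signal.SIGINT, signal.default_int_handler)
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
try:
    print(child.pid, flush=True)
    time.sleep(60)
except KeyboardInterrupt:
    child.terminate()
    child.wait()
"""


def is_alive(pid: int) -> bool:
    """Return True if a process with this PID still exists."""
    try:
        os.kill(pid, 0)  # signal 0 only checks for existence
    except ProcessLookupError:
        return False
    return True


def grandchild_survives(sig: int) -> bool:
    """Stop the parent with `sig` and report whether its child outlived it."""
    parent = subprocess.Popen(
        [sys.executable, "-c", PARENT_CODE], stdout=subprocess.PIPE, text=True
    )
    grandchild_pid = int(parent.stdout.readline())
    parent.send_signal(sig)
    parent.wait(timeout=10)
    parent.stdout.close()
    survived = is_alive(grandchild_pid)
    if survived:
        os.kill(grandchild_pid, signal.SIGKILL)  # tidy up the orphan
    return survived
```

In this model, `grandchild_survives(signal.SIGTERM)` reports an orphan while `grandchild_survives(signal.SIGINT)` does not, which matches the behaviour described above.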

KKSTB avatar Mar 01 '24 02:03 KKSTB