ros1_bridge
ros1_bridge copied to clipboard
ROS1 Bridge doesn't work with Multithreaded Executor on a ROS2 service
Bug report
Required Info:
- Operating System: Ubuntu 18.04
- Installation type: Binary
- Version or commit hash: dashing
- DDS implementation: Default
- Client library (if applicable): N/A
Steps to reproduce issue
- Create a service with a Multithreaded executor in ROS2
- Connect to ROS1 via the ros1_bridge
- Call the ROS2 service through the bridge. Only a single thread runs on the service
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "sensor_msgs/msg/laser_scan.hpp"
#include "geometry_msgs/msg/vector3_stamped.hpp"
#include "std_srvs/srv/empty.hpp"
using namespace std::chrono_literals;
/**
* A small convenience function for converting a thread ID to a string
**/
std::string string_thread_id()
{
auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
return std::to_string(hashed);
}
class PublisherNode : public rclcpp::Node
{
public:
PublisherNode()
: Node("PublisherNode")
{
publisher_ = this->create_publisher<geometry_msgs::msg::Vector3Stamped>("topic", 10);
auto timer_callback =
[this]() -> void {
auto message = geometry_msgs::msg::Vector3Stamped();
message.header.stamp = this->now();
this->publisher_->publish(message);
};
timer_ = this->create_wall_timer(500ms, timer_callback);
}
private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<geometry_msgs::msg::Vector3Stamped>::SharedPtr publisher_;
};
class DualThreadedNode : public rclcpp::Node
{
public:
DualThreadedNode()
: Node("DualThreadedNode")
{
/* These define the callback groups
* They don't really do much on their own, but they have to exist in order to
* assign callbacks to them. They're also what the executor looks for when trying to run multiple threads
*/
callback_group_subscriber1_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
callback_group_subscriber2_ = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto service = [this](const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<std_srvs::srv::Empty::Request> req,
const std::shared_ptr<std_srvs::srv::Empty::Response> res) -> void {
this->test_service(request_header, req, res);
};
callback_group4_ = this->create_callback_group(
rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
test_srv_ = this->create_service<std_srvs::srv::Empty>("test_service", service, rmw_qos_profile_default, callback_group4_);
auto sub1_opt = rclcpp::SubscriptionOptions();
sub1_opt.callback_group = callback_group_subscriber1_;
subscription1_ = this->create_subscription<geometry_msgs::msg::Vector3Stamped>(
"topic",
rclcpp::QoS(10),
std::bind(
&DualThreadedNode::subscriber1_cb,
this,
std::placeholders::_1),
sub1_opt);
}
private:
/**
* Every time the Publisher publishes something, all subscribers to the topic get poked
* This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it)
*/
void subscriber1_cb(const geometry_msgs::msg::Vector3Stamped::SharedPtr msg)
{
vector_msg_ = msg;
}
void test_service(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<std_srvs::srv::Empty::Request> request,
const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
RCLCPP_INFO(this->get_logger(), "Received message in callback1");
RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());
auto received_time = this->get_clock()->now();
while (received_time + rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");
}
rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
rclcpp::CallbackGroup::SharedPtr callback_group4_;
rclcpp::Subscription<geometry_msgs::msg::Vector3Stamped>::SharedPtr subscription1_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;
rclcpp::Service<std_srvs::srv::Empty>::SharedPtr test_srv_;
geometry_msgs::msg::Vector3Stamped::SharedPtr vector_msg_;
rclcpp::Time stamp_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
// You MUST use the MultiThreadedExecutor to use, well, multiple threads
rclcpp::executors::MultiThreadedExecutor executor;
auto pubnode = std::make_shared<PublisherNode>();
auto subnode = std::make_shared<DualThreadedNode>(); // This contains BOTH subscriber callbacks.
// They will still run on different threads
// One Node. Two callbacks. Two Threads
executor.add_node(pubnode);
executor.add_node(subnode);
executor.spin();
rclcpp::shutdown();
return 0;
}
# source ros1 workspace
# source ros2 workspace
ros2 ros1_bridge dynamic_bridge --bridge-all-topics;
# source ros1 workspace
rosservice call /test_service std_srvs/srv/Empty;
Expected behaviour
Multithreaded executor should run multiple nodes in separate threads.
Actual behavior
ROS2 service is run in a single thread
Additional information
Feature request
ros1_bridge should support nodes with the Multithreaded executor
@joelbudu
Create a service with a Multithreaded executor in ROS2 Multithreaded executor should run multiple nodes in separate threads.
Do you expect that a service created by a node running on a multithreaded executor of ros2 can deal with multiple requests from ros1 at the same time? Or two more different services created by a node or nodes? I just want to make sure you make the right code (IIRC, the flag of Callback group, exclusive, will be checked inside if you use the default callback group only in one node) in ROS2.
About ros1_bridge
, there are some locations that seem to be updated if we want to support dealing with multiple requests from ros1 at the same time. such as,
-
use CallbackGroup with
Reentrant
(It just bridge the data for the service, so I think we can use reentrant here.) for https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L354 -
use
MultipleThreadedExecutor
for https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/src/dynamic_bridge.cpp#L791
BTW: could you provide an example and more detailed steps to reproduce what you expect?
Could you use foxy
or rolling
? Because it seems the EOL of dashing
is on May, 2021.
@iuhilnehc-ynos Thank you for your response. My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.
You have also pointed very correctly to the locations I thought would need updating to support this.
I would update the Issue with more detailed steps to reproduce as well.
With respect to the ros version I'm thinking it may be easy to merge the PR in both branches (including dashing). If it's not possible I will have to create a fork and have the change in there to be able to work in my current environment
You have also pointed very correctly to the locations
I am sorry, item 1 pointed before is the bridge for ros2 client
-> ros1 service
. (For your case, it should be https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L331)
@joelbudu
My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.
Thank you for sharing this. At least you didn't use the same callback group (because of creating a service per node) for service callback, so we expect that the service callback can run on separate threads when using MultiThreadedExecutor.
And use ros::AsyncSpinner async_spinner(a_proper_number);
(not 1, refer to http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning#Multi-threaded_Spinning) for
https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/src/dynamic_bridge.cpp#L787
You have also pointed very correctly to the locations
I am sorry, item 1 pointed before is the bridge for
ros2 client
->ros1 service
. (For your case, it should behttps://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L331 )
I might need a bit of clarity on how this works. I thought the Multithreading/Callback setup would be setup on the ROS2 service ?
@joelbudu
In your case, you just cared about the ros2 service callback that can run on separate threads, so we just need to make sure the ros1 spin with multiple threads which can call ros1_service_callback in ros1_bridge
to request ros2 service
at the same time.
Could you just confirm if updating
https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778
with ros::AsyncSpinner async_spinner(2);
is work for you?
Yes @iuhilnehc-ynos I can confirm updating https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778 with ros::AsyncSpinner async_spinner(2); works for me
@joelbudu , Thank you for your confirmation. Before creating a PR for this feature, I'll go deep to check if it introduces some potential problems.
Hi, @joelbudu, I found the example you updated on https://github.com/ros2/ros1_bridge/issues/314#issue-894424329, only containing one service (test_srv_) with callback_group4_(MutuallyExclusive) in one node (DualThreadedNode), I am a little confused about what you expected.
** Actual behavior ** ROS2 service is run in a single thread
Actually, there is also a geometry_msgs::msg::Vector3Stamped publisher and subscriber running in different callback groups. So there is a shared variable that is being updated by the subscriber and the value of that variable is checked in the blocking while loop of the service
@iuhilnehc-ynos I've been following the PR request linked to this issue. I just wanted to confirm if for my scenario only changing the ros::AsyncSpinner async_spinner(2) would still suffice or is there any other change you would advice to change? Thanks
@joelbudu
If you just implemented a demo or a trial, I think the answer is yes
, otherwise, I don't recommend using loop wait inside a service callback, because the loop in the service callback will make ros1_bridge occupy one ros::AsyncSpinner thread. If there are many such waiting loops all over other services, even setting 100 for ros::AsyncSpinner
is not enough.
void test_service(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<std_srvs::srv::Empty::Request> request,
const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
RCLCPP_INFO(this->get_logger(), "Received message in callback1");
RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());
auto received_time = this->get_clock()->now();
while (received_time + rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
// it seems an anti-pattern here
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");
}