rclcpp icon indicating copy to clipboard operation
rclcpp copied to clipboard

[Proposal] Adding single message getting API to rclcpp

Open SteveMacenski opened this issue 2 years ago • 26 comments

Problem statement:

We're missing a way to get a single message from a topic. This question has come up a number of times in conversations with my colleagues, ROS Answers question askers, and myself even noticing the absense of this API. ROS 1 had this as waitForMessage().

An example use-case is how I used it in ROS 1 in SLAM Toolbox: when a service requests deserialization of a previous SLAM session, I want to get a single message from the current laser scanner to match it to the serialized one to ensure compatibility (else throw exception). Another example mentioned to me by a colleague was getting a single message from the camera info topic from a fixed-focus camera since it never changes to store and have the input to a machine learning system.

There are plenty of examples of a need for a way to get a message from a topic in a polling / blocking manner. I don't think that it is wise for architectural reasons to lean on this kind of method for regular polling of topics and should be advised in documentation that it is used as only when the odd situation requires it. As such, we don't need to be excessively concerned with performance but it shouldn't be outrageously bad (impl proposal below is naive but fine from that perspective).

For better performance or for use of regular polling, it is certainly possible to create subscriptionPolls, or a wrapper on subscriptions that handle message gathering and contain a method getLastMessage() to grab the last message if the programming style for regular polling is desirable in an efficient way. I think that is out of scope of this particular feature request but also could be useful for long-running subscriber/regular polling needs (though easy and common for users to bake into their nodes).

Proposed solution:

I propose adding a top-level rclcpp API called getSingleMessage or getLatestMessage (or similar) which will create a subscription within the function and block until a message is recieved or a timeout expires. An example can be seen below:

template <typename MsgT, NodeT = rclcpp::Node>
MsgT::SharedPtr rclcpp::getSingleMessage<MsgT, NodeT>(
  NodeT node,
  const std::string & topic,
  const rclcpp::Duration & timeout_dur = rclcpp::Duration(0),
  const rmw_qos_profile_t qos_profile = rmw_qos_profile_default,
  const rclcpp::callback_group::CallbackGroup::SharedPtr callback_group = nullptr)
{
  rclcpp::Time start = node->now();
  MsgT::SharedPtr my_msg;
  auto sub = node->create_subscription<MsgT>(topic, [&](const MsgT::SharedPtr msg){my_msg = msg;}, QoS, callback_group);

  while (node->now() - start < timeout_dur) {
    rclcpp::spin_some(node);
    if (my_msg) {
      sub.reset();
      return my_msg;
    }
  }

  sub.reset();
  throw std::runtime_error("Some error msg");
}

Elements I think are most important:

  • Ability to have a timeout for maximum time to block
  • Setting QoS / callback group to use for subscription (optionally)
  • Templated based on node type so it can support both rclcpp::Node and rclcpp_lifecycle::LifecycleNode (and others in the future)
  • Handled in the client library so users can easily access rather than baking the above logic in-line in application code that can be easily generalized

Documentation and other considerations:

  • Should be for isolated or single use, not polling for a topic. This method is inefficient for regular use and not meant for that.
  • Should we add in executor internally so doesn't disrupt program flow? I don't care either way, but seems like it might be excessive but happy to do it if we feel that's the right direction to replace callback groups.

SteveMacenski avatar Jun 13 '22 20:06 SteveMacenski

@SteveMacenski

this does not work for your use case?

https://github.com/ros2/rclcpp/blob/8e6a6fb32d8d6a818b483660e326f2c5313b64ae/rclcpp/include/rclcpp/wait_for_message.hpp#L78-L94

fujitatomoya avatar Jun 13 '22 23:06 fujitatomoya

Exactly like that! Why is that not included in any of the API documentation https://docs.ros2.org/galactic/api/rclcpp/index.html (exists for Foxy, Galactic but neither's API docs include this and searching for it on google comes up with no API or documentation results)?

I also don't see a similar thing for rclpy, but maybe also another case of just not included in the documentation?

SteveMacenski avatar Jun 13 '22 23:06 SteveMacenski

that..... i am not sure 😢 @clalancette could you give us some help here?

fujitatomoya avatar Jun 13 '22 23:06 fujitatomoya

I also don't see a similar thing for rclpy, but maybe also another case of just not included in the documentation?

AFAIK, rclpy does not have such API. but i think it is doable, adding the subscription into wait set, and wait for specific time window.

fujitatomoya avatar Jun 13 '22 23:06 fujitatomoya

I could also work on adding it to rclpy to make them eq (also my colleagues want it in rclpy anyway), but I'd like to resolve the missing documentation here too. Also happy to help do what's necessary on that front.

SteveMacenski avatar Jun 14 '22 03:06 SteveMacenski

It looks like the wait_for_message API requires that the node is not already added to an executor, right? (otherwise you would have a node in 2 waitsets) This essentially means that it can't be called "from inside" a node because the node does not have the knowledge to determine whether it's going to be added to an executor or not (and when this will happen)

alsora avatar Jun 14 '22 12:06 alsora

Exactly like that! Why is that not included in any of the API documentation https://docs.ros2.org/galactic/api/rclcpp/index.html

That's because the documentation doesn't live there anymore. We haven't been great about announcing it, but the documentation for rclcpp in e.g. Humble lives at http://docs.ros.org/en/humble/p/rclcpp/ (you can get to it via https://index.ros.org/p/rclcpp/github-ros2-rclcpp/#humble , and then clicking on the "API Docs" in the upper right). Documentation for Rolling also lives there, and is where documentation will be going forward.

clalancette avatar Jun 14 '22 13:06 clalancette

But this API is also in Foxy and Galactic, neither of which show this API. Why is that and can be add it? Foxy/Galactic doesn't exist in that URL-space for the updated docs (also begs the question of why we changed the format to much, but that's another topic).

Also the fact that this can't be run inside a node kind of negates alot of the utility of this feature and isn't what I was meaning in my feature proposal, so I don't think this is a sufficient solution. This needs to be able to work in some timer callback or in application code. I suggested in the proposal perhaps having an internal executor would be an unideal but possible option to keep it isolated from the rest of the system.

SteveMacenski avatar Jun 14 '22 20:06 SteveMacenski

But this API is also in Foxy and Galactic, neither of which show this API.

Likely that's because this was backported into Foxy and Galactic, and we only ever ran the documentation generation for those releases at release time. That's one of the benefits of the current revamp of the documentation site; we have jobs now that continually run and update the documentation.

Why is that and can be add it?

Possibly. I actually don't know how the earlier documentation was generated, maybe @wjwwood @jacobperron or @cottsay can pipe in.

Foxy/Galactic doesn't exist in that URL-space for the updated docs

Yes, unfortunately it is difficult to turn on the new documentation generator for the old releases. Going forward from Humble, they should all use the new documentation generation.

(also begs the question of why we changed the format to much, but that's another topic).

Mostly for consistency with https://docs.ros.org, which also uses the readthedocs theme.

clalancette avatar Jun 14 '22 20:06 clalancette

It looks like the wait_for_message API requires that the node is not already added to an executor, right? (otherwise you would have a node in 2 waitsets)

i am not sure about this. i have never tried to use. it seems that it only requires context from node?

fujitatomoya avatar Jun 14 '22 21:06 fujitatomoya

@fujitatomoya here the basic usage:

#include <rclcpp/rclcpp.hpp>
#include "rclcpp/wait_for_message.hpp"
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

int main(int argc, char **argv)
{
    // Init ROS
    rclcpp::init(argc, argv);

    auto node = std::make_shared<rclcpp::Node>("wait_for_message_node");
    std_msgs::msg::String out;
    RCLCPP_INFO(node->get_logger(), "Waiting for message...");
    auto ret = rclcpp::wait_for_message(out, node, "wait_for_message_topic"/*, 5s*/);
    if (ret)
        RCLCPP_INFO(node->get_logger(), "Got it!");

    // Shut down ROS
    rclcpp::shutdown();

    return 0;
}

llapx avatar Jun 17 '22 00:06 llapx

@llapx thanks, we know this works.

the question was, what if we use already added to an executor node to issue wait_for_message. that is what i am not sure.

It looks like the wait_for_message API requires that the node is not already added to an executor, right? (otherwise you would have a node in 2 waitsets)

fujitatomoya avatar Jun 17 '22 01:06 fujitatomoya

@fujitatomoya

the question was, what if we use already added to an executor node to issue wait_for_message. that is what i am not sure.

OK, I'll have a try.

llapx avatar Jun 17 '22 01:06 llapx

@fujitatomoya

the question was, what if we use already added to an executor node to issue wait_for_message. that is what i am not sure.

Create a node for issue wait_for_message, in my option, here's the sample:

#include <rclcpp/rclcpp.hpp>
#include "rclcpp/wait_for_message.hpp"
#include <std_msgs/msg/string.hpp>

class WaitForMsg : public rclcpp::Node
{
public:
    WaitForMsg()
        : Node("demo_node")
    {
        auto callback =
            [this](const std_msgs::msg::String::SharedPtr msg) -> void
        {
            RCLCPP_INFO(this->get_logger(), "I heard: [%s]", msg->data.c_str());
        };
        sub_ = this->create_subscription<std_msgs::msg::String>("topic02", 10, callback);

        // wait for message
        node_ = std::make_shared<rclcpp::Node>("wait_for_msg_node");
        std::thread(
            [&]()
            {
                while (rclcpp::ok())
                {
                    std_msgs::msg::String out;
                    auto ret = rclcpp::wait_for_message(out, node_, "topic01");
                    if (ret)
                        RCLCPP_INFO(node_->get_logger(), "I heard: [%s]", out.data.c_str());
                }
            }).detach();
    }

private:
    rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
    std::shared_ptr<rclcpp::Node> node_;
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);

    rclcpp::spin(std::make_shared<WaitForMsg>());
    rclcpp::shutdown();

    return 0;
}

llapx avatar Jun 17 '22 09:06 llapx

The fact you need the second node is what I have a problem with. That's why I proposed potentially having an internal SingleThreadedExecutor that is used to spin to grab a message so that it doesn't mess with the execution model of the program at large if in fact this is called in the callback of another interface (subscriber, timer, service):

Should we add in executor internally so doesn't disrupt program flow? I don't care either way, but seems like it might be excessive but happy to do it if we feel that's the right direction to replace callback groups.

This is kind of the direction that alot of things are going to have to go long-term in ROS 2 I predict (ex. TF2 using a background thread not to jam up traffic of the application). Utilities are going to need to spin up threads to do background processing so that the user's thread can be used to build an application without having to deeply understand the subtleties of the execution model of each of the utility or client library provided functions.

Though, it would be great if we could detect if the node that is passed is currently spinning or not. If its not executing something (e.g. we're not in the middle of a callback from a subscription, timer, etc) we would choose to not create the the executor and just use the node's idle one. Use'm if you need'm, but prefer not to. That way 1 implementation of this function could supply both the current behavior and the behavior we're looking for.

SteveMacenski avatar Jun 17 '22 19:06 SteveMacenski

Maybe this behavior can be implemented by using the recently added RMW Listener APIs: https://github.com/ros2/rclcpp/blob/master/rclcpp/include/rclcpp/subscription_base.hpp#L292-L361 From inside a node you can register a callback on a subscription and this callback will be invoked when a message is received. Although this callback does not have direct access to the message, you know that the message is available so it can be read from the subscription object itself. I "think" that all this should work and do not cause compatibility issues with the executor.

alsora avatar Jun 17 '22 19:06 alsora

This is kind of the direction that alot of things are going to have to go long-term in ROS 2 I predict (ex. TF2 using a background thread not to jam up traffic of the application).

I do think that it warrants a closer look. That is, adding these extra threads everywhere works, but seems like kind of a hack. That suggests to me that maybe our current design could use some changes.

I think it would be worthwhile to go in and look at ROS 1, and also other event loops (like the Qt and Gtk ones) to see how they handle similar situations. Certainly ROS 2 can't be the only event loop implementation to have wrestled with this question.

clalancette avatar Jun 17 '22 19:06 clalancette

@alsora that could help, but I'm not seeing how that entirely resolves the issue - do you have pseudo-code that would demonstrate the utility for how we'd use that in context? I'm thinking that it could be used so that we can have the input node create the subscriber on its executor and trigger a callback when a message is received - but would that callback itself trigger if the subscriber was created in another callback function (e.g. we're calling wait_for_message inside of a timer or service callback)? If we can get to that point, I can see how we could just hank the message off the queue, but that would kind of be violating the model. I think its fine for this particular situation since its narrowly scoped but might not set a good example?

That suggests to me that maybe our current design could use some changes.

Completely agree. I think that's already evidenced by the fact that you must create another executor to have a spinning node for subscriptions / expose services and actions but also call a service/action yourself using spin_until_future_complete since it would add the executor to 2 spinning actions at once to both expose and utilize common ROS interfaces.

For now, auditing the core tools and making sure anywhere that services/actions or non-trivial number of subscription callbacks are placed on their own executors would be great. We did the TF2 migration as part of trying to get 1 node per server within Nav2, but I suspect there are other utilities that should be as well. Although, I'll admit that in trying to come up with a few more examples, I ran a little bit dry. Rviz2 plugins, bond, places where user-defined plugins are loaded so that they can call whatever interfaces they require without worry come to mind. Otherwise, most things are higher level in places like Nav2/Moveit2/user application software where when they need to themselves host and call ROS interfaces run into the above mentioned execution model headaches. But at least that's with their own software that they themselves wrote and are staring at, and not some library they happen to be using that locks up the system in subtle ways that aren't immediately obvious unless you know how they work internally. Those internal library use-cases are the 'got-chas' I think it would be good to spend some time and make sure users don't run into. Happy to help with that if we identify more.

SteveMacenski avatar Jun 17 '22 20:06 SteveMacenski

The listener APIs are completely independent from concepts such as waitsets/callback groups/executors/etc. They are executed when a message is received: if it's an inter-process message they are executed by the RMW thread, if it's an intra-process message they are executed by the publisher thread. This means that in your example they would trigger even if the subscriber was created inside another callback function.

As you said, it is possible to extract the message out of the queue at that point. The correct way to do that would be creating some new rclcpp APIs to handle that for you. I don't think that this would violate any model: listener APIs are set independently on subscription objects, this means that whoever does that has ownership of the subscription so they should be ok with processing this message outside of the executor thread.

alsora avatar Jun 22 '22 12:06 alsora

@SteveMacenski @alsora either Listener API or internal executor, i agree the current user experience is not good.

fujitatomoya avatar Jun 22 '22 23:06 fujitatomoya

@SteveMacenski

this does not work for your use case?

https://github.com/ros2/rclcpp/blob/8e6a6fb32d8d6a818b483660e326f2c5313b64ae/rclcpp/include/rclcpp/wait_for_message.hpp#L78-L94

Hello I've been trying to use this for my node, however, this is very limited the usecase only support the default QoS. Unfortunately, If a publisher uses a different QoS, this won't work. I think either we have to make it work for every QoS type or have to add a new argument of QoS that allows the user to specify it on the first hand.

saikishor avatar Dec 12 '22 16:12 saikishor

@saikishor There's actually an overloaded function that lets you pass a subscriber with specific QoS set here. However, when I tried using that with a transient local subscriber, e.g.: auto sub = nodePtr_->create_subscription<my_messages::msg::MsgType>("/topic", rclcpp::QoS(1).reliability(RMW_QOS_POLICY_RELIABILITY_RELIABLE).durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL), [](const std::shared_ptr<my_messages::msg::MsgType>) {}); I always get ONLY the first message that has been published in the past - what I need is the last one (or all of them) and can't get that for the life of me. Any one experienced something similar?

mikramarc avatar Dec 14 '22 13:12 mikramarc

@mikramarc a bit late to this but did you manage to get your example to work properly? Doing something similar.

Also are you using intra-process communication? I believe there exists a bug for transient local (https://github.com/ros2/rclcpp/issues/1753) that may be related

tgroechel avatar Jun 02 '23 17:06 tgroechel

@mikramarc a bit late to this but did you manage to get your example to work properly? Doing something similar.

Also are you using intra-process communication? I believe there exists a bug for transient local (#1753) that may be related

Yes, for the trasient local it doesn't work for me too. For the rest of the cases, it seems to work.

saikishor avatar Jun 19 '23 07:06 saikishor

canbe closed?

ZhenshengLee avatar Jun 21 '24 02:06 ZhenshengLee

i did not have time to test https://github.com/ros2/rclcpp/issues/1953#issuecomment-1155096374 with current executor implementation. (during jazzy, waitset and executor related area has been significantly modified.) otherwise, i think we can keep this open.

fujitatomoya avatar Jun 21 '24 04:06 fujitatomoya