message_filters icon indicating copy to clipboard operation
message_filters copied to clipboard

Latest_time policy may in some case refrain from publishing for an aribitrarily long period

Open wuchenhaogit opened this issue 1 year ago • 4 comments
trafficstars

The latest_time filter only publishes when the arriving message comes from the pivot sensor. if (valid_rate && (i == find_pivot(now)) && is_full()) { publish(); } However, if all sensors sample at a gradually decreasing frequencies, i.e., each new message always arrives later than before, they may all fails to match the pivot (because new message arrives and decrease its frequency, pivot is thus changed). When this happens, the filter may not publishing for an extended, or even infinite period, despite all sensors functioning properly.

Here is an example. There are 2 sensors, initally with the same frequency. Starting from a time, both sensors' frequencies begin to decrease. Then pivot will change whenever a new message arrives, so that no pivot match, and thus no publishing will occur. Image below is the log of this example.

This impairs the usability of this policy. Because each time sensors decrease frequencies together, even if only for a small decreasing period in the fluctuation, the policy may refrain from publishing until the decreasing stops. The pubishing frequency is thus below the highest frequency among sensors, which does not meet the expectation for this policy.

To address this issue, it might be better to compulsely publish whenever current period has exceeded the pivot's frequency, i.e., use t_last to store last publishing time: int p = find_pivot(now); if (valid_rate && ( (i == p) || (now - t_last).seconds() >= 1 / rates_[p].hz) ) && is_full()) { publish(); t_last = now;'}` With this modification, the publishing frequency is guaranteed to approach the pivot's frequency.

wuchenhaogit avatar Jun 12 '24 04:06 wuchenhaogit

This was discussed in the issue triage meeting, and the conclusion was that we don't have anyone to look into this at the moment, but it would be helpful (for others that might help with this issue) if there was a simple isolated example, e.g. a bag file and some sample code to reproduce the issue. For now we will leave it here, but if you update this with more information it will be triaged again in the future.

Also as a note, please copy-paste text, rather than posting pictures of text. Thanks!

wjwwood avatar Jun 20 '24 20:06 wjwwood

@wjwwood can you put help-wanted label back on? i think you accidentally removed it?

fujitatomoya avatar Jun 27 '24 23:06 fujitatomoya

Thank you for paying attention to my issue. Below I provide my code that generates the result I described above.

#include <iostream>
#include <string>
#include <cfloat>
#include <chrono>
#include <memory>
#include <string>

#include <unistd.h>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "signal.h"

#include <message_filters/subscriber.h>
#include <message_filters/synchronizer.h>
#include <message_filters/sync_policies/latest_time.h>

#include <sensor_msgs/msg/joint_state.hpp>

using namespace std;

using namespace std::chrono_literals;

int test_num = 50;

class Publisher : public rclcpp::Node
{
public:
    Publisher(string topic_name, double initial_period1_, double initial_period2_, double delta_):
            Node(topic_name.c_str()), topic_name_(topic_name), initial_period1(initial_period1_), 
            initial_period2(initial_period2_), delta(delta_)
    {
        string topic_name1 = topic_name_ + "1";
        string topic_name2 = topic_name_ + "2";
        publisher1_ = this->create_publisher<sensor_msgs::msg::JointState>(topic_name1.c_str(), 10);
        publisher2_ = this->create_publisher<sensor_msgs::msg::JointState>(topic_name2.c_str(), 10);

        Thread = new std::thread(&Publisher::timer_callback,this);
    }

    void timer_callback()
    {
        rclcpp::Duration period1 = rclcpp::Duration(0, initial_period1*1e6);
        rclcpp::Duration period2 = rclcpp::Duration(0, initial_period2*1e6);

        // interleave two topics
        rclcpp::Duration initial_gap = rclcpp::Duration(0, initial_period1/2*1e6);
        
        // time of incoming messages of each channel
        rclcpp::Time next1 = this->now();
        rclcpp::Time next2 = next1 + initial_gap;

        // warm up the statistics
        cout << endl << "----- stable -----" << endl << endl;

        for(int i = 0 ; i<test_num ; ++i)
        {
            publisher1(next1);
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }


        // start to decrease the frequency
        cout << endl << "----- decreasing -----" << endl << endl;
        
        rclcpp::Duration period_increment = rclcpp::Duration(0, delta*1e6);
        for (int i = 0 ; i < test_num ; ++i) {
            publisher1(next1);
            period1 = period2 + period_increment; // new hz1 will be smaller than now hz2
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            period2 = period1 + period_increment; // new hz2 will be smaller than now hz1
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }

        
        // be stable again. publishing should be normal again.
        cout << endl << "----- stable -----" << endl << endl;
        for (int i = 0 ; i < test_num ; ++i) {
            publisher1(next1);
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }

        kill(getpid(),SIGINT);
    }

    void publisher1(rclcpp::Time now)
    {
        auto message = sensor_msgs::msg::JointState();
        message.header.stamp = now;
        // cout << "pub " << topic_name_ << " time " << fixed << setprecision(9) << now.seconds() << endl;
        
        publisher1_->publish(message);
    }

    void publisher2(rclcpp::Time now)
    {
        auto message = sensor_msgs::msg::JointState();
        message.header.stamp = now;
        // cout << "pub " << topic_name_ << " time " << fixed << setprecision(9) << now.seconds() << endl;
        
        publisher2_->publish(message);
    }


private:
    string topic_name_;
    std::thread* Thread;
    rclcpp::Publisher<sensor_msgs::msg::JointState>::SharedPtr publisher1_;
    rclcpp::Publisher<sensor_msgs::msg::JointState>::SharedPtr publisher2_;
    double initial_period1;
    double initial_period2;
    double delta;
};


using namespace message_filters;

using std::placeholders::_1;
using std::placeholders::_2;


class SubscriberTopic2 : public rclcpp::Node {
    public:
    SubscriberTopic2() :
        Node("subscriber_topic2")
    {
        topic1_sub_ = this->create_subscription<sensor_msgs::msg::JointState>("topic1", 300, 
                                                                        std::bind(&SubscriberTopic2::callback1, this, _1));
        topic2_sub_ = this->create_subscription<sensor_msgs::msg::JointState>("topic2", 300, 
                                                                        std::bind(&SubscriberTopic2::callback2, this, _1));

        sync_.registerCallback(std::bind(&SubscriberTopic2::callback, this, _1, _2));
    }

    ~SubscriberTopic2()
    {
    }

    private:

    void callback(const sensor_msgs::msg::JointState::ConstSharedPtr& msg1, const sensor_msgs::msg::JointState::ConstSharedPtr& msg2)
    {
        cout << "published at " << fixed << setprecision(9) << this->now().seconds() << endl;
        cout << "\tmsg1 " << fixed << setprecision(9) << (double)msg1->header.stamp.sec + 1e-9*(double)msg1->header.stamp.nanosec << endl;
        cout << "\tmsg2 " << fixed << setprecision(9) << (double)msg2->header.stamp.sec + 1e-9*(double)msg2->header.stamp.nanosec << endl;
    }

    void callback1(sensor_msgs::msg::JointState::SharedPtr msg)
    {
        cout << "sub1 " << fixed << setprecision(9) << (double)msg->header.stamp.sec + 1e-9*(double)msg->header.stamp.nanosec << endl;
        
        sync_.add<0>(msg);
    }

    void callback2(sensor_msgs::msg::JointState::SharedPtr msg)
    {
        cout << "sub2 " << fixed << setprecision(9) << (double)msg->header.stamp.sec + 1e-9*(double)msg->header.stamp.nanosec << endl;
        
        sync_.add<1>(msg);
    }


    rclcpp::CallbackGroup::SharedPtr callback_group_subscriber_;

    rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr topic1_sub_;
    rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr topic2_sub_;

    typedef Synchronizer<sync_policies::LatestTime<sensor_msgs::msg::JointState, sensor_msgs::msg::JointState> > Sync;

    Sync sync_;
};

using namespace std;


int main(int argc, char * argv[])
{
    double ini1;    // initial period of channel 1 (in ms)
    double ini2;    // initial period of channel 2 (in ms)
    double delta;   // increment of period
    if(argc == 1){ // default (my previous example)
        test_num = 50;       // number of messages in decreasing & stable pattern
        ini1 = 20;
        ini2 = 20;
        delta = 0.2667;
    }
    else {
        assert(argc == 5);
        test_num = atoi(argv[1]);
        ini1 = atof(argv[2]);
        ini2 = atof(argv[3]); 
        delta = atof(argv[4]);
    }

    rclcpp::init(argc, argv);
    rclcpp::executors::SingleThreadedExecutor executor;
    // rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(),50);

    auto sub_node = std::make_shared<SubscriberTopic2>();
    executor.add_node(sub_node);

    string topic_name = "topic";
    
    auto pub_node = std::make_shared<Publisher>(topic_name, ini1, ini2, delta);
    executor.add_node(pub_node);

    executor.spin();

    executor.remove_node(sub_node);
    executor.remove_node(pub_node);

    rclcpp::shutdown();
    return 0;
}

wuchenhaogit avatar Jul 01 '24 15:07 wuchenhaogit

@wjwwood @fujitatomoya I have reproduced the issue, and I would like to try to fix it if no one else is already looking at it.

nicolaloi avatar Aug 17 '24 16:08 nicolaloi