iceoryx
When I delete the sleep statement in the publisher of the iceoptions example, the subscriber does not receive some of the messages
Required information
Operating system: Ubuntu 20.04.4 LTS
Compiler version: gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
Observed result or behaviour: When I delete the sleep statement in the publisher of the iceoptions example, the subscriber does not receive some of the messages. The pattern I found is that when the publisher sends messages very fast, for example with no sleep statement at all or only a tiny interval such as 1 microsecond, the subscriber loses some messages, even when both ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER and QueueFullPolicy::BLOCK_PRODUCER are set. Because of the high performance of iceoryx IPC, I want to use it for information exchange between the subsystems of one of my quantitative trading systems. I cannot tolerate any data loss, and I hope to get your help.
Expected result or behaviour: the subscriber should receive all messages
Conditions where it occurred / Performed steps: delete the sleep statement in the publisher (file iox_publisher_with_options.cpp)
@byrnexu I looked into the code and ran the following setup:
- remove the std::this_thread::sleep_for(std::chrono::milliseconds(400)); in the publisher
- start the subscriber in the first terminal
- start the publisher after the subscriber is ready to receive

This led to the loss of the first few thousand samples, but after the connection was fully established no more samples were lost. So I can confirm your initial observation.
When I modified iox_publisher_with_options.cpp a bit and waited 400ms after publisher.offer(), no more samples were lost:

    publisher.offer();
    //! [offer]
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
Background: RouDi receives subscription or offer requests in the background via IPC (unix domain socket) messages and processes all collected tasks every 50ms. When a subscriber would like to subscribe to a topic, it takes around 50ms until it is subscribed and can block the publisher. In this time window you will lose samples, since the publisher does not yet know about the upcoming subscriber.
Furthermore, when the publisher does not have any subscriber it just sends out the samples without blocking until someone subscribes. A late joiner can always request the history of the last N samples, but this history is very limited; see the sketch below for how it is configured.
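For illustration, a minimal sketch of how the history is configured via the publisher and subscriber options (the capacity of 10 is an arbitrary choice for this sketch, not a value from this issue):

    #include "iceoryx_posh/popo/publisher.hpp"
    #include "iceoryx_posh/popo/subscriber.hpp"

    iox::popo::PublisherOptions publisherOptions;
    // keep the last 10 samples so that late joiners can catch up
    publisherOptions.historyCapacity = 10U;

    iox::popo::SubscriberOptions subscriberOptions;
    // request up to 10 stored samples when the subscription is established
    subscriberOptions.historyRequest = 10U;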
The solution for your system: you want to ensure that the publisher has the required subscriber before sending data, i.e. you do not send data before the subscriber is present. You can easily ensure this by adding a simple busy loop right after offer(). This would look like:
    publisher.offer();
    while (!publisher.hasSubscribers())
    {
        std::this_thread::yield(); // makes it less busy; signals the OS that others can do some work
    }
The disadvantage of this approach is a high CPU load as long as the publisher is waiting for subscribers.
For such cases we also have adaptive_wait in our internal API, where you can write something like:

    #include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"

    publisher.offer();
    // wait_loop() spins with an adaptive back-off while the callable returns true
    iox::cxx::internal::adaptive_wait waiter;
    waiter.wait_loop([&] { return !publisher.hasSubscribers(); });
This has a smart waiting strategy, so the CPU load is minimal again, but it has the disadvantage that it is not a public API and may change in the future (although this is unlikely). Just be warned.
@byrnexu I talked to @elBoberido and he pointed out that you can also set publisherOptions.offerOnCreate = true so that the service is automatically offered when the publisher is created; see the sketch below.
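A minimal sketch of this option (the payload type RadarObject and the service description are placeholders for illustration, not taken from this issue):

    iox::popo::PublisherOptions publisherOptions;
    publisherOptions.offerOnCreate = true; // offer the service as soon as the publisher is constructed

    // placeholder type and service description for illustration only
    iox::popo::Publisher<RadarObject> publisher({"Radar", "FrontLeft", "Object"}, publisherOptions);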
When you start the subscriber first and then the publisher, in theory you wouldn't need the publisher.hasSubscribers() loop - but just to be sure I would keep it.
But there is still a caveat: what if you would like to have multiple subscribers? Once every subscriber is subscribed, none of them will miss any samples, but until then the samples sent before all connections were established may be lost (when they are not stored in the history), and the hasSubscribers() approach will not solve this.
If you have the use case of multiple subscribers, we could add a function numberOfSubscribers() to the public API so that you can solve this issue as well; see the hypothetical sketch below.
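A hypothetical sketch of how such an API could be used - note that numberOfSubscribers() is only proposed above and does not exist in the public API, and EXPECTED_SUBSCRIBERS is an assumed value:

    constexpr uint64_t EXPECTED_SUBSCRIBERS = 3U; // assumed number of required subscribers

    publisher.offer();
    // hypothetical: numberOfSubscribers() is a proposal, not an existing iceoryx call
    while (publisher.numberOfSubscribers() < EXPECTED_SUBSCRIBERS)
    {
        std::this_thread::yield();
    }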
@elfenpiff Thank you very much for providing such an excellent library and for the patient answers. I don't need multiple subscribers for the time being, but I think this would also be a very useful interface.
@byrnexu I am always happy to help!
I would say this deserves an entry in the FAQ and then we close this issue.
@elfenpiff I can take care of extending the FAQ.
^_^ It is not a problem of the publisher waiting for subscribers, but a problem of the publisher pushing data so fast that the subscriber cannot receive all of it. I encountered the same problem as the questioner. How should this problem be solved?
@xfsaid did you set the subscriber option QueueFullPolicy::BLOCK_PRODUCER and the publisher option ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER, e.g. as in the sketch below? If yes, could you provide a minimal example that triggers the behavior?
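For reference, a minimal sketch of the blocking configuration (only the option fields are shown; the surrounding publisher/subscriber setup is omitted):

    iox::popo::SubscriberOptions subscriberOptions;
    // block the publisher instead of discarding samples when the queue is full
    subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PRODUCER;

    iox::popo::PublisherOptions publisherOptions;
    // wait for slow subscribers instead of overwriting their data
    publisherOptions.subscriberTooSlowPolicy = iox::popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;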
@elBoberido I have applied the configuration you mentioned, and I have uploaded my example to github: https://github.com/xfsaid/iceoryx_demo The problem is in line 34 of the "cb_pub_demo.cpp" file. Thank you so much!
@xfsaid the data is not lost but waits in the receiver queue to be taken.
You can either change the onSampleReceivedCallback to this:
    while (subscriber->hasData())
    {
        counter++;
        // capture by reference so that counter is visible inside the lambda
        subscriber->take().and_then([&](auto& sample) {
            std::cout << "received x: " << sample->x << ", counter: " << counter << std::endl;
        });
    }
or use the WaitSet with a State instead of an Event.
The Listener uses events, which are faster than states, but this also means that if the publisher e.g. fires 5 events while the subscriber is still in the onSampleReceivedCallback, the callback will be triggered only once more after it returns. If you do not take care of taking all the data out of the queue, the samples will just stay there, and the default queue size is 256 samples. The WaitSet supports states as well as events and, if used with states, it will fire as long as there is data in the queue; see the sketch below.
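A minimal sketch of the state-based WaitSet approach, along the lines of the official WaitSet examples (the subscriber construction and the keepRunning flag are assumed to exist in the surrounding code):

    #include "iceoryx_posh/popo/wait_set.hpp"
    #include <cstdlib>
    #include <iostream>

    iox::popo::WaitSet<> waitset;
    // with SubscriberState::HAS_DATA the waitset wakes up as long as samples are queued
    waitset.attachState(subscriber, iox::popo::SubscriberState::HAS_DATA).or_else([](auto) {
        std::cerr << "failed to attach subscriber" << std::endl;
        std::exit(EXIT_FAILURE);
    });

    while (keepRunning)
    {
        auto notificationVector = waitset.wait();
        for (auto& notification : notificationVector)
        {
            if (notification->doesOriginateFrom(&subscriber))
            {
                subscriber.take().and_then([](auto& sample) {
                    std::cout << "received x: " << sample->x << std::endl;
                });
            }
        }
    }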
@elfenpiff @mossmaurice I just discovered that our Listener examples are misleading. I guess the callback example can be changed to read all the samples in the callback, but I'm not quite sure whether the discovery example also needs changes.
@elBoberido This method does indeed solve the data loss problem; I will test the other use cases as well. Thank you so much.
The examples will be fixed in #2251