Dropped samples after system clock adjustments
Is there an already existing issue for this?
- [X] I have searched the existing issues
Expected behavior
When dealing with system clock adjustments (made manually or by clock server synchronization), my subscriber drops all samples once the system clock has been set back into the past. This seems to be caused by DataReaderHistory::received_change_keep_last / DataReaderHistory::completed_change_keep_last comparing every incoming sample's sourceTimestamp (which will be in the past once the clock has been changed) with that of the first change in the instance, so all samples are dropped:
CacheChange_t* first_change = instance_changes.at(0);
if (change->sourceTimestamp >= first_change->sourceTimestamp)
{
    // As the instance is ordered by source timestamp, we can always remove the first one.
    ret_value = remove_change_sub(first_change);
}
else
{
    // Received change is older than oldest, and should be discarded
    return true;
}
If I remove the if statement and simply always drop the first sample, everything seems to work regardless of system clock adjustments.
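To make the failure mode concrete, here is a minimal, self-contained sketch of the KEEP_LAST substitution rule quoted above. Change and keep_last_by_source_timestamp are hypothetical simplifications of CacheChange_t and DataReaderHistory::received_change_keep_last (one instance, no GUIDs, no locking). Unlike the original, which returns true in the discard branch too, this sketch returns false for a discarded change so the effect is visible.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical, stripped-down stand-in for CacheChange_t.
struct Change
{
    std::int64_t source_timestamp_ns;  // stamped with the writer's system clock
    std::uint64_t sequence_number;     // assigned by the writer, always increasing
};

// Sketch of the KEEP_LAST substitution for a single instance.
// instance_changes is kept sorted by source timestamp; depth must be >= 1.
// Returns true if the change was stored, false if it was discarded.
bool keep_last_by_source_timestamp(
        std::deque<Change>& instance_changes,
        std::size_t depth,
        const Change& change)
{
    assert(depth >= 1);
    if (instance_changes.size() >= depth)
    {
        const Change& first_change = instance_changes.front();
        if (change.source_timestamp_ns < first_change.source_timestamp_ns)
        {
            // "Received change is older than oldest, and should be discarded"
            return false;
        }
        // "As the instance is ordered by source timestamp, we can always remove the first one."
        instance_changes.pop_front();
    }
    // Insert while keeping the instance ordered by source timestamp.
    auto pos = std::upper_bound(instance_changes.begin(), instance_changes.end(), change,
                    [](const Change& a, const Change& b)
                    {
                        return a.source_timestamp_ns < b.source_timestamp_ns;
                    });
    instance_changes.insert(pos, change);
    return true;
}
```

With depth 1, feeding it changes stamped 10000 and 10100, then 8000 and 8100 (the writer's clock having been set back), stores the first two and rejects the last two: the stored sample keeps winning until the writer's clock catches up again.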
Note that I would expect the current Fast DDS behaviour if DestinationOrderQosPolicy were implemented (which it is not) and set to BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS. However, according to the manual, it should be BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.
Note that I have created a pull request (PR #5018) for additional issues observed when the system clock is adjusted.
This might be related to #4850.
Current behavior
Samples get dropped when the publisher's system clock is set into the past.
Steps to reproduce
Set the system clock back after disabling automatic synchronization via NTP.
Fast DDS version/commit
2.14.2
Platform/Architecture
Ubuntu Focal 20.04 amd64
Transport layer
Shared Memory Transport (SHM)
Additional context
No response
XML configuration file
No response
Relevant log output
No response
Network traffic capture
No response
In my opinion, the stack should be as independent of the system clock as possible. Otherwise, hard-to-diagnose errors will occur at random points in time (whenever the system clock jumps forward or backward), such as the one @ma30002000 pointed out above. From this point of view, the use of sourceTimestamp (which is based on the system clock) should be minimized. Currently, sourceTimestamp is used in the stack in the following five places:
1. Where @ma30002000 indicated, namely in the DataReaderHistory::received_change_keep_last() and DataReaderHistory::completed_change_keep_last() methods. This condition for discarding received samples contradicts the use of QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS, i.e. the reader history should not be sorted by source time. Also, these samples are silently discarded despite their higher sequence numbers, without notifying the user (the REJECTED_BY_... callback is not called). I suggest that these sample-drop conditions be excluded from the code for QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.
2. The reader's history is sorted by the source timestamp using the following comparison function:
https://github.com/eProsima/Fast-DDS/blob/f973dc5dd21676c0cbd359154106aff7f8ee351a/src/cpp/rtps/common/ChangeComparison.hpp#L28-L35
This does not correspond to the configured QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS. I propose implementing the sorting in accordance with QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS, without using sourceTimestamp. In this case, it will probably be enough to use the sample sequence numbers from the corresponding DataWriter.
3. In the implementation of the Lifespan QoS for the DataWriter. From a practical point of view, this parameter should work in terms of local time and not depend on system clock jumps (after a backward jump, a sample will mistakenly remain in the history for longer than the given Lifespan duration; after a forward jump, it will be removed from the history ahead of time). Technically, this can be implemented by adding a steady clock timestamp to CacheChange_t and checking it in DataWriterImpl::lifespan_expired().
4. In the implementation of the Lifespan QoS for the DataReader. The reader's implementation assumes that the Writer's and Reader's system clocks are synchronized. If they are not, the received samples will be discarded in the DataReaderImpl::on_new_cache_change_added() method. Incorrect operation will also be observed here when time jumps forward or backward (as in point 3 above). It would be more useful for the user to have a mechanism for controlling the obsolescence of samples in the Reader's history that works with the local (steady) clock and does not depend on the quality of system clock synchronization. I suggest considering such an implementation (perhaps activated through a new DataReader QoS parameter, to preserve the old default behavior of the Lifespan QoS).
5. In the implementation of disable_positive_ack on the StatefulWriter side. I suggest considering the use of a steady clock, with its timestamp stored in CacheChange_t (see point 3 above).
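Points 3 and 5 both rely on stamping each change with the local steady clock. A minimal sketch of that idea follows; StampedChange, local_steady_stamp and lifespan_expired_steady are hypothetical names, not Fast DDS API, and the real DataWriterImpl::lifespan_expired() is not reproduced here.

```cpp
#include <cassert>
#include <chrono>

using SteadyClock = std::chrono::steady_clock;

// Hypothetical extension of a cache change: besides the source timestamp
// (system clock), it records its creation time on the local steady clock.
struct StampedChange
{
    std::chrono::system_clock::time_point source_timestamp;  // kept for the user, not used below
    SteadyClock::time_point local_steady_stamp;               // hypothetical new field
};

// Lifespan check based only on elapsed local steady time, so forward or
// backward jumps of the system clock cannot make a sample expire early or late.
bool lifespan_expired_steady(
        const StampedChange& change,
        SteadyClock::duration lifespan,
        SteadyClock::time_point now)
{
    return now - change.local_steady_stamp >= lifespan;
}
```

The same local_steady_stamp could serve the disable_positive_ack keep-duration check in point 5, since that check also only needs elapsed local time.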
Please comment on the five points presented above.
Hi @ma30002000, thanks for using Fast DDS. We are trying to reproduce the issue and investigate. We will come back with feedback.
Any indications and hints concerning the root cause (and a possible fix) would be highly appreciated.
@ma30002000 @i-and
Thank you for your patience here.
I have rebased #5018 and added some commits that:
- Refactor parts of the code using std::system_clock to always use equivalent Time_t code.
- Update current_time_since_unix_epoch so Time_t::now() is steady.
These changes improve resilience against system_clock being updated while the application is running.
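One way to make a wall-clock-like Time_t::now() steady is to capture the offset between system_clock and steady_clock once and afterwards advance only with the steady clock. The class below is a hypothetical sketch of that approach, not the code merged in #5018.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical sketch: the system/steady offset is captured once, and every
// later reading advances with the steady clock only.
class SteadyEpochClock
{
public:
    SteadyEpochClock()
        : base_system_(std::chrono::system_clock::now())
        , base_steady_(std::chrono::steady_clock::now())
    {
    }

    // Nanoseconds since the Unix epoch (system_clock's epoch), never decreasing
    // and unaffected by any system clock adjustment made after construction.
    std::int64_t now_ns() const
    {
        const auto elapsed = std::chrono::steady_clock::now() - base_steady_;
        const auto t = base_system_.time_since_epoch() + elapsed;
        return std::chrono::duration_cast<std::chrono::nanoseconds>(t).count();
    }

private:
    std::chrono::system_clock::time_point base_system_;
    std::chrono::steady_clock::time_point base_steady_;
};
```

Because the offset is captured only once, a later NTP correction is never picked up, which is the trade-off raised further down in this thread.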
Would you please review / test those changes?
Regarding lifespan, section 2.2.3.16 of the DDS Standard states:
- The ‘expiration time’ of each sample is computed by adding the duration specified by the LIFESPAN QoS to the source timestamp
- This QoS relies on the sender and receiving applications having their clocks sufficiently synchronized
So there's not much we can do here.
In the future, we could add a way for the user to inject a method that returns the current time, and make Time_t::now() depend upon it.
What do you think about that?
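A minimal sketch of such an injection point, assuming a process-wide hook. TimeSource, set_time_source and now_ns are hypothetical names; Fast DDS does not provide this API today.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical hook type: returns nanoseconds since the Unix epoch.
using TimeSource = std::function<std::int64_t()>;

inline std::int64_t default_time_source()
{
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
}

// Process-wide storage for the active source (defaults to the system clock).
inline TimeSource& current_time_source()
{
    static TimeSource source = default_time_source;
    return source;
}

// Not synchronized: meant to be called once, before any entity is created.
inline void set_time_source(TimeSource source)
{
    current_time_source() = std::move(source);
}

// What a Time_t::now()-like function would call instead of reading system_clock.
inline std::int64_t now_ns()
{
    return current_time_source()();
}
```

The hook would have to be installed before any entity is created, since the sketch does not protect against swapping the source while other threads are reading it.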
@ma30002000 @i-and FYI, we have just merged #5018 to master, and ordered backports to the supported versions (2.14.x, and 2.10.x)
Hi @MiguelCompany, I will try to look at the changes by the end of this week.
The proposed changes do not completely solve the issue. To confirm this, I wrote a test based on the hello_world example at git revision 70314ceb6 (Date: Fri Sep 6 08:48:14 2024 +0200). It also requires the following patch:
diff --git a/examples/cpp/hello_world/PublisherApp.cpp b/examples/cpp/hello_world/PublisherApp.cpp
index 63b433113..3bb9cc515 100644
--- a/examples/cpp/hello_world/PublisherApp.cpp
+++ b/examples/cpp/hello_world/PublisherApp.cpp
@@ -130,6 +130,16 @@ void PublisherApp::run()
{
while (!is_stopped() && ((samples_ == 0) || (hello_.index() < samples_)))
{
+ if (hello_.index() >= 5)
+ {
+ std::cout << "Index=" << hello_.index() << ". Paused forever..." << std::endl;
+ std::unique_lock<std::mutex> period_lock(mutex_);
+ cv_.wait(period_lock, [&]()
+ {
+ return is_stopped();
+ });
+ }
+
if (publish())
{
std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index()
diff --git a/examples/cpp/hello_world/WaitsetSubscriberApp.cpp b/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
index f039365b6..2581a83f6 100644
--- a/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
+++ b/examples/cpp/hello_world/WaitsetSubscriberApp.cpp
@@ -21,6 +21,7 @@
#include <condition_variable>
#include <stdexcept>
+#include <thread>
#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>
@@ -134,11 +135,11 @@ void WaitsetSubscriberApp::run()
reader_->get_subscription_matched_status(status_);
if (status_.current_count_change == 1)
{
- std::cout << "Waitset Subscriber matched." << std::endl;
+ std::cout << "Waitset Subscriber matched. Handle=" << status_.last_publication_handle << std::endl;
}
else if (status_.current_count_change == -1)
{
- std::cout << "Waitset Subscriber unmatched." << std::endl;
+ std::cout << "Waitset Subscriber unmatched." << status_.last_publication_handle << std::endl;
}
else
{
@@ -150,6 +151,12 @@ void WaitsetSubscriberApp::run()
if (changed_statuses.is_active(StatusMask::data_available()))
{
SampleInfo info;
+ if (received_samples_ == 1)
+ {
+ auto pause = std::chrono::milliseconds(60000);
+ std::cout << "When data available WAITING for " << pause.count() << std::endl;
+ std::this_thread::sleep_for(pause);
+ }
while ((!is_stopped()) &&
(RETCODE_OK == reader_->take_next_sample(&hello_, &info)))
{
@@ -158,7 +165,7 @@ void WaitsetSubscriberApp::run()
received_samples_++;
// Print Hello world message data
std::cout << "Message: '" << hello_.message() << "' with index: '"
- << hello_.index() << "' RECEIVED" << std::endl;
+ << hello_.index() << "' RECEIVED from " << info.publication_handle << std::endl;
if (samples_ > 0 && (received_samples_ >= samples_))
{
stop();
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index 0dcd18a45..f0bcfed2e 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -262,6 +262,7 @@ bool DataReaderHistory::received_change_keep_last(
DataReaderInstance::ChangeCollection& instance_changes = vit->second->cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
{
+ std::cout << "history_qos_.depth=" << history_qos_.depth << std::endl;
ret_value = true;
}
else
@@ -270,11 +271,17 @@ bool DataReaderHistory::received_change_keep_last(
CacheChange_t* first_change = instance_changes.at(0);
if (a_change->sourceTimestamp >= first_change->sourceTimestamp)
{
+ std::cout << "As the instance is ordered by source timestamp, we can always remove the first one. "
+ "history_qos_.depth=" << history_qos_.depth << ", a_change->sourceTimestamp=" << a_change->sourceTimestamp <<
+ ", first_change->sourceTimestamp=" << first_change->sourceTimestamp << std::endl;
// As the instance is ordered by source timestamp, we can always remove the first one.
ret_value = remove_change_sub(first_change);
}
else
{
+ std::cout << "Received change is older than oldest, and should be discarded. "
+ "history_qos_.depth=" << history_qos_.depth << ", a_change->sourceTimestamp=" << a_change->sourceTimestamp <<
+ ", first_change->sourceTimestamp=" << first_change->sourceTimestamp << std::endl;
// Received change is older than oldest, and should be discarded
return true;
}
After compiling the hello_world example, open three terminals and proceed as follows:
1. In the first terminal, run ./hello_world subscriber -w. The subsequent actions in the 2nd and 3rd terminals must be completed within 60 seconds.
2. In the second terminal, run ./hello_world publisher.
3. Disable the time synchronization service (sudo systemctl stop systemd-timesyncd) and set the system clock back (for example, by one hour).
4. In the third terminal, run ./hello_world publisher.
As a result:
Terminal 1 (subscriber):
Subscriber running. Please press Ctrl+C to stop the Subscriber at any time.
Waitset Subscriber matched. Handle=1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
history_qos_.depth=1
Message: 'Hello world' with index: '1' RECEIVED from 1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
history_qos_.depth=1
When data available WAITING for 60000
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831240.997453690, first_change->sourceTimestamp=1725831240.896825205
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831241.97925046, first_change->sourceTimestamp=1725831240.997453690
As the instance is ordered by source timestamp, we can always remove the first one. history_qos_.depth=1, a_change->sourceTimestamp=1725831241.198400104, first_change->sourceTimestamp=1725831241.97925046
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.63839469, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.164280786, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.264997918, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.365657331, first_change->sourceTimestamp=1725831241.198400104
Received change is older than oldest, and should be discarded. history_qos_.depth=1, a_change->sourceTimestamp=1725829275.466348101, first_change->sourceTimestamp=1725831241.198400104
Message: 'Hello world' with index: '5' RECEIVED from 1.f.f7.f3.ed.58.f4.9f.0.0.0.0.0.0.1.3
Waitset Subscriber matched. Handle=1.f.f7.f3.b.59.9d.7d.0.0.0.0.0.0.1.3
Terminal 2 (first publisher):
Publisher running. Please press Ctrl+C to stop the Publisher at any time.
Publisher matched.
Message: 'Hello world' with index: '1' SENT
Message: 'Hello world' with index: '2' SENT
Message: 'Hello world' with index: '3' SENT
Message: 'Hello world' with index: '4' SENT
Message: 'Hello world' with index: '5' SENT
Index=5. Paused forever...
Terminal 3 (second publisher, started after the clock was set back):
Publisher running. Please press Ctrl+C to stop the Publisher at any time.
Publisher matched.
Message: 'Hello world' with index: '1' SENT
Message: 'Hello world' with index: '2' SENT
Message: 'Hello world' with index: '3' SENT
Message: 'Hello world' with index: '4' SENT
Message: 'Hello world' with index: '5' SENT
Index=5. Paused forever...
As you can see from terminal 1, all samples from the second publisher were discarded in DataReaderHistory::received_change_keep_last(), i.e. the issue is still present.
In my opinion, to reliably fix this error, you should follow @ma30002000's recommendation and remove the conditions in the DataReaderHistory::received_change_keep_last and DataReaderHistory::completed_change_keep_last methods that lead to samples being discarded.
To do this, you will probably have to rework the sorting criteria of the reader's history so that they do not depend on sourceTimestamp. But this will have to be done anyway for a correct implementation of QoS BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS.
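A sketch of how a KEEP_LAST instance could honor both DestinationOrder kinds. KeepLastInstance, ReceivedChange and DestinationOrderKind are hypothetical; the BY_SOURCE branch mirrors the current rule quoted at the top of the issue, while the BY_RECEPTION branch orders by a local arrival counter and never looks at sourceTimestamp. Using a local arrival counter is an assumption for illustration; point 2 above suggests per-writer sequence numbers would probably be enough.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

enum class DestinationOrderKind { ByReceptionTimestamp, BySourceTimestamp };

struct ReceivedChange
{
    std::int64_t source_timestamp_ns;  // writer's system clock
    std::uint64_t reception_order;     // local, monotonically increasing counter
};

class KeepLastInstance
{
public:
    KeepLastInstance(std::size_t depth, DestinationOrderKind kind)
        : depth_(depth), kind_(kind)
    {
        assert(depth_ >= 1);
    }

    // Returns false only if the change is discarded.
    bool add(std::int64_t source_timestamp_ns)
    {
        ReceivedChange change{source_timestamp_ns, next_reception_++};
        if (kind_ == DestinationOrderKind::ByReceptionTimestamp)
        {
            // The latest arrival is always the newest: evict the oldest arrival.
            if (changes_.size() >= depth_)
            {
                changes_.pop_front();
            }
            changes_.push_back(change);
            return true;
        }
        // Source order: once full, reject anything older than the oldest stored change.
        if (changes_.size() >= depth_)
        {
            if (change.source_timestamp_ns < changes_.front().source_timestamp_ns)
            {
                return false;
            }
            changes_.pop_front();
        }
        auto pos = std::upper_bound(changes_.begin(), changes_.end(), change,
                        [](const ReceivedChange& a, const ReceivedChange& b)
                        {
                            return a.source_timestamp_ns < b.source_timestamp_ns;
                        });
        changes_.insert(pos, change);
        return true;
    }

    const std::deque<ReceivedChange>& changes() const { return changes_; }

private:
    std::size_t depth_;
    DestinationOrderKind kind_;
    std::uint64_t next_reception_ = 0;
    std::deque<ReceivedChange> changes_;
};
```

With depth 1, a sample stamped 8000 arriving after one stamped 10000 replaces it under ByReceptionTimestamp but is rejected under BySourceTimestamp, which is exactly the difference between the configured QoS and the observed behavior.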
Regarding the new implementation of the function current_time_since_unix_epoch():
https://github.com/eProsima/Fast-DDS/blob/70314ceb69fe42859cd43202064ef670423dc48f/src/cpp/utils/time_t_helpers.hpp#L37-L57
I suggest not doing this and reverting to its previous implementation, for the following reasons:
1. This does not solve the issue (see the test above).
2. The time adjustment is completely blocked when the FastDDS starts before the system time synchronization with the master is performed.
3. Since the frequency generators for the steady and system clocks are not coupled in any way, the two clocks will diverge over time.
4. From the user's point of view, the interface will change, because the time returned in many places (for example, in SampleInfo::source_timestamp) will no longer correspond to the usual system clock.
Based on the above, I propose fixing the handling of the system clock at each place where it is used, while still using the standard system clock (without freezing it at the time Fast DDS is launched).
Section 2.2.3.16 of the DDS Standard contains the following: This QoS relies on the sender and receiving applications having their clocks sufficiently synchronized. If this is not the case and the Service can detect it, the DataReader is allowed to use the reception timestamp instead of the source timestamp in its computation of the ‘expiration time.’
To let the Service (Fast DDS) detect the lack of synchronization, you could, for example, implement an additional user-settable parameter. The user would set this parameter to a "do not use the source timestamp" state when they know for sure that the clocks in the system are not synchronized. This would probably be a compromise solution that does not contradict the DDS standard.
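A sketch of what such a user-set parameter could look like. LifespanReference, expiration_time_ns and is_expired are hypothetical, not Fast DDS or DDS-standard API; the expiration time is the chosen reference timestamp plus the LIFESPAN duration, as described in section 2.2.3.16.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical user setting selecting the base of the expiration computation.
enum class LifespanReference { SourceTimestamp, ReceptionTimestamp };

// Expiration time = reference timestamp + lifespan (all values in nanoseconds).
std::int64_t expiration_time_ns(
        std::int64_t source_timestamp_ns,
        std::int64_t reception_timestamp_ns,
        std::int64_t lifespan_ns,
        LifespanReference reference)
{
    const std::int64_t base = (reference == LifespanReference::SourceTimestamp)
            ? source_timestamp_ns
            : reception_timestamp_ns;
    return base + lifespan_ns;
}

// A sample is expired once the reader's current time reaches its expiration time.
bool is_expired(std::int64_t expiration_ns, std::int64_t reader_now_ns)
{
    return reader_now_ns >= expiration_ns;
}
```

With a writer clock one hour behind the reader's, a 10 s lifespan computed from the source timestamp is already expired on arrival, while the reception-timestamp variant keeps the sample for the full 10 s after it is received.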
@i-and Thank you for testing.
First, let me highlight one of my earlier comments:
These changes improve resilience against system_clock being updated while the application is running.
So yes, the merged PR was only addressing the case of updating the system clock after the application has been launched.
That said, I agree on reverting the implementation of current_time_since_unix_epoch due to this comment:
2. The time adjustment is completely blocked when the FastDDS starts before the system time synchronization with the master is performed.
We will try to review the implementation of the reader history to improve this situation, but in the meantime I suggest you use write_w_timestamp so you can supply timestamps from your own clock when writing samples.
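As an illustration of the write_w_timestamp workaround, the helper below takes timestamps from std::chrono::steady_clock instead of the system clock. It is a hypothetical sketch: SecNsec and steady_timestamp are illustrative names, building the Fast DDS timestamp and calling write_w_timestamp are not shown, and comparing values across processes relies on the assumption that (as with libstdc++ on Linux) steady_clock reads CLOCK_MONOTONIC, which all processes on one host share and which setting the wall clock does not step.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Seconds/nanoseconds pair, shaped like the fields of a DDS timestamp.
struct SecNsec
{
    std::int32_t seconds;
    std::uint32_t nanosec;
};

// Hypothetical helper: a timestamp from the steady clock. Its origin is
// unspecified (typically boot), not the Unix epoch.
SecNsec steady_timestamp()
{
    const auto since = std::chrono::steady_clock::now().time_since_epoch();
    const auto secs = std::chrono::duration_cast<std::chrono::seconds>(since);
    const auto nsecs = std::chrono::duration_cast<std::chrono::nanoseconds>(since - secs);
    return SecNsec{static_cast<std::int32_t>(secs.count()),
                   static_cast<std::uint32_t>(nsecs.count())};
}
```

These values are only comparable on a single host, and SampleInfo::source_timestamp would no longer look like wall-clock time. A finite LIFESPAN would also likely break, since the reader adds the duration to the source timestamp and compares the result with its own clock.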
That said, I agree on reverting the implementation of current_time_since_unix_epoch
@i-and We did this in #5213
@MiguelCompany Thanks for the quick response.
We will try to review the implementation of the reader history to improve this situation,
When reworking the reader's history, please take into account the actual state of DestinationOrderQosPolicy. Currently there is a contradiction: the value BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS is set, but the behavior rather corresponds to BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS.
Ideally, implement DestinationOrderQosPolicy with full support for both options.
Hi, sorry for the long delay...
I also agree with reverting the implementation of current_time_since_unix_epoch.
A proper implementation of DestinationOrderQosPolicy with the correct behavior would be nice ;-)
I worked around the problem with the following patch, which tolerates system_clock jumps when Fast DDS is used on a single host only (similar to https://github.com/eProsima/Fast-DDS/blob/master/src/cpp/rtps/common/ChangeComparison.hpp):
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index d8455e497..94ae73ad2 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -274,7 +274,9 @@ bool DataReaderHistory::received_change_keep_last(
{
// Try to substitute the oldest sample.
CacheChange_t* first_change = instance_changes.at(0);
- if (a_change->sourceTimestamp >= first_change->sourceTimestamp)
+ if (a_change->writerGUID == first_change->writerGUID ?
+ a_change->sequenceNumber >= first_change->sequenceNumber :
+ a_change->sourceTimestamp >= first_change->sourceTimestamp)
{
// As the instance is ordered by source timestamp, we can always remove the first one.
ret_value = remove_change_sub(first_change);
@@ -808,7 +810,9 @@ bool DataReaderHistory::completed_change_keep_last(
{
// Try to substitute the oldest sample.
CacheChange_t* first_change = instance_changes.at(0);
- if (change->sourceTimestamp >= first_change->sourceTimestamp)
+ if (change->writerGUID == first_change->writerGUID ?
+ change->sequenceNumber >= first_change->sequenceNumber :
+ change->sourceTimestamp >= first_change->sourceTimestamp)
{
// As the instance is ordered by source timestamp, we can always remove the first one.
ret_value = remove_change_sub(first_change);
In my opinion, this should make sample substitution consistent with the history ordering.
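The substitution rule from this patch can be isolated into a small function to see which cases it covers. WriterChange and may_replace_oldest are hypothetical simplifications (plain integers instead of GUID_t, SequenceNumber_t and Time_t).

```cpp
#include <cassert>
#include <cstdint>

struct WriterChange
{
    std::uint64_t writer_guid;         // stand-in for GUID_t
    std::uint64_t sequence_number;     // stand-in for SequenceNumber_t
    std::int64_t source_timestamp_ns;  // writer's system clock
};

// Rule from the patch: same writer -> compare sequence numbers (clock-independent);
// different writers -> fall back to comparing source timestamps.
bool may_replace_oldest(const WriterChange& oldest, const WriterChange& incoming)
{
    return incoming.writer_guid == oldest.writer_guid
           ? incoming.sequence_number >= oldest.sequence_number
           : incoming.source_timestamp_ns >= oldest.source_timestamp_ns;
}
```

For an oldest stored sample {writer 1, seq 10, ts 10000}, a new sample {writer 1, seq 11, ts 8000} is accepted, so a clock jump under a running writer no longer drops data; a sample {writer 2, seq 1, ts 8000} from a publisher started after the jump, as in the three-terminal test above, is still rejected.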
@ma30002000 Thank you very much for that diff!
You could in fact use if (rtps::history_order_cmp(first_change, change)), and I do think that would fix the case of changing the system clock while the writer is running.
@MiguelCompany Yes, the attached patch (together with the changes in #5018 + patching of high_resolution_clock) fixed the issue in my specific use case (single host system).
In accordance with our CONTRIBUTING.md guidelines, I am closing this issue as it seems to be resolved. Please feel free to reopen it if necessary.