rmw_fastrtps
rmw_fastrtps copied to clipboard
Reused shared memory with `PREALLOCATED_WITH_REALLOC`
On the PR https://github.com/ros2/rclcpp/pull/2624 I provide a fix to allow the user to make copies of loaned messages, keeping them beyond the subscription callback scope. With the fix, the loaned message is returned to the DDS when the user's copy of the message goes out of scope.
In the PR I add a unit test to verify that memory is not reused until all entities have returned the loan.
The tests fully pass when history memory policy is set as DYNAMIC_REUSABLE, but fails when set as PREALLOCATED_WITH_REALLOC:
[ FAILED ] 2 tests, listed below:
[ FAILED ] TestSharedMemory.user_is_last_loan_owner
[ FAILED ] TestSharedMemory.sub_is_last_loan_owner
The failures mean that a user is provided with loaned memory which has been previously provided, and is still in use:
- Case 1: The user keeps copies (loaned not returned)
- Case 2: A not-spinning subscription is keeping copies in its internal buffer.
While a temporary solution would be to only use DYNAMIC_REUSABLE on shared memory, this mode has the issue that a undesired copy is performed on a multi-process system, as I describe in https://github.com/ros2/rmw_fastrtps/issues/782
So in short:
PREALLOCATED_WITH_REALLOC: Issues shared memory still in use
DYNAMIC_REUSABLE: A copy of the message happens on a multi-process system
Do you think this is a bug, or the use of PREALLOCATED_WITH_REALLOC inherently breaks proper shared memory management?
These failures can be explained by the documentation in eProsima's datasharing-delivery-constraints:
Using Data-sharing mechanism, the DataWriter’s history is shared with the DataReaders.
This means that the effective HistoryQos depth on the DataReader is, at most,
the Datawriter’s HistoryQos depth.
To avoid confusions, set the DataReaders’ history depth to a value equal or less than the DataWriter’s.
In the failure [ FAILED ] TestSharedMemory.sub_is_last_loan_owner, the subscription has set a QoS depth larger than the publisher depth, so the failure seems kind of expected.
My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event:
[WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.
The goal would be for the user for somehow to know that if he wants shared memory to safely handle memory (i.e. its stored messages won't silently be overridden):
- Subscription depth should be <= Publisher depth
- User N copies should be <= Publisher depth
Maybe this makes sense to just be mentioned on the ROS2 documentation about shared memory & loaned messages.
Currently the user can still ask for Loaned Messages even if all the shared memory has been exhausted, like when the previous loaned messages hasn't been returned (either because the user has copies, or subscriptions has them in its buffers).
In this case, the user is provided with memory still in use, which will silently override user/subscription stored messages.
I think the right behavior would be for the publisher to throw when the user requests loans and memory has been exhausted. What do you think?
My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event: [WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.
IMO, sounds reasonable if they are doing data-sharing. and this is probably request for Fast-DDS, and we (ROS 2) can catch the QoS incompatibility event from rmw implementation?
In this case, the user is provided with memory still in use, which will silently override user/subscription stored messages. I think the right behavior would be for the publisher to throw when the user requests loans and memory has been exhausted.
IMO this depends on the application requirement.
- If publisher stops publishing because the memory blocks are not returned by user, all the subscription waiting for the next messages and sub-system will stop immediately. if the 3rd party application put a hold on the shared memory pointer, they can stop the entire system. for me this is more like security issue. because of this, i am not sure stopping publishing would be the right way to take, including throwing exception.
- maybe publisher goes on publishing the data to overwrite the data? but there is a possibility that subscriber reads the corrupted data or overwritten one updated already by publisher. i think there has been a discussion on
is_validAPI to verify if the data is still intact on subscriber side, but not sure if this comes from DDS or vendor specific API. (I guess that is not the part of DDS, but vendor API can checkSampleIfo.valid_dataflag?) i think that thisis_validAPI would be useful for this case. - Give some time to wait until the shared memory is released in the configuration. like https://fast-dds.docs.eprosima.com/en/latest/fastdds/transport/datasharing.html#blocking-reuse-of-samples-until-acknowledged
- after all, most safest way is currently implemented that disabling loaning the memory on subscriptions.
CC: @MiguelCompany @wjwwood @clalancette @alsora
@mauropasse Currently, the only way to make the publisher block till the shared payload is not being used is using RELIABLE, and KEEP_ALL.
You can also increase extra_samples in ResourceLimitsQosPolicy to make the pool bigger (for instance, having extra_samples = depth would make the pool twice the depth).
One thing that could be done is adding a warning when a sample is marked for reuse without it being acknowledged. There is this callback in the DataWriterListener that could be of use.
My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event:
[WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.
The problem with this is that HistoryQos is not transmitted in discovery.
Great information @MiguelCompany. I'll explore the options!
@MiguelCompany I tested these scenarios based on your suggestions:
KEEP_ALLon publisher: Shared memory isn't reused, even if loans were returned by the sub/user (since the publisher still keeps the loans). No memory corruption or duplicates detected, so it serves the purpose (but might not be the most performant option due possible dynamic allocations?)extra_samples: Works well; it extends the shared memory pool beyond the pub QoS depth. This option lets subs/users hold additional samples.
One thing that could be done is adding a warning when a sample is marked for reuse without it being acknowledged.
- I tested
RELIABLEwithon_unacknowledged_sample_removed: It didn't trigger, as the DDS sub likely sent the ACK back to pub even when loans weren’t returned? (subscription not spinning, storing messages). I overrided the API oncustom_publisher_info.hpp: CustomDataWriterListeneradding a warning, but it didn't show anything.
If the subscription is not spinning but it receives the loaned messages, I guess the ACK is still sent in DDS? So the on_unacknowledged_sample_removed wouldn't be useful in this case?
Thanks!
KEEP_ALLon publisher: Shared memory isn't reused, even if loans were returned by the sub/user (since the publisher still keeps the loans). No memory corruption or duplicates detected, so it serves the purpose (but might not be the most performant option due possible dynamic allocations?)
No dynamic allocations here. The only possible issue is that, since the publisher is blocked until the oldest sample is acknowledged (or reliability.max_blocking_time elapses), it could hinder the publication rate.
- I tested
RELIABLEwithon_unacknowledged_sample_removed: It didn't trigger, as the DDS sub likely sent the ACK back to pub even when loans weren’t returned? (subscription not spinning, storing messages). I overrided the API oncustom_publisher_info.hpp: CustomDataWriterListeneradding a warning, but it didn't show anything.
That is awkward. Was the reader also RELIABLE? Samples are automatically acknowledged for BEST_EFFORT readers.
Just in case, could you check again with the changes on this commit? I just added an error log when the writer detects the situation of a sample being removed without acknowledgement.
That is awkward. Was the reader also RELIABLE?
Yes. So this is the scenario just to clarify:
Single process: [Pub / Sub_A / Sub_B] = RELIABLE
1. Publisher (depth=1) publishes loaned messages
2. Subscription_A (depth=10) doesn't spin, stores messages
3. Subscription_B spin, user stores 10 messages
After publishing the 2nd message, the publisher is provided with same memory
used in the 1st message, despite loans not being returned. This is where we'd like to see the warning.
When is usually the ACK sent by the subscription? When executed or when message is received in DataReader? What is the relation with this callback, and the fact that asking a loan will provide memory still in use?
The only case in which I manage to see this callback in action (with also the logs you added in the commit) is setting:
Pub: best effort
Sub: reliable
[WARN] [1728658509.233333338] [pub_sub_node]: New publisher discovered on topic '/topic',
offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY
[INFO] [1728658509.243104727] [pub_sub_node]: Publishing: '1'
2024-10-11 14:55:09.243 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
[WARN] [1728658509.243487964] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
memory reused: 0x56141ada4bc0
[INFO] [1728658509.253650552] [pub_sub_node]: Publishing: '2'
[WARN] [1728658509.253678907] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
2024-10-11 14:55:09.253 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
memory reused: 0x56141ada4b50
Since my test is single process, the send_datasharing_ack() is not called, but I'm unsure if is the same ACK sender that causes the callback on_unacknowledged_sample_removed() to not be called.
I expanded the test to check multi-process behavior (pub/sub in different processes):
- Subscription not spinning (thus not returning loans):
- The
on_unacknowledged_sample_removed()callback is invoked correctly when memory is reused (since no ACK is sent to the publisher). - The user (publisher process) could be warned about memory still in use.
- The
- Subscription spinning (but user keeps messages, not returning loans):
- The callback is not invoked, even if memory is reused, likely because the subscription sent ACK to the publisher.
- Single-process:
- The callback is not invoked, regardless of whether the subscription is spinning.
So, in most cases, the on_unacknowledged_sample_removed() callback doesn't serve to notify the user about memory reuse.
A different approach to verify if memory is still in use could be:
- If the publisher knows how many same-host subscriptions exist, it could infer if memory is still in use:
- The loan must be returned X times (where X = N subscriptions + 1 for the
transient_localpublisher).
- The loan must be returned X times (where X = N subscriptions + 1 for the
Could logic be implemented to track this? What do you think @MiguelCompany
@mauropasse thank you for checking this, for us this is really interesting topic!
Subscription spinning (but user keeps messages, not returning loans): The callback is not invoked, even if memory is reused, likely because the subscription sent ACK to the publisher.
this is because that ACK is sent to the corresponding publisher when the message is taken on the subscription? right?
https://github.com/ros2/rmw_fastrtps/blob/5cb7665847bc9debfee2e8077c324ff4cc403ea0/rmw_fastrtps_shared_cpp/src/rmw_take.cpp#L98
for me, this looks like okay by design of on_unacknowledged_sample_removed? because subscription sent the ACK for that message.
i mean that on_unacknowledged_sample_removed does not tell the data is valid or not...
i think we need the is_valid API in the end, so that application can check the data integrity right before accessing the data...
Just FYI, with previous comment on https://github.com/ros2/rmw_fastrtps/issues/783#issuecomment-2407607674
Pub: best effort Sub: reliable
I do not think this is gonna communicate at all...
offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY
DDS detected the incompatible QoS, and no messages are delivered to this concerned subscription. because DDS cannot guarantee RELIABLE durability on this subscription.
@mauropasse @fujitatomoya This is in fact getting interesting...
this is because that ACK is sent to the corresponding publisher when the message is taken on the subscription? right?
@fujitatomoya you nailed it! It seems Fast DDS is acknowledging the sample when taking it.
So the following table is a summary on when samples are acknowledged
| Same process | Inter-process without data-sharing | Inter-process with data-sharing | |
|---|---|---|---|
| Sample acknowledged... | ... when published | ... when added to history | ... on take |
A different approach to verify if memory is still in use could be:
If the publisher knows how many same-host subscriptions exist, it could infer if memory is still in use:
- The loan must be returned X times (where X = N subscriptions + 1 for the
transient_localpublisher).Could logic be implemented to track this? What do you think @MiguelCompany
I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.
Either that or expose is_valid API as @fujitatomoya suggested
i think we need the
is_validAPI in the end, so that application can check the data integrity right before accessing the data...
I thought of another possibility. The user could be warned when an item is added to the LoanManager, and the address of the sample already exists there:
https://github.com/ros2/rmw_fastrtps/blob/9d2150ffc1b549d0adc71971b7f350e325b61340/rmw_fastrtps_shared_cpp/src/rmw_take.cpp#L534-L539
I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.
as Fast-DDS, i think this sounds more robust and reliable? is there any side effect for this? because doing this means, the DataReader takes more time to send the ACK to the DataWriter, i am not sure if DataWriter's behavior could be different from this...
Either that or expose is_valid API as @fujitatomoya suggested
IMO, either way (above or below) we take, is_valid API is useful for user application to check the data validity right before accessing it.
warning and callback is also useful, but user application needs to sync the state between accessor and warning event in the application.
I thought of another possibility. The user could be warned when an item is added to the LoanManager, and the address of the sample already exists there
this sounds reasonable in ROS 2 RMW implementation. if that is overwriting the same VA (unique_ptr) that already exists in the items list, we can print the warning. and if we can have the user callback here, we can notify the user via this callback.
I've been testing the approaches using add_item and on_unacknowledged_sample_removed to warn the user about its messages being overridden, and they're useful.
So we have two situations:
-
Spinning subscription, user keeps messages (outside subscription callback) which are overridden:
- Subscription process: Warning via
add_itemwhen subscription executed- Note: The warning happens after the message is overriden, published and sub executed
- Publisher process:
on_unacknowledged_...doesn't warn, since the ACK was sent when message was taken- If
send_datasharing_ackwere to be sent onrmw_return_loaned_...instead ofrmw_take_loaned_...the publisher process could warn. - This warning would happen on
rmw_publish, after the message has already been overridden.
- If
- Subscription process: Warning via
-
Not-spinning subscription, receiving and storing a duplicated message which also overridden the 1st one.
- Subscription process:
add_itemis not called, since subscription is not executed. No warning- If somehow
add_itemwere to be called on receiving the message instead of executing subscription, we could have a warn here.
- If somehow
- Publisher process:
on_unacknowledged_...triggers, generating the warning.- The warning doesn't trigger if pub/sub were in the same process!
- Subscription process:
So we have then,
- User message overridden
- Subscription process: Warning - when
execute_subscriptioncalled - Publisher process: could be also warned but requires changing when
send_datasharing_ackis done.
- Subscription process: Warning - when
- Subscription message overridden:
- Subscription process: No warning
- Publisher process: Warning (when publish called) - only applies for multi process pub/sub.
In both cases, the warnings happen after the messages has been overriden. Ideally it would be nice for the warning to happen in all processes, and when the loaned is borrowed (before overrides), but these warnings I tested above seem a step in the right direction!
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
+ void on_unacknowledged_sample_removed(
+ eprosima::fastdds::dds::DataWriter* datawriter,
+ const eprosima::fastdds::dds::InstanceHandle_t& handle) override
+ {
+ RCUTILS_LOG_WARN_NAMED(
+ "rmw_fastrtps_shared_cpp",
+ "A shared message held by a subscription has been overriden.");
+ }
+
private:
RMWPublisherEvent * publisher_event_;
};
--- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
+++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
void add_item(std::unique_ptr<Item> item)
{
std::lock_guard<std::mutex> guard(mtx);
+ // Check if the new item already exists in the list
+ for (const auto& existing_item : items) {
+ if (existing_item->data_seq.buffer()[0] == item->data_seq.buffer()[0]) {
+ RCUTILS_LOG_WARN_NAMED(
+ "rmw_fastrtps_shared_cpp",
+ "Subscription recieved a message still held by the user (which was overridden)"); }}
is_valid would also be useful. Any suggestions for its implementation?
@mauropasse no major objections, i think this is better for user application. one concern is that when we are using the loaned messages, usually what we care most is performance. adding the extra checking to crawl through the existing_item list would be not ideal for the performance? especially we have a large depth in the HistoryCache?
@fujitatomoya I think it wouldn't hurt performances much.
Only in an (uncommon?) situation, there may be some CPU usage when a non-spinning subscription (with many stored messages) starts spinning, but the user doesn't discard the messages.
For each execute_subscription, an item is added to a list and compared with previous items (which would be zero, since the subscription wasn't spinning before).
If the user discards the message in the callback, the item is removed from the list. However, if the user keeps a copy of the message and doesn't return the loan, the item remains stored.
The overhead comes then from iterating over the messages that the user has kept outside of the subscription callback, when a new message is processed. So I think for most situations, it would be iterating over an empty list?
On other topic, what it worries me is that we still don't have a warning for single-process case where a subscription keeps duplicated an overridden messages.
Suppose, subscription_depth = 100 / publisher_depth = 1 -> Subscription has 100 overridden messages, all the same!
@mauropasse yeah probably the overhead can be ignored mostly when the application returns the memory to the middleware.
On other topic, what it worries me is that we still don't have a warning for single-process case where a subscription keeps duplicated an overridden messages. Suppose, subscription_depth = 100 / publisher_depth = 1 -> Subscription has 100 overridden messages, all the same!
is this really configurable with Fast-DDS? i mean History Object is coupled between DataWriter and DataReader, that means i think subscription depth is also managed with 1 in this case?
Suppose,
subscription_depth = 100/publisher_depth = 1-> Subscription has 100 overridden messages, all the same!
While this will be allowed, it is discouraged in the last paragraph of this section of the documentation. The History QoS is not currently transmitted so it cannot be checked on the reader.
is_validwould also be useful. Any suggestions for its implementation?
It would be as simple as looking for the loaned item and calling is_sample_valid on the DataReader
I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned.
We already added this to the public roadmap here, but it's on the long term and we don't yet know when we will prioritize it.
It would be as simple as looking for the loaned item and calling is_sample_valid on the DataReader
this is implementation specific API to check the sequence number for data-sharing, right?
probably we can bring this topic to PMC meeting, and also consider RMW interface to support other RMW implementations.
@mauropasse @MiguelCompany @Barry-Xu-2018
we had a discussion with ROS PMC meeting, currently we are going to implement this rmw_fastrtps specific interface as native rmw implementation API instead of RMW. we can also provide the documentation that explains how to use and why this interface exits for user.
btw, do you guys happen to know if there is any native rmw implementation API exists in rmw_fastrtps???