cyclonedds
cyclonedds copied to clipboard
RHC RESOURCE_LIMITS bug
If the reader's RESOURCE_LIMITS QoS sets max_instances to 3, writing samples for more than 3 instances causes delivery into the reader history cache (RHC) to get stuck.
Steps
- Use the helloworld examples.
- DataReader: set the resource limits as shown below, then poll with dds_take:
/* Limit the reader to 3 instances (max_instances = 3); the other two limits are left unlimited. */
dds_qset_resource_limits(qos, DDS_LENGTH_UNLIMITED, 3, DDS_LENGTH_UNLIMITED);
/* Poll the reader forever: take whatever samples are available, print the
   first one when it carries valid data, otherwise sleep before polling again. */
while (true)
{
  rc = dds_take (reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES);
  if (rc < 0)
    DDS_FATAL("dds_take: %s\n", dds_strretcode(-rc)); /* name the call that actually failed */

  /* Check if we took some data and it is valid. */
  if ((rc > 0) && (infos[0].valid_data))
  {
    /* Print Message. */
    msg = (HelloWorldData_Msg*) samples[0];
    printf ("=== [Subscriber] Received : ");
    printf ("Message (%"PRId32", %s)\n", msg->userID, msg->message);
    fflush (stdout);
  }
  else
  {
    /* Polling sleep. */
    dds_sleepfor (DDS_MSECS (100));
  }
}
- DataWriter: write samples cyclically for 4 instances (userID 0 to 3).
/* Write one sample for each of userID 0..3 in an endless cycle, pausing
   500 ms after every write. The report counts these as 4 instances, one more
   than the reader's instance limit. */
while (true)
{
  /* The index uses int32_t so that assigning it to msg.userID (printed with
     PRId32 by the subscriber) involves no narrowing conversion. */
  for (int32_t i = 0; i < 4; i++)
  {
    msg.userID = i;
    rc = dds_write (writer, &msg);
    if (rc != DDS_RETCODE_OK) {
      printf("dds_write: %s\n", dds_strretcode(-rc));
    } else {
      printf("msg.userID = %"PRId32", msg.message = %s\n", msg.userID, msg.message);
    }
    dds_sleepfor(DDS_MSECS(500));
  }
}
Bug Trigger Conditions
The call chain into the RHC is:
deliver_locally_allinsync->deliver_locally_fastpath->ddsi_rhc_store->dds_rhc_default_store->rhc_store_new_instance
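- rhc_store_new_instance returns RHC_REJECTED
- dds_rhc_default_store returns false
- deliver_locally_fastpath returns DDS_RETCODE_TRY_AGAIN
- deliver_locally_allinsync then repeats its do/while loop for as long as it keeps getting DDS_RETCODE_TRY_AGAIN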
/* Abridged excerpt: "..." marks code omitted from this report, so only the
   store/retry part of the function is visible here.
   Visible behaviour: the sample is stored into a reader's RHC with
   ddsi_rhc_store; whenever the store is refused, ops->on_failure_fastpath is
   consulted, and any result other than DDS_RETCODE_OK releases the sample and
   is returned to the caller. Returns DDS_RETCODE_OK otherwise. */
static dds_return_t deliver_locally_fastpath (struct ddsi_domaingv *gv, struct ddsi_entity_common *source_entity, bool source_entity_locked, struct ddsi_local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
...
dds_return_t rc;
/* Retry the store for this reader until ddsi_rhc_store reports success. */
while (!ddsi_rhc_store (rdary[i]->rhc, wrinfo, payload, tk))
{
/* Store refused: the failure callback decides whether to retry the store (OK) or give up. */
if ((rc = ops->on_failure_fastpath (source_entity, source_entity_locked, fastpath_rdary, vsourceinfo)) != DDS_RETCODE_OK)
{
free_sample_after_store (gv, payload, tk);
// Observed in this report: for msg.userID = 3 (the fourth instance), rc is DDS_RETCODE_TRY_AGAIN here.
return rc;
}
}
} while (rdary[++i] && rdary[i]->type == type);
free_sample_after_store (gv, payload, tk);
}
}
return DDS_RETCODE_OK;
}
/* Deliver a sample to the local readers, repeating the attempt for as long as
   the chosen delivery path reports DDS_RETCODE_TRY_AGAIN.

   Each attempt takes fastpath_rdary->rdary_lock and then:
   - if fastpath_ok is set, delivers via deliver_locally_fastpath while still
     holding the lock (or yields DDS_RETCODE_OK when rdary[0] is empty) and
     unlocks afterwards;
   - otherwise unlocks first and delivers via deliver_locally_slowpath.

   Returns the first result that is not DDS_RETCODE_TRY_AGAIN.

   FIXME: Retry loop for re-delivery of rejected reliable samples is a bad hack;
   should instead throttle back the writer by skipping acknowledgement and retry. */
dds_return_t deliver_locally_allinsync (struct ddsi_domaingv *gv, struct ddsi_entity_common *source_entity, bool source_entity_locked, struct ddsi_local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
  for (;;)
  {
    dds_return_t result;

    ddsrt_mutex_lock (&fastpath_rdary->rdary_lock);
    if (!fastpath_rdary->fastpath_ok)
    {
      /* Slow path runs without the reader-array lock held. */
      ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
      result = deliver_locally_slowpath (gv, source_entity, source_entity_locked, wrinfo, ops, vsourceinfo);
    }
    else
    {
      EETRACE (source_entity, " => EVERYONE\n");
      /* Fast path runs with the reader-array lock held; nothing to do if the array is empty. */
      result = fastpath_rdary->rdary[0]
        ? deliver_locally_fastpath (gv, source_entity, source_entity_locked, fastpath_rdary, wrinfo, ops, vsourceinfo)
        : DDS_RETCODE_OK;
      ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
    }

    /* Anything other than "try again" is final. */
    if (result != DDS_RETCODE_TRY_AGAIN)
      return result;
  }
}