cyclonedds icon indicating copy to clipboard operation
cyclonedds copied to clipboard

RHC RESOURCE_LIMITS bug

Open jason1024po opened this issue 1 year ago • 0 comments

If max_instances of the RHC is set to 3, publishing more than 3 instances causes the RHC to become stuck: the new instance is rejected and local delivery retries indefinitely.

Steps

  • Use the helloworld examples.
  • DataReader set dds_qset_resource_limits(qos, DDS_LENGTH_UNLIMITED, 3, DDS_LENGTH_UNLIMITED);
  /* Poll-based subscriber loop: take up to MAX_SAMPLES per iteration,
     print the first valid sample, and sleep 100 ms when nothing arrived. */
  while (true)
  {
    rc = dds_take (reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES);
    if (rc < 0)
      /* Fixed: message previously said "dds_read" although dds_take is called. */
      DDS_FATAL("dds_take: %s\n", dds_strretcode(-rc));

    /* Check if we read some data and it is valid. */
    if ((rc > 0) && (infos[0].valid_data))
    {
      /* Print Message. */
      msg = (HelloWorldData_Msg*) samples[0];
      printf ("=== [Subscriber] Received : ");
      printf ("Message (%"PRId32", %s)\n", msg->userID, msg->message);
      fflush (stdout);
    }
    else
    {
      /* Polling sleep. */
      dds_sleepfor (DDS_MSECS (100));
    }
  }
  • DataWriter: cyclically publishes data for 4 instances (userID 0–3).
 /* Publisher loop: cyclically write 4 instances (userID 0..3),
    one write every 500 ms. With max_instances = 3 on the reader's RHC,
    the 4th instance is rejected and triggers the reported stuck condition. */
 while (true)
  {
    for (size_t i = 0; i < 4; i++)
    {
        /* userID is int32_t; cast the size_t loop index explicitly. */
        msg.userID = (int32_t) i;

        rc = dds_write (writer, &msg);
        if (rc != DDS_RETCODE_OK) {
          printf("dds_write: %s\n", dds_strretcode(-rc));
        } else {
          /* Use PRId32 for int32_t, consistent with the subscriber snippet
             (plain %d is only correct where int is 32 bits). */
          printf("msg.userID = %"PRId32", msg.message = %s\n", msg.userID, msg.message);
        }
        dds_sleepfor(DDS_MSECS(500));
    }
  }

Bug Trigger Conditions

rhc call stack is: deliver_locally_allinsync->deliver_locally_fastpath->ddsi_rhc_store->dds_rhc_default_store->rhc_store_new_instance

  1. rhc_store_new_instance returns RHC_REJECTED (max_instances exceeded)
  2. dds_rhc_default_store then returns false
  3. deliver_locally_fastpath returns DDS_RETCODE_TRY_AGAIN
/* Excerpt (body elided with "..."): delivers a sample to each reader in the
   fastpath array by calling ddsi_rhc_store; on rejection it invokes the
   writer's on_failure_fastpath hook and retries until the store succeeds
   or the hook reports an error. */
static dds_return_t deliver_locally_fastpath (struct ddsi_domaingv *gv, struct ddsi_entity_common *source_entity, bool source_entity_locked, struct ddsi_local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
        ...
        dds_return_t rc;
        /* Retry while the RHC rejects the sample (e.g. RESOURCE_LIMITS
           max_instances reached — see trigger conditions above). */
        while (!ddsi_rhc_store (rdary[i]->rhc, wrinfo, payload, tk))
        {
          /* on_failure_fastpath decides whether delivery should be retried;
             a non-OK result propagates up to the caller. */
          if ((rc = ops->on_failure_fastpath (source_entity, source_entity_locked, fastpath_rdary, vsourceinfo)) != DDS_RETCODE_OK)
          {
            free_sample_after_store (gv, payload, tk);
            // if msg.userID = 3 , rc is DDS_RETCODE_TRY_AGAIN
            return rc; 
          }
        }
      } while (rdary[++i] && rdary[i]->type == type);
      free_sample_after_store (gv, payload, tk);
    }
  }
  return DDS_RETCODE_OK;
}
/* Delivers a sample to all in-sync local readers, taking the lock-protected
   fastpath when available and falling back to the slowpath otherwise.
   NOTE(review): the outer do/while re-runs the whole delivery whenever the
   fastpath reports DDS_RETCODE_TRY_AGAIN (sample rejected by an RHC). If the
   RHC stays full — e.g. max_instances reached and the application never takes
   the data — this loop never terminates, which matches the "stuck" behavior
   reported in this issue. The FIXME below already flags the retry as a hack. */
dds_return_t deliver_locally_allinsync (struct ddsi_domaingv *gv, struct ddsi_entity_common *source_entity, bool source_entity_locked, struct ddsi_local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
  dds_return_t rc;
  /* FIXME: Retry loop for re-delivery of rejected reliable samples is a bad hack
     should instead throttle back the writer by skipping acknowledgement and retry */
  do {
    ddsrt_mutex_lock (&fastpath_rdary->rdary_lock);
    if (fastpath_rdary->fastpath_ok)
    {
      EETRACE (source_entity, " => EVERYONE\n");
      /* rdary[0] == NULL means there are no fastpath readers to deliver to. */
      if (fastpath_rdary->rdary[0])
        rc = deliver_locally_fastpath (gv, source_entity, source_entity_locked, fastpath_rdary, wrinfo, ops, vsourceinfo);
      else
        rc = DDS_RETCODE_OK;
      ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
    }
    else
    {
      /* Fastpath array is being reconfigured; deliver via the slowpath
         without holding the lock. */
      ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
      rc = deliver_locally_slowpath (gv, source_entity, source_entity_locked, wrinfo, ops, vsourceinfo);
    }
  } while (rc == DDS_RETCODE_TRY_AGAIN);
  return rc;
}

jason1024po avatar Dec 29 '23 14:12 jason1024po