
Why will the on_data_available callback be triggered again after the writer that matches the reader exits?

Dcnqukin opened this issue 4 years ago · 5 comments

In the latest version of the OMG DCPS specification (DDS v1.4), section 2.2.4.1 describes the DATA_AVAILABLE status as meaning that new information is valid. Why, then, does the cyclonedds implementation trigger the callback when a writer cancels the match? In my opinion, when the writer cancels the match, that should involve a change of the SUBSCRIPTION_MATCHED status, which triggers on_subscription_matched. In fact, the on_data_available callback is triggered in dds_rhc_default_unregister_wr in dds_rhc_default.c.

static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_common, const struct ddsi_writer_info * __restrict wrinfo)
{
  /* Only to be called when writer with ID WR_IID has died.
     If we require that it will NEVER be resurrected, i.e., that next
     time a new WR_IID will be used for the same writer, then we have
     all the time in the world to scan the cache & clean up and that
     we don't have to keep it locked all the time (even if we do it
     that way now).
     WR_IID was never reused while the built-in topics weren't getting
     generated, but those really require the same instance id for the
     same GUID if an instance still exists in some reader for that GUID.
     So, if unregistration without locking the RHC is desired, entities
     need to get two IIDs: the one visible to the application in the
     built-in topics and in get_instance_handle, and one used internally
     for tracking registrations and unregistrations. */
  struct dds_rhc_default * __restrict const rhc = (struct dds_rhc_default * __restrict) rhc_common;
  bool notify_data_available = false;
  struct rhc_instance *inst;
  struct ddsrt_hh_iter iter;
  const uint64_t wr_iid = wrinfo->iid;

  ddsrt_mutex_lock (&rhc->lock);
  TRACE ("rhc_unregister_wr_iid %"PRIx64",%d:\n", wr_iid, wrinfo->auto_dispose);
  for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter))
  {
    if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid))
    {
      assert (inst->wrcount > 0);
      struct trigger_info_pre pre;
      struct trigger_info_post post;
      struct trigger_info_qcond trig_qc;
      get_trigger_info_pre (&pre, inst);
      init_trigger_info_qcond (&trig_qc);
      TRACE ("  %"PRIx64":", inst->iid);
      dds_rhc_unregister (rhc, inst, wrinfo, inst->tstamp, &post, &trig_qc, &notify_data_available);
      postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc);
      TRACE ("\n");
    }
  }
  ddsrt_mutex_unlock (&rhc->lock);

  if (rhc->reader && notify_data_available)
    dds_reader_data_available_cb (rhc->reader);
}

According to my understanding, dds_rhc_default_unregister_wr is specifically used by the reader to unregister a peer writer. Perhaps it would be more appropriate to trigger on_subscription_matched in the implementation of dds_rhc_default_unregister_wr.

Dcnqukin avatar Dec 28 '21 02:12 Dcnqukin

Hi @Dcnqukin, I tried to find the bit in 2.2.4.1 where it says that the new information is valid, but I failed to find it. All I can see in that section is "new information is available", plus some explanatory text about the distinction between "read" and "plain" communication statuses, where it says:

Read communication statuses: i.e., those that are related to arrival of data, namely DATA_ON_READERS and DATA_AVAILABLE.

which one could reasonably interpret as saying it is triggered only as a consequence of a write operation (possibly concerning historical data, i.e. transient-local, transient or persistent). Though reasonable, I am pretty sure this is against the intent of the spec, as in 2.2.4.2.2, "Changes in Read Communication Statuses", it says:

The StatusChangedFlag becomes TRUE when either a data-sample arrives or else the ViewState, SampleState, or InstanceState of any existing sample changes for any reason other than a call to DataReader::read, DataReader::take or their variants. Specifically any of the following events will cause the StatusChangedFlag to become TRUE:

  • The arrival of new data.
  • A change in the InstanceState of a contained instance. This can be caused by either:
    • The arrival of the notification that an instance has been disposed by:
      • the DataWriter that owns it if OWNERSHIP QoS kind=EXCLUSIVE, or by
      • any DataWriter if OWNERSHIP QoS kind=SHARED.
  • The loss of liveliness of the DataWriter of an instance for which there is no other DataWriter.
  • The arrival of the notification that an instance has been unregistered by the only DataWriter that is known to be writing the instance.

The transition from "alive" to "not alive no writers" is clearly one of those. There is a loophole: one can argue that a change in state of an instance for which there are no samples present in the cache should not result in an "invalid sample", because the introduction of an "invalid sample" in the cache constitutes neither the arrival of a "data-sample" nor a change of state of "any existing sample".

That line of reasoning would result in a very unpleasant behaviour where an application using the read operation would always get notifications of instance state changes (because there would always be some sample), whereas an application using the take operation would not. Such behaviour only causes problems.

I am convinced that the intent is that these changes should cause a DATA_AVAILABLE event and that the loophole in the spec is accidental. It makes both the middleware and the application logic simpler. Or, to put it differently, it is much easier to ignore an event than to not get triggered at all. In short, Cyclone is, in my view, correct.

it should involve the state change of SUBSCRIPTION_MATCHED, which triggers on_subscription_matched.

This it does, but it isn't the RHC that triggers that event because it is only concerned with storing the data. Signalling subscription matches is handled by the discovery/matching logic outside the RHC.

According to my understanding, dds_rhc_default_unregister_wr is specifically used for the reader to unregister the peer writer. Perhaps it would be more appropriate to trigger the on_subscription_matched in the implementation of dds_rhc_default_unregister_wr.

Not quite: writers can call a function to "unregister" an instance, and they can also lose "liveliness". In both cases you end up in dds_rhc_default_unregister_wr, but the writer remains matched. Furthermore, there is also the PUBLICATION_MATCHED event, which is SUBSCRIPTION_MATCHED's twin, and so it makes far more sense to handle them in the same way.

--

As an aside: we're discussing the DCPS spec here, but there is also the DDSI spec. That one has something weird about topics that have no key field, where it suggests that the "unregister" and "dispose" operations do not apply to them. However, that's in direct contradiction with the DCPS spec.

DCPS came first, and DDSI is merely a protocol spec to facilitate interoperability. Such a spec must not limit the range of behaviours allowed by the DCPS spec, so the DDSI spec is an epic fail for the OMG. Fortunately, the wording in the DDSI spec is vague enough that you can argue that all DDS topics are "keyed" topics in the sense that term is used in the DDSI spec. Problem solved, except ...

Various implementations used the wrong mapping and can't change because of a need to preserve interoperability with older versions, also forcing other implementations (like Cyclone) to use the wrong mapping. Some rather well-known implementations exhibited the behaviour described in DDSI instead of that described in DCPS — and so were (possibly still are) not DDS implementations despite their claims.

With such an implementation you would not get this invalid sample and DATA_AVAILABLE event when the writer of a "keyless" topic disappears. So those broken implementations may also be a cause for your surprise. Cyclone chooses to implement the DDS spec and as a consequence, is forced to violate the DDSI spec.

eboasson avatar Dec 28 '21 08:12 eboasson

Hi @eboasson, sorry, I made a mistake about DCPS section 2.2.4.1, which describes the meaning of DATA_AVAILABLE; you are correct. But I am still confused about the behavior of dds_rhc_default_unregister_wr.

As you said, the RHC is not related to on_subscription_matched, and by the way, I think that behavior is correct. But why does Cyclone invoke dds_reader_data_available_cb in dds_rhc_default_unregister_wr? From my perspective, when the RHC unregisters writers, that may be related neither to a change in a read communication status (i.e., DATA_ON_READERS and DATA_AVAILABLE) nor to new information being available.

Let me tell you why I am concerned about the behavior of dds_rhc_default_unregister_wr. In the roundtrip example, if we input ./RoundtripPing 0 10 in one terminal and ./RoundtripPong in another, then when RoundtripPing has finished its ping-pong it exits safely, and RoundtripPong will invoke on_data_available. Logically, RoundtripPong shouldn't receive any data after the peer writer is disposed or unregistered, but RoundtripPong will enter on_data_available, which can cause unexpected behavior, such as still invoking dds_take even though the peer writer no longer exists (but that is not the point).

Sorry, I am not a native speaker of English, so the earlier comment may not represent my view correctly. But I am still confused about why on_data_available is invoked when peer writers are unregistered or disposed. And thanks for your patient reply.

Dcnqukin avatar Dec 28 '21 15:12 Dcnqukin

No worries! DDS is a complicated middleware and over the years I have seen many users who misunderstood some important detail and kept having trouble because of it. You are asking questions, so you're not making that mistake.

Maybe you should take a step back, and ask why there is this combination of "instances" (with an "instance" and a "view" state), and "samples" (with a state). The samples are pretty obvious (it is the data that is published), and the state is also a very simple flag that is initially "not read" and changes to "read" when you read the data. But why these instances?

The reason is that it is designed around taking in sensor data, analysing, combining and enriching that data, and control actuators. (There'll usually be user interfaces as well, of course). There are many other attributes of the system that matter, such as fault-tolerance, being able to upgrade or extend it and meeting timing requirements.

Often, these sensors are simple measurement devices (like thermometers), but they can also be sophisticated subsystems. Radar systems are the original use case of DDS, back in the '80s, but the sensor suite and the processing in an autonomous vehicle is the same idea: it tracks the objects in its environment, classifying what they are, predicting their movements. And so on and so forth.

The thing is, you don't know in advance how many objects there will be, nor will you always observe every single one at the same rate (a pedestrian can be hidden behind a tree for a bit, for example). Moreover, the objects come and go. If you look at the data flow and the processing in the system, it doesn't make sense to create a new topic for each object, so you need a way to maintain a table of them where you can update each one independently and where you can add and delete entries. That's where the keys (a.k.a. "instances") come in, just like they do in a relational database.

Adding new objects/key values/instances is easy, you just publish a new key value. The "view state" in DDS exists to inform the reader whether it has seen an instance before or not, because it is very easy to provide while being useful information quite often.

Deleting data in a system like this is a very different story. The only way to communicate that something is no longer relevant is by publishing "something". It could be data, but it is a special kind of data, signalling the end of the life of an instance. So in DDS, it is modelled with the "dispose" operation, which doesn't publish data (though Cyclone also has writedispose — a dispose with data, which I think is what dispose should have been), but indicates that a certain instance is no longer relevant (hence the "not alive disposed" instance state).
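For concreteness, the writer-side lifecycle operations mentioned above look roughly like this in the Cyclone C API. This is only a sketch, assuming a writer wr for the HelloWorldData example type already exists; the message values are made up:

```c
/* Sketch of writer-side instance lifecycle operations (assumes an
 * existing writer "wr" for the HelloWorldData example type). */
HelloWorldData_Msg msg = { .userID = 1, .message = "bye" };

dds_write (wr, &msg);               /* instance becomes ALIVE, data delivered    */
dds_dispose (wr, &msg);             /* instance -> NOT_ALIVE_DISPOSED, no data   */
dds_writedispose (wr, &msg);        /* Cyclone's "dispose with data" operation   */
dds_unregister_instance (wr, &msg); /* this writer stops publishing the instance */
```

Each of these takes the sample only to identify the instance by its key fields (here userID); which of them an application uses depends on whether it wants to say "gone" (dispose) or merely "I will not write this anymore" (unregister).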

Such state changes are presented in the same manner as data, and passed on the application via the read/take operations. So it makes sense that "disposing" data results in DATA_AVAILABLE. It could have been designed differently, with a different event and a different operation. In a way, that would have made more sense, because then one could have said that "read" is what you use for processing incoming data, and "take" is what you use to remove disposed instances. But that's simply not how it was done.

The above describes a pure eventually-consistent shared data space, but there are also times where you just want to send messages. You sometimes need to take data from a queue. But if you take all samples for an instance, for how long do you keep the record of the instance in the RHC? If you only ever use "take" for disposed instances, the problem doesn't exist, but otherwise you need to know something else.

That "something else" is what in DDS is called the "writer registration". When a writer publishes data for an instance (or when it disposes it!) it becomes registered as a "live writer" for that particular instance. Once all writers that were publishing the instance indicate they will not be doing so in the future (the "unregister" operation), the instance can be removed from the RHC.

But now our model has three states: an instance for which data is actively being published (ALIVE), one that has been disposed (NOT_ALIVE_DISPOSED) and one that has not been disposed but for which no live writers exist anymore (NOT_ALIVE_NO_WRITERS). Sometimes, the NOT_ALIVE_NO_WRITERS state is used much like the NOT_ALIVE_DISPOSED state is, and so it is also important to pass this information on to the subscriber. It was decided that it was best to again use the same path, triggering DATA_AVAILABLE and returning a (possibly "invalid") sample from read/take.

(By the way, the spec says that you must not look at the contents of a sample if the valid_data flag is false. There's a nasty edge case with that, so Cyclone instead always sets the key fields.)
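To make the reader-side picture concrete: a take loop can tell real data apart from these magically generated state updates by checking valid_data and instance_state. A sketch, assuming an existing reader rd and using Cyclone's loaned-buffer convention (passing NULL buffer pointers):

```c
/* Sketch: telling real samples apart from instance-state updates
 * (assumes an existing reader "rd"). */
void *buf[1] = { NULL };   /* NULL: let Cyclone loan the sample memory */
dds_sample_info_t si[1];
int n = dds_take (rd, buf, si, 1, 1);
for (int i = 0; i < n; i++)
{
  if (si[i].valid_data)
    ; /* a real sample, published via dds_write */
  else if (si[i].instance_state == DDS_IST_NOT_ALIVE_DISPOSED)
    ; /* the instance was disposed; only the key fields are meaningful */
  else if (si[i].instance_state == DDS_IST_NOT_ALIVE_NO_WRITERS)
    ; /* all writers unregistered or lost liveliness */
}
if (n > 0)
  dds_return_loan (rd, buf, n);
```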

You're entirely right that this mechanism is totally overcomplicated and surprising in something as simple as a roundtrip example. In fact, this is a big problem with writing good examples for DDS: you want examples to be simple to understand, but you need to build a fairly complicated system to show why these complications are actually sensible. And as a matter of fact, I think ROS 2 fell into that trap as well ...

Apologies for the very long response — and for its being a bit abstract and not directly addressing your question. The trivial response would've been "this is what it says in the spec, so this is what Cyclone does", but that would really not have helped you understand why it is done this way.

eboasson avatar Dec 29 '21 10:12 eboasson

I truly appreciate your clarifying the relationship between instances/keys/samples, and the reason why dds_rhc_default_unregister_wr always invokes dds_reader_data_available_cb when the if-statement is TRUE; everything looks awesome!

But these marvellous discoveries still couldn't resolve my original confusion. Perhaps I should tell the story of how I first came to pay attention to the behavior of on_data_available. I wrote the following example program, based on cyclonedds/examples/helloworld/subscriber.c:

#include "dds/dds.h"
#include "HelloWorldData.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

/* An array of one message (aka sample in dds terms) will be used. */
#define MAX_SAMPLES 1
static HelloWorldData_Msg data[MAX_SAMPLES];
static void * samples[MAX_SAMPLES];
static dds_sample_info_t info[MAX_SAMPLES];
static int32_t count = 0;

static void on_data_available_callback(dds_entity_t reader, void *arg)
{
    /* print tips. */
    printf("trigger data_available_fn: %d\n", ++count);
    int samplecount;
    (void)arg;
    samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
    if (samplecount < 0)
        DDS_FATAL("dds_take: %s\n", dds_strretcode(-samplecount));
}

int main (int argc, char ** argv)
{
    dds_entity_t participant;
    dds_entity_t topic;
    dds_entity_t reader;
    dds_return_t rc;
    dds_qos_t *qos;
    (void)argc;
    (void)argv;

    /* Create a Participant. */
    participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
    if (participant < 0)
        DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

    /* Create a Topic. */
    topic = dds_create_topic (
        participant, &HelloWorldData_Msg_desc, "HelloWorldData_Msg", NULL, NULL);
    if (topic < 0)
        DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));

    /* Create a reliable Reader. */
    qos = dds_create_qos ();
    dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
    dds_listener_t *listener = dds_create_listener(NULL);
    dds_lset_data_available(listener, on_data_available_callback);
    reader = dds_create_reader (participant, topic, qos, listener);
    if (reader < 0)
        DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-reader));
    dds_delete_qos(qos);

    printf ("\n=== [Subscriber] Waiting for a sample ...\n");
    fflush (stdout);

    /* Initialize the file-scope sample buffer used by the callback, by
     * pointing the void pointer within the buffer array to a valid sample
     * memory location. */
    samples[0] = HelloWorldData_Msg__alloc ();

    /* Endless loop until Ctrl-C kills this process; sleep a little to
     * avoid busy-waiting while the listener does all the work. */
    while (true)
        dds_sleepfor (DDS_MSECS (100));

    /* Free the data location. */
    HelloWorldData_Msg_free (samples[0], DDS_FREE_ALL);

    /* Deleting the participant will delete all its children recursively as well. */
    rc = dds_delete (participant);
    if (rc != DDS_RETCODE_OK)
        DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));

    return EXIT_SUCCESS;
}

The initial version of subscriber.c polled for data instead of using a listener or waitset, but I tried using a listener to take the data. To keep the main function from exiting quickly, I coded an endless loop that runs until Ctrl-C kills the process. Maybe using getchar() instead of an endless loop would be better.

In the implementation of on_data_available_callback(), I count its invocations with a static int32_t variable called count, and print the count every time on_data_available_callback() is invoked. Then I found a shocking phenomenon: on_data_available_callback() was invoked twice even though HelloworldPublisher only wrote one sample.

auto@ubuntu:./HelloworldPublisher 
=== [Publisher]  Waiting for a reader to be discovered ...
=== [Publisher]  Writing : Message (1, Hello World)

The following output is weird to me:

auto@ubuntu:./HelloworldSubscriber 

=== [Subscriber] Waiting for a sample ...
trigger data_available_fn: 1
trigger data_available_fn: 2

If this phenomenon is interpreted as correct, it means that it is difficult for me (or other users) to do statistical things through on_data_available.

Confused by this phenomenon, I added breakpoints wherever data_available_cb is called and single-stepped through. Finally, I found that right after HelloworldPublisher exits, dds_rhc_default_unregister_wr always invokes dds_reader_data_available_cb, which made me sure the behavior of dds_rhc_default_unregister_wr is the culprit of my confusion. By the way, through your patient replies, I now think this weird phenomenon is not a defect in dds_rhc_default_unregister_wr.

There seems to be no better way for me (or any other user) to understand this weird phenomenon than to ask you. If this phenomenon is correct, please give us some programming suggestions to avoid the misunderstandings it causes. If it is not, could you spend some time fixing it?

In the end, I truly appreciate your patient replies once again.

Dcnqukin avatar Dec 30 '21 02:12 Dcnqukin

Hi @Dcnqukin, thanks for showing exactly what you are doing, and now I perhaps should apologise for overthinking it (and for the delay in responding caused by my — successful! — attempt at staying away from my computer for at least a few days around New Year's).

The system is what it is, for the good reasons I tried to explain earlier. The solution therefore is to figure out why on_data_available_callback was called. That means that instead of simply counting calls you should count the number of samples received for which info[i].valid_data (for 0≤i<samplecount) is true. If it is true, then that particular "sample" returned by dds_take is a real sample, published by the application by calling dds_write; if instead it is false, then it was magically generated to indicate a change in some state caused by a disappearing publisher (or something else).

For example:

static void on_data_available_callback(dds_entity_t reader, void *arg)
{
    int samplecount;
    (void)arg;
    samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
    if (samplecount < 0)
        DDS_FATAL("dds_take: %s\n", dds_strretcode(-samplecount));
    for (int i = 0; i < samplecount; i++)
        count += info[i].valid_data;
    /* print tips. */
    printf("trigger data_available_fn: %d\n", count);
}

There are also other good reasons to pay attention to the result of dds_take. It is possible that multiple samples arrive while there is only a single call to on_data_available. The spec allows this, and while the current implementation in Cyclone makes this very unlikely, in some other implementations it is much more likely to happen. I have seen people get caught by that ... In the future, addressing some other issues may well cause Cyclone to change on this point.
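For example, a callback that drains the reader, rather than taking at most one batch per invocation, could look like this. A sketch built on the globals from the program above:

```c
/* Sketch: drain the reader in the callback, because a single
 * DATA_AVAILABLE may cover several samples. */
static void on_data_available_callback (dds_entity_t reader, void *arg)
{
  int samplecount;
  (void) arg;
  /* keep taking until the reader history cache is empty */
  while ((samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES)) > 0)
  {
    for (int i = 0; i < samplecount; i++)
      count += info[i].valid_data;  /* count only real, written samples */
  }
  if (samplecount < 0)
    DDS_FATAL ("dds_take: %s\n", dds_strretcode (-samplecount));
}
```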

An alternative route that I think might be possible is to use the on_liveliness_changed event to correct for overcounting in on_data_available_callback. But I haven't actually tried that 🙂.
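Untested, but such a liveliness listener could be installed next to the data-available one roughly like this (the callback name is made up; the signature follows the Cyclone C listener API):

```c
/* Sketch: observing writer liveliness changes alongside DATA_AVAILABLE. */
static void on_liveliness_changed_cb (dds_entity_t reader,
                                      const dds_liveliness_changed_status_t status,
                                      void *arg)
{
  (void) reader; (void) arg;
  /* alive_count drops (alive_count_change < 0) when a matched writer
     unregisters its liveliness or disappears */
  printf ("alive writers: %d (change %d)\n",
          (int) status.alive_count, (int) status.alive_count_change);
}

/* ... when creating the listener, next to dds_lset_data_available: */
dds_lset_liveliness_changed (listener, on_liveliness_changed_cb);
```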

eboasson avatar Jan 03 '22 08:01 eboasson

@Dcnqukin Thank you for reaching out about this. As this has not had any further discussion for a year I will close it. Feel free to create a new issue or re-open this if you have further questions or comments!

poetinger avatar Jun 28 '23 18:06 poetinger