
PersistentStreamPullingAgent skips a message under certain conditions

Open tchelidze opened this issue 9 months ago • 6 comments

Problem:

When a message is published under the following conditions:

  • using MemoryStreams
  • the stream is inactive, meaning it is not in PersistentStreamPullingAgent.pubSubCache
  • the message cache is empty, meaning PersistentStreamPullingAgent.queueCache contains no messages

Then the published message is lost.

How to reproduce:

Here is a GitHub repo demonstrating the issue: https://github.com/tchelidze/Orleans_MemoryStream_LostMessage/tree/master/Orleans_MemoryStream_LostMessage

Analysis:

Consider the following scenario: we publish message number 1 to the stream. We then wait, and in the meantime the stream goes inactive and the message cache is purged. After that, we publish messages number 2 and 3 to the stream. The following then happens.

Inside the PersistentStreamPullingAgent.DoHandshakeWithConsumer method (which is called from RegisterAsStreamProducer because, remember, the stream is inactive), we retrieve the last processed message token from the consumer. In our case, that is message number 1. We then call queueCache.GetCacheCursor, passing that token (which points to message number 1).

The problem lies in what GetCacheCursor does, specifically in PooledQueueCache.SetCursor, where it checks whether oldestMessage is past the given token. In our case, oldestMessage is message number 2 while the token is message number 1, so the if statement on line 201 is entered. Then comes the interesting part: we check whether lastPurgedToken is the same as or past the given token. lastPurgedToken also points to message number 1, because that was the last message evicted from the cache. So that if statement is entered too, and PooledQueueCache.SetCursor sets the cursor's SequenceToken to the oldest message, which is message number 2.
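To make the branch easier to follow, here is a simplified Python model of the SetCursor behavior described above. It is not Orleans code: sequence tokens are modeled as plain integers, the cache as an ascending list of sequence numbers, and `set_cursor`/`Cursor` are hypothetical names. Only the "oldest message is past the token" branch follows the issue's description; the other branches are rough placeholders.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Cursor:
    # Sequence number the cursor points at; None means "unset, wait for more data".
    position: Optional[int]


def set_cursor(cache: List[int], last_purged: Optional[int], token: int) -> Cursor:
    """Hypothetical model of PooledQueueCache.SetCursor as described in this issue.

    cache: sequence numbers currently cached, oldest first.
    last_purged: sequence number of the last evicted message, if any.
    token: requested sequence number (here, the consumer's last processed message).
    """
    if not cache:
        # Placeholder: nothing cached, leave the cursor unset.
        return Cursor(position=None)
    if token > cache[-1]:
        # Placeholder: requested token is newer than anything cached.
        return Cursor(position=None)
    oldest = cache[0]
    if oldest > token:
        # The oldest cached message is past the requested token.
        # Mirrors the check quoted in the issue: sequenceToken.CompareTo(lastPurgedToken) >= 0
        if last_purged is not None and token >= last_purged:
            # The cursor lands on the oldest cached message, not on the requested token.
            return Cursor(position=oldest)
        # Placeholder for real cache-miss handling, which is not modeled here.
        raise LookupError(f"token {token} is no longer available")
    # Placeholder: token is within the cache, point at the first message at or after it.
    return Cursor(position=next(seq for seq in cache if seq >= token))


# Scenario from the analysis: message 1 was purged, messages 2 and 3 are cached.
print(set_cursor(cache=[2, 3], last_purged=1, token=1))  # Cursor(position=2)
```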

Issue number 1: As I understand it, lastPurgedToken points to a message that was evicted and is no longer in the cache, so checking sequenceToken.CompareTo(entry.Token) >= 0 does not seem correct here; instead, I think it should be sequenceToken.CompareTo(entry.Token) > 0.
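The boundary case behind this point can be shown in isolation. In this minimal Python sketch (tokens modeled as integers, function names hypothetical), the requested token and lastPurgedToken both point at message 1, as in the scenario above: the current `>= 0` comparison takes the "move to the oldest cached message" branch, while the proposed `> 0` comparison does not. What the cache should do when the stricter check fails is not modeled.

```python
def current_check(token: int, last_purged: int) -> bool:
    # Mirrors the comparison quoted in the issue: sequenceToken.CompareTo(entry.Token) >= 0
    return token >= last_purged


def proposed_check(token: int, last_purged: int) -> bool:
    # The stricter comparison suggested in the issue: sequenceToken.CompareTo(entry.Token) > 0
    return token > last_purged


# Requested token and lastPurgedToken both point at message 1.
print(current_check(1, 1))   # True
print(proposed_check(1, 1))  # False
```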

The story continues.

So, back to PersistentStreamPullingAgent.DoHandshakeWithConsumer, line number 315. Here the expectation is that queueCache.GetCacheCursor returns a cursor pointing to the last processed message, but because queueCache no longer holds that message (message number 1), it returns a cursor pointing to the oldest message, which in our case is message number 2. On line 315 we move the cursor forward (because, remember, the expectation was that the cursor points to the last processed message). As a result, the cursor now points to message number 3, and that is how message number 2 is lost.
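The whole sequence can be reproduced with a small Python simulation. This is a hypothetical model, not the actual PersistentStreamPullingAgent: the cache is an ascending list of integer sequence numbers, `get_cache_cursor` collapses the SetCursor branch described above into a cache index, and `deliver_after_handshake` stands in for the handshake followed by normal delivery. It shows that advancing the cursor unconditionally after the handshake skips message 2.

```python
from typing import List


def get_cache_cursor(cache: List[int], last_purged: int, token: int) -> int:
    """Index in `cache` the cursor is placed at (simplified SetCursor model)."""
    if cache[0] > token and token >= last_purged:
        # Requested message was purged: the cursor lands on the oldest cached message.
        return 0
    # Otherwise assume the requested message is cached and point at it.
    return cache.index(token)


def deliver_after_handshake(cache: List[int], last_purged: int, consumer_token: int) -> List[int]:
    index = get_cache_cursor(cache, last_purged, consumer_token)
    # The handshake assumes the cursor points at the last processed message,
    # so it always moves the cursor forward once before delivering.
    index += 1
    return cache[index:]


# Message 1 was processed and purged; messages 2 and 3 are in the cache.
print(deliver_after_handshake(cache=[2, 3], last_purged=1, consumer_token=1))  # [3]
```

When message 1 is still cached (`cache=[1, 2, 3]`), the same code correctly delivers `[2, 3]`; the loss only appears once the last processed message has been purged.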

Issue number 2:

I think that in PersistentStreamPullingAgent.DoHandshakeWithConsumer, instead of blindly moving the cursor to the next message, we should check whether it points to the same position as requestedHandshakeToken, and if it does not, we should not move it forward.
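The proposed change can be sketched with the same kind of hypothetical Python model (integer tokens, ascending list as the cache, illustrative function names): after obtaining the cursor, advance it only if it actually points at the handshake token, i.e. the consumer's last processed message.

```python
from typing import List


def get_cache_cursor(cache: List[int], last_purged: int, token: int) -> int:
    """Index in `cache` the cursor is placed at (simplified SetCursor model)."""
    if cache[0] > token and token >= last_purged:
        # Requested message was purged: the cursor lands on the oldest cached message.
        return 0
    # Otherwise assume the requested message is cached and point at it.
    return cache.index(token)


def deliver_after_handshake_fixed(cache: List[int], last_purged: int, handshake_token: int) -> List[int]:
    index = get_cache_cursor(cache, last_purged, handshake_token)
    # Only skip the message under the cursor if it is the one the consumer already processed.
    if cache[index] == handshake_token:
        index += 1
    return cache[index:]


print(deliver_after_handshake_fixed(cache=[2, 3], last_purged=1, handshake_token=1))     # [2, 3]
print(deliver_after_handshake_fixed(cache=[1, 2, 3], last_purged=0, handshake_token=1))  # [2, 3]
```

In both cases the consumer receives every message after the one it last processed, whether or not that message is still cached.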

Workaround

The only workaround I can think of is to set StreamPullingAgentOptions.StreamInactivityPeriod and StreamCacheEvictionOptions.DataMaxAgeInCache to very high values, to avoid the scenario where queueCache is empty and the stream is inactive.

Thoughts?

tchelidze, May 24 '24 17:05