PersistentStreamPullingAgent skips over the message under a certain condition
Problem:
When publishing a message under the following conditions:
- using MemoryStreams
- the stream is inactive, meaning it's not in PersistentStreamPullingAgent.pubSubCache
- the message cache is empty, meaning PersistentStreamPullingAgent.queueCache has no messages in it

then the published message is lost.
How to reproduce:
Here is a GitHub repo demonstrating the issue: https://github.com/tchelidze/Orleans_MemoryStream_LostMessage/tree/master/Orleans_MemoryStream_LostMessage
Analysis:
Consider the following scenario: we publish message number 1 to the stream. Then we wait, and in the meantime the stream goes inactive and the message cache gets purged. After that we publish messages number 2 and 3 to the stream. Then the following happens.
Inside the PersistentStreamPullingAgent.DoHandshakeWithConsumer method (which gets called from RegisterAsStreamProducer because, remember, the stream is inactive) we retrieve the last processed message token from the consumer. In our case that would be message number 1. Then we take that token (pointing to message number 1) and call queueCache.GetCacheCursor, passing that token.
What GetCacheCursor does is where the problem lies, specifically PooledQueueCache.SetCursor, where it checks whether the oldestMessage is past the given token. In our case oldestMessage would be message number 2, while the token is message number 1, so the body of that if statement on line 201 is executed. Then the interesting part comes: we check whether the lastPurgedToken is the same as or past the given token. lastPurgedToken again points to message number 1, because that was the last message evicted from the cache. So the body of that if statement also executes, and PooledQueueCache.SetCursor sets the SequenceToken to the oldest message, which is message number 2.
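To make the two checks easier to follow, here is a minimal toy model of the behaviour described above. It is written in Python and is not the Orleans code: tokens are plain integers, and `ToyCache`, `CacheMiss`, and `set_cursor` are hypothetical names invented for illustration. The sketch assumes the cache is non-empty when the cursor is requested.

```python
from dataclasses import dataclass, field
from typing import List, Optional


class CacheMiss(Exception):
    """Hypothetical stand-in for the error raised when a token cannot be served."""


@dataclass
class ToyCache:
    # Tokens of the messages still cached, oldest first (assumed non-empty here).
    messages: List[int] = field(default_factory=list)
    # Token of the most recently evicted message, if any.
    last_purged: Optional[int] = None

    def set_cursor(self, token: int) -> int:
        """Return the token the cursor ends up pointing at."""
        oldest = self.messages[0]
        if oldest > token:
            # The oldest cached message is past the requested token.
            if self.last_purged is not None and token >= self.last_purged:
                # Requested token is at or past the last purged one:
                # fall back to the oldest cached message.
                return oldest
            raise CacheMiss(token)
        # Simplification: otherwise assume the requested token is still cached.
        return token


# Scenario from the analysis: message 1 was purged, messages 2 and 3 are cached.
cache = ToyCache(messages=[2, 3], last_purged=1)
print(cache.set_cursor(1))  # prints 2
```

In this model, asking for token 1 silently yields a cursor on message 2, which is the situation the rest of the analysis builds on.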
Issue number 1:
As I understand it, lastPurgedToken points to the message that was evicted and is no longer in the cache, so checking for sequenceToken.CompareTo(entry.Token) >= 0 does not seem correct here. Instead I think it should be sequenceToken.CompareTo(entry.Token) > 0.
Story continues.
So, back to PersistentStreamPullingAgent.DoHandshakeWithConsumer, line number 315. Here the expectation is that queueCache.GetCacheCursor gives us back a cursor that points to the last processed message, but because queueCache no longer has that message (message number 1) it returns a cursor pointing to the oldest message, which in our case is message number 2. On line 315 we move the cursor forward (because, remember, the expectation was that the cursor was pointing to the last processed message). As a result the cursor now points to message number 3, and that's how message number 2 is lost.
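The sequence can be replayed with a small Python simulation. This is only a sketch of the behaviour as described in this report, not the actual Orleans implementation: tokens are integers, the cache is a list, and `get_cache_cursor` and `handshake_then_deliver` are hypothetical helper names.

```python
def get_cache_cursor(cached, last_purged, token):
    """Toy stand-in for queueCache.GetCacheCursor: returns an index into `cached`."""
    if token in cached:
        return cached.index(token)
    if cached and cached[0] > token and last_purged is not None and token >= last_purged:
        # The requested message was purged: fall back to the oldest cached message.
        return 0
    raise LookupError(f"token {token} is not available")


def handshake_then_deliver(cached, last_purged, last_processed):
    """Get a cursor for the last processed token, advance once, deliver the rest."""
    index = get_cache_cursor(cached, last_purged, last_processed)
    # Unconditional "move next": assumes the cursor sits on the last processed message.
    index += 1
    return cached[index:]


# Message 1 purged, messages 2 and 3 cached: message 2 is skipped.
print(handshake_then_deliver([2, 3], 1, 1))        # prints [3]

# Message 1 still cached: the same logic delivers both new messages.
print(handshake_then_deliver([1, 2, 3], None, 1))  # prints [2, 3]
```

The two calls differ only in whether message 1 is still in the cache, which is what decides whether the unconditional advance is harmless or drops a message.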
Issue number 2:
I think that in PersistentStreamPullingAgent.DoHandshakeWithConsumer, instead of blindly moving the cursor to the next message, we should check whether it points to the same position as requestedHandshakeToken, and if it does not, we should not move it forward.
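A rough sketch of that guarded advance, again as a toy Python model rather than a patch against Orleans. The function name `deliver_after_handshake` and the integer tokens are assumptions made for illustration; `cursor_index` stands for wherever the cache placed the cursor.

```python
def deliver_after_handshake(cached, cursor_index, requested_token):
    """Skip the cursor's message only if it is the one the consumer already processed."""
    if cached[cursor_index] == requested_token:
        # Cursor really sits on the last processed message, so step past it.
        cursor_index += 1
    # Otherwise the cursor already points at an undelivered message: leave it alone.
    return cached[cursor_index:]


# Cursor fell back to message 2 because message 1 was purged: nothing is skipped.
print(deliver_after_handshake([2, 3], 0, 1))     # prints [2, 3]

# Cursor sits on message 1, which was already processed: it is skipped as before.
print(deliver_after_handshake([1, 2, 3], 0, 1))  # prints [2, 3]
```

Both cases deliver messages 2 and 3, so under this model the check removes the loss without changing the normal path.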
Workaround:
The only workaround I can think of is to set StreamPullingAgentOptions.StreamInactivityPeriod and StreamCacheEvictionOptions.DataMaxAgeInCache to very high values, to avoid the scenario where queueCache is empty and the stream is inactive.
Thoughts?