Orleans Streaming: in the `SetCursor` method, if the last purged token does not exist, do not throw an exception; just start from the oldest message in the cache
https://github.com/dotnet/orleans/blob/374ab206dee3a93de38f1a10ce38b85c99f0031c/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs#L204
Should it be changed to this:
```csharp
// Check if we missed an event since we last purged the cache
var isLastPurged = this.lastPurgedToken.TryGetValue(cursor.StreamId, out var entry);
if (!isLastPurged || sequenceToken.CompareTo(entry.Token) >= 0)
{
    // If the token is more recent than the last purged token, then we didn't lose anything. Start from the oldest message in cache
    cursor.State = CursorStates.Set;
    cursor.CurrentBlock = oldestBlock;
    cursor.Index = oldestBlock.Value.OldestMessageIndex;
    cursor.SequenceToken = oldestBlock.Value.GetOldestSequenceToken(cacheDataAdapter);
    return;
}
else
{
    throw new QueueCacheMissException(sequenceToken,
        messageBlocks.Last.Value.GetOldestSequenceToken(cacheDataAdapter),
        messageBlocks.First.Value.GetNewestSequenceToken(cacheDataAdapter));
}
```
We want to throw `QueueCacheMissException` if we cannot guarantee that we didn't miss any event. `QueueCacheMissException` should be handled by the application code, and should be read as "we may have missed some events for this stream".
The `lastPurgedToken` dictionary might not contain all the purged tokens (only those of streams recently purged by this streaming agent).
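To make the disagreement concrete, here is a minimal sketch that reduces the "requested token is older than the oldest cached message" branch to a pure predicate. It assumes sequence tokens can be modeled as `long` values; the names `CurrentRuleStartsFromOldest`, `ProposedRuleStartsFromOldest`, and `Describe` are hypothetical, and the "current" rule is inferred from the issue title (no purge record means a cache miss) rather than copied from the Orleans source.

```csharp
using System;

// Hypothetical model: tokens are plain longs; hasEntry/lastPurged stand in for
// lastPurgedToken.TryGetValue(streamId, out var entry) and entry.Token.

// Current rule, as implied by the issue title: a missing purge record is a cache miss.
bool CurrentRuleStartsFromOldest(bool hasEntry, long requested, long lastPurged) =>
    hasEntry && requested >= lastPurged;

// Proposed rule: a missing purge record is treated as "nothing was lost".
bool ProposedRuleStartsFromOldest(bool hasEntry, long requested, long lastPurged) =>
    !hasEntry || requested >= lastPurged;

string Describe(bool startsFromOldest) =>
    startsFromOldest ? "start from oldest cached message" : "throw QueueCacheMissException";

const long lastPurgedForStream = 10; // assumed last purged token for the stream

foreach (var (entryExists, token) in new[] { (false, 5L), (true, 12L), (true, 8L) })
{
    Console.WriteLine(
        $"hasEntry={entryExists}, requested={token}: " +
        $"current -> {Describe(CurrentRuleStartsFromOldest(entryExists, token, lastPurgedForStream))}, " +
        $"proposed -> {Describe(ProposedRuleStartsFromOldest(entryExists, token, lastPurgedForStream))}");
}
```

The only case where the two rules differ is the one without a purge record: the proposal reads it as "nothing lost", while throwing reads it as "we may have lost events", which is the guarantee described above.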
I found during testing that when a stream is idle for a while (more than 20 minutes) with no messages, the next message sent through it triggers a `QueueCacheMissException`, and that message is never delivered and is lost. I suspect this is related to `PurgeMetadata()`:
```csharp
private void PurgeMetadata()
{
    var now = DateTime.UtcNow;

    // Get all keys older than this.purgeMetadataInterval
    foreach (var kvp in this.lastPurgedToken)
    {
        if (kvp.Value.TimeStamp + this.purgeMetadataInterval < now)
        {
            lastPurgedToken.Remove(kvp.Key);
        }
    }
}
```
`PurgeMetadata()` periodically removes entries from `lastPurgedToken` once they are older than `purgeMetadataInterval`, so when a new message arrives after a long idle period, the last purged token for that stream can no longer be found.
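The reported timeline can be reproduced outside Orleans with a minimal sketch. It assumes a 20-minute `purgeMetadataInterval` (chosen to match the idle period above, not taken from Orleans defaults), models tokens as `long` values, and passes the current time in explicitly; the dictionary layout and `WouldThrowCacheMiss` are hypothetical, and the throw condition assumes the current rule implied by the issue title.

```csharp
using System;
using System.Collections.Generic;

// Assumed interval, matching the ~20-minute idle period reported above.
var purgeMetadataInterval = TimeSpan.FromMinutes(20);

// Hypothetical stand-in for lastPurgedToken: stream id -> (last purged token, purge time).
var lastPurgedToken = new Dictionary<string, (long Token, DateTime TimeStamp)>();

// Same expiry rule as the quoted PurgeMetadata(), with "now" passed in to simulate idle time.
void PurgeMetadata(DateTime now)
{
    // Collect expired keys first; Remove during enumeration is only safe on .NET Core 3.0+.
    var expired = new List<string>();
    foreach (var kvp in lastPurgedToken)
    {
        if (kvp.Value.TimeStamp + purgeMetadataInterval < now)
        {
            expired.Add(kvp.Key);
        }
    }
    foreach (var key in expired)
    {
        lastPurgedToken.Remove(key);
    }
}

// Assumed current rule for a token older than the oldest cached message:
// throw unless a purge record exists and the requested token is at least as recent.
bool WouldThrowCacheMiss(string streamId, long requested) =>
    !(lastPurgedToken.TryGetValue(streamId, out var entry) && requested >= entry.Token);

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

// The cache purges stream "s1" up to token 10; the consumer's cursor is also at token 10.
lastPurgedToken["s1"] = (10, t0);

PurgeMetadata(t0.AddMinutes(5));  // purge record is still present
bool missAfter5Min = WouldThrowCacheMiss("s1", 10);

PurgeMetadata(t0.AddMinutes(25)); // idle longer than the interval: record has expired
bool missAfter25Min = WouldThrowCacheMiss("s1", 10);

Console.WriteLine($"cache miss after 5 min idle: {missAfter5Min}, after 25 min idle: {missAfter25Min}");
```

In this model the same cursor is accepted shortly after the purge, but once the idle period exceeds the interval the record is gone and the identical request becomes a cache miss, even though nothing else was purged for that stream in between.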