orleans
orleans copied to clipboard
QueueCacheMissException: Item not found in cache.
We've encountered QueueCacheMissException
in production, where it happens infrequently and at seemingly random times.
I've managed to replicate it in the test code below, essentially by taking the default stream configuration and dividing each time span by a factor of 300 so that the original ratios between them are maintained.
/// <summary>
/// Reproduction driver: publishes the integers 6 through 11 to "TestStream",
/// waiting as many seconds as the value itself before each publish, so the
/// gap between consecutive messages grows on every iteration.
/// </summary>
[Fact]
public async Task Test()
{
    using var fixture = new ClusterFixture();

    var services = fixture.SiloHost.Services;
    var client = services.GetRequiredService<IClusterClient>();
    var logger = services.GetRequiredService<ILoggerFactory>().CreateLogger("Logger");

    var stream = client
        .GetStreamProvider("SimpleMemoryStreamProvider")
        .GetStream<int>("TestStream", Guid.Empty);

    // Same sequence as Enumerable.Range(6, 6): 6, 7, 8, 9, 10, 11.
    for (var value = 6; value < 12; value++)
    {
        logger.LogInformation("Sending {Value}...", value);

        // The delay happens after the log line and before the publish.
        await Task.Delay(TimeSpan.FromSeconds(value));
        await stream.OnNextAsync(value);
    }
}
/// <summary>
/// Grain implicitly subscribed to the "TestStream" namespace. On activation it
/// attaches handlers that log every received item and every stream error.
/// </summary>
[ImplicitStreamSubscription("TestStream")]
public class TestGrain : IGrainBase, IGrainWithGuidKey
{
    private readonly ILogger<TestGrain> _logger;

    public TestGrain(ILogger<TestGrain> logger, IGrainContext grainContext)
    {
        _logger = logger;
        GrainContext = grainContext;
    }

    /// <summary>The grain context supplied to the constructor.</summary>
    public IGrainContext GrainContext { get; }

    /// <summary>
    /// Subscribes to the int stream in namespace "TestStream" keyed by this
    /// grain's primary key, using the "SimpleMemoryStreamProvider" provider.
    /// </summary>
    /// <param name="token">Activation cancellation token; not used by this method.</param>
    public async Task OnActivateAsync(CancellationToken token)
    {
        var stream = this
            .GetStreamProvider("SimpleMemoryStreamProvider")
            .GetStream<int>("TestStream", this.GetPrimaryKey());

        await stream.SubscribeAsync(
            // Second argument is the item's sequence token; it is ignored here.
            (item, _) =>
            {
                _logger.LogInformation("Received {Value}", item);
                return Task.CompletedTask;
            },
            error =>
            {
                _logger.LogError(error, "A stream error has occurred.");
                return Task.CompletedTask;
            });
    }
}
/// <summary>
/// Test fixture that builds and deploys a test cluster via
/// <c>new TestClusterBuilder(1)</c>, exposes the primary silo's host, and
/// stops and disposes the cluster when the fixture is disposed.
/// </summary>
public class ClusterFixture : IDisposable
{
    public ClusterFixture()
    {
        var clusterBuilder = new TestClusterBuilder(1);
        clusterBuilder.AddSiloBuilderConfigurator<TestSiloConfig>();

        Cluster = clusterBuilder.Build();
        Cluster.Deploy();

        // The primary silo is cast to an in-process handle to reach its host.
        var primarySilo = (InProcessSiloHandle)Cluster.Primary;
        SiloHost = primarySilo.SiloHost;
    }

    /// <summary>The deployed test cluster.</summary>
    public TestCluster Cluster { get; }

    /// <summary>Host of the cluster's primary silo.</summary>
    public IHost SiloHost { get; }

    /// <summary>Stops all silos, then disposes the cluster.</summary>
    public void Dispose()
    {
        Cluster.StopAllSilos();
        Cluster.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Silo configuration: debug logging, in-memory grain storage, and an
    /// in-memory stream provider with shortened cache-eviction and
    /// inactivity time spans.
    /// </summary>
    private class TestSiloConfig : ISiloConfigurator
    {
        private const string StreamProviderName = "SimpleMemoryStreamProvider";

        public void Configure(ISiloBuilder hostBuilder)
        {
            // Registration order is kept: logging, named cache-eviction
            // options, then storage and the stream provider.
            hostBuilder.ConfigureLogging(logging => logging.AddDebug());

            hostBuilder.ConfigureServices(services =>
            {
                services.Configure<StreamCacheEvictionOptions>(StreamProviderName, ConfigureCacheEviction);
            });

            hostBuilder.AddMemoryGrainStorageAsDefault();
            hostBuilder.AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME);
            hostBuilder.AddMemoryStreams<DefaultMemoryMessageBodySerializer>(StreamProviderName, streams =>
            {
                streams.ConfigurePullingAgent(agent => agent.Configure(agentOptions =>
                {
                    agentOptions.StreamInactivityPeriod = TimeSpan.FromSeconds(6);
                }));
                streams.ConfigureStreamPubSub(Orleans.Streams.StreamPubSubType.ImplicitOnly);
            });
        }

        // Sets the cache age limits: 6 seconds maximum, 1 second minimum.
        private static void ConfigureCacheEviction(StreamCacheEvictionOptions options)
        {
            options.DataMaxAgeInCache = TimeSpan.FromSeconds(6);
            options.DataMinTimeInCache = TimeSpan.FromSeconds(1);
        }
    }
}
23:46:08:475 Logger: Information: Sending 6...
23:46:14:483 Logger: Information: Sending 7...
23:46:14:734 TestGrain: Information: Received 6
23:46:21:495 Logger: Information: Sending 8...
23:46:29:492 Logger: Information: Sending 9...
23:46:29:492 TestGrain: Error: A stream error has occurred.
23:46:29:492
23:46:29:492 Orleans.Streams.QueueCacheMissException: Item not found in cache. Requested: [EventSequenceToken: SeqNum=638111979680907519, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0]
23:46:29:492 at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:29:492 at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:29:492 at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:29:492 at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:29:492 at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:29:492 TestGrain: Information: Received 8
23:46:38:491 Logger: Information: Sending 10...
23:46:43:738 The thread 0x3ec8 has exited with code 0 (0x0).
23:46:48:498 Logger: Information: Sending 11...
23:46:48:498 TestGrain: Error: A stream error has occurred.
23:46:48:498
23:46:48:498 Orleans.Streams.QueueCacheMissException: Item not found in cache. Requested: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0]
23:46:48:498 at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:48:498 at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:48:498 at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:48:498 at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:48:498 at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:48:498 TestGrain: Information: Received 10
Hello @adityamandaleeka, These bugs were planned to be fixed in the 7.1.1 version, but the currently released version is 7.2.1. Is it planned to fix these in the next versions?
This issue results in missing stream messages. It seems to be reproducible whenever a stream has gaps between sent messages that are longer than StreamInactivityPeriod, which is 30 minutes by default.
It seems that after a stream is deactivated due to inactivity, the next message does not reactivate it: the cursor is moved, but the message itself is purged from the cache (see DataMaxAgeInCache). The message after that does reactivate the stream, but delivery fails because the previous message is no longer in the cache.
Workaround we tried: setting StreamInactivityPeriod to multiple days (to make sure all streams always stay active, which is acceptable in our case). This helps with the test code above, but it doesn't work in our real application.