orleans icon indicating copy to clipboard operation
orleans copied to clipboard

QueueCacheMissException: Item not found in cache.

Open wassim-k opened this issue 2 years ago • 2 comments

We've encounted QueueCacheMissException in production where it happens infrequently and at seemingly random times. I've managed to replicate it in the test code below, by basically taking the default configuration for streams and dividing it by 300s in order to maintain the original ratio.

[Fact]
public async Task Test()
{
    using var fixture = new ClusterFixture();
    var client = fixture.SiloHost.Services.GetRequiredService<IClusterClient>();
    var logger = fixture.SiloHost.Services.GetRequiredService<ILoggerFactory>().CreateLogger("Logger");
    var streamProvider = client.GetStreamProvider("SimpleMemoryStreamProvider");
    var stream = streamProvider.GetStream<int>("TestStream", Guid.Empty);

    foreach (var i in Enumerable.Range(6, 6))
    {
        logger.LogInformation("Sending {Value}...", i);
        await Task.Delay(TimeSpan.FromSeconds(i));
        await stream.OnNextAsync(i);
    }
}

[ImplicitStreamSubscription("TestStream")]
public class TestGrain : IGrainBase, IGrainWithGuidKey
{
    private readonly ILogger<TestGrain> _logger;
    private readonly IGrainContext _grainContext;

    public TestGrain(ILogger<TestGrain> logger, IGrainContext grainContext)
    {
        _logger = logger;
        _grainContext = grainContext;
    }

    public IGrainContext GrainContext => _grainContext;

    public async Task OnActivateAsync(CancellationToken token)
    {
        var streamProvider = this.GetStreamProvider("SimpleMemoryStreamProvider");
        var stream = streamProvider.GetStream<int>("TestStream", this.GetPrimaryKey());

        await stream.SubscribeAsync(
            (value, token) =>
            {
                _logger.LogInformation("Received {Value}", value);
                return Task.CompletedTask;
            },
            ex =>
            {
                _logger.LogError(ex, "A stream error has occurred.");
                return Task.CompletedTask;
            });
    }
}

public class ClusterFixture : IDisposable
{
    public ClusterFixture()
    {
        var builder = new TestClusterBuilder(1);

        builder.AddSiloBuilderConfigurator<TestSiloConfig>();

        Cluster = builder.Build();
        Cluster.Deploy();
        SiloHost = ((InProcessSiloHandle)Cluster.Primary).SiloHost;
    }

    public TestCluster Cluster { get; private set; }

    public IHost SiloHost { get; private set; }

    public void Dispose()
    {
        Cluster.StopAllSilos();
        Cluster.Dispose();
        GC.SuppressFinalize(this);
    }

    private class TestSiloConfig : ISiloConfigurator
    {
        public void Configure(ISiloBuilder hostBuilder)
        {
            hostBuilder
                .ConfigureLogging(logger => logger.AddDebug())
                .ConfigureServices(services =>
                {
                    services
                        .Configure<StreamCacheEvictionOptions>("SimpleMemoryStreamProvider", options =>
                        {
                            options.DataMaxAgeInCache = TimeSpan.FromSeconds(6);
                            options.DataMinTimeInCache = TimeSpan.FromSeconds(1);
                        });
                });

            hostBuilder
                .AddMemoryGrainStorageAsDefault()
                .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
                .AddMemoryStreams<DefaultMemoryMessageBodySerializer>("SimpleMemoryStreamProvider", configure =>
                {
                    configure.ConfigurePullingAgent(ob => ob.Configure(options =>
                    {
                        options.StreamInactivityPeriod = TimeSpan.FromSeconds(6);
                    }));

                    configure.ConfigureStreamPubSub(Orleans.Streams.StreamPubSubType.ImplicitOnly);
                });
        }
    }
}
23:46:08:475	Logger: Information: Sending 6...
23:46:14:483	Logger: Information: Sending 7...
23:46:14:734	TestGrain: Information: Received 6
23:46:21:495	Logger: Information: Sending 8...
23:46:29:492	Logger: Information: Sending 9...
23:46:29:492	TestGrain: Error: A stream error has occurred.
23:46:29:492	
23:46:29:492	Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907519, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0]
23:46:29:492	   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:29:492	   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:29:492	   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:29:492	   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:29:492	   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:29:492	TestGrain: Information: Received 8
23:46:38:491	Logger: Information: Sending 10...
23:46:43:738	The thread 0x3ec8 has exited with code 0 (0x0).
23:46:48:498	Logger: Information: Sending 11...
23:46:48:498	TestGrain: Error: A stream error has occurred.
23:46:48:498	
23:46:48:498	Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0]
23:46:48:498	   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:48:498	   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:48:498	   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:48:498	   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:48:498	   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:48:498	TestGrain: Information: Received 10

wassim-k avatar Feb 05 '23 12:02 wassim-k

Hello @adityamandaleeka, These bugs were planned to be fixed in the 7.1.1 version, but the currently released version is 7.2.1. Is it planned to fix these in the next versions?

flash2048 avatar Aug 22 '23 14:08 flash2048

This issue results in missing stream message. It seems to get reproduced if your stream has message send gaps longer then StreamInactivityPeriod that is 30 min by default.

It seems that after stream deactivation due to inactivity next message will not reactivate it, it will move the cursor but be purged (see DataMaxAgeInCache), message after activates it but fails due to missing message in cache.

Tried workaround: to set StreamInactivityPeriod to multiple days (to be sure that all streams are always active, its ok in our case) - helps with test code above but doesn't work in real app

Bohdandn avatar Jan 08 '24 17:01 Bohdandn