Issues with streams during rolling deploy (Orleans.Streams.QueueCacheMissException)
I've spent a couple of days fighting stream events going missing during rolling deploys. I created a minimal repro to rule out business-logic issues, and I can still reproduce the error. It's possible that this configuration is sub-optimal or that I'm doing something wrong, so please advise.
Setup
- Orleans version: 7.2.6
- Stream pub sub configuration: StreamPubSubType.ImplicitOnly
- Stream storage: MemoryGrainStorage for "PubSubStore" (if this is applicable with ImplicitOnly?)
- GrainDirectory: Redis
- DefaultCompatibilityStrategy: BackwardCompatible
- DefaultVersionSelectorStrategy: LatestVersion
- Using Postgres as grain storage/membership
Repro
- Start silo A
- Start sending traffic to the cluster, see producer and consumer below
- Consumer (V1) activates and starts receiving events
- Start silo B, which joins the cluster, now a total of 2 silos in the cluster
- StreamGrain (V1) will deactivate on Silo A, and activate (V2) on silo B. This happens due to the version selector strategy, plus a call to another grain method (since implicit stream subscription grains won't move automatically)
- Stop silo A after some time
- One (1) Orleans.Streams.QueueCacheMissException occurs on silo B at the consumer's end. This is the only error I get. No error occurs at the producer's end
- Everything seems to recover, but some events never reach the consumer (500 events out of 7000)
Questions
- Why is this happening, and can it be prevented?
- Is there any way to recover the events that the consumer grain never received?
- Is there another configuration that would be more suitable for rolling deploys?
Error
2024-05-08 12:30:23.571 [FTL] StreamGrain - Stream failed for grain "fd4e338d-36dd-4533-99c8-909c3a4ac187" Orleans.Streams.QueueCacheMissException: Item not found in cache. Requested: [EventSequenceToken: SeqNum=638507681451236680, EventIndex=0], Low: [EventSequenceToken: SeqNum=638507681451236716, EventIndex=0], High: [EventSequenceToken: SeqNum=638507681451236729, EventIndex=0]
at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 303
--- End of stack trace from previous location ---
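Reading the three tokens in the message side by side: the requested sequence number is lower than the cache's Low value, so the consumer appears to be asking for an event older than the oldest one still held in the cache on silo B. A minimal sketch of that arithmetic in plain C#, with no Orleans dependency; the `CacheMissCheck` class and its `IsOlderThanCache` helper are hypothetical names used only for illustration, not an Orleans API.

```csharp
using System;

// Sequence numbers copied from the exception message above.
public static class CacheMissCheck
{
    public const long Requested = 638507681451236680;
    public const long Low       = 638507681451236716;
    public const long High      = 638507681451236729;

    // Hypothetical helper (not part of Orleans): true when the requested
    // sequence number is older than the oldest item the cache still holds.
    public static bool IsOlderThanCache(long requested, long low) =>
        requested < low;

    public static void Main()
    {
        Console.WriteLine($"Older than cache: {IsOlderThanCache(Requested, Low)}");
        Console.WriteLine($"Gap below Low: {Low - Requested}");
        Console.WriteLine($"Cached range (High - Low): {High - Low}");
    }
}
```

With these values the requested token sits 36 sequence numbers below Low, while the cached range itself spans only 13.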
Producer
var stream =
    IClusterClient.GetStream<StreamEvent>(
        StreamId.Create(
            "Namespace.StreamEvent",
            request.GrainId ) );
await stream.OnNextAsync( new StreamEvent( request.Id ) );
Consumer
[ImplicitStreamSubscription( streamNamespace: STREAM_NS )]
public class StreamGrain : IGrainBase, IStreamGrain
{
    private const string STREAM_NS = "Namespace.StreamEvent";
    private IPersistentState<StreamState> Storage { get; }
    private ILogger<StreamGrain> Logger { get; }
    public IGrainContext GrainContext { get; }

    public StreamGrain(
        [PersistentState( nameof( StreamGrain ) )]
        IPersistentState<StreamState> storage,
        IGrainContext grainContext,
        ILogger<StreamGrain> logger )
    {
        Storage = storage;
        GrainContext = grainContext;
        Logger = logger;
    }

    public async Task OnActivateAsync( CancellationToken token )
    {
        LogGrainStatus( "Activating" );
        var streamId = StreamId.Create( STREAM_NS, this.GetPrimaryKey() );
        var stream = this.GetDefaultStreamProvider().GetStream<StreamEvent>( streamId );
        await stream.SubscribeAsync(
            onNextAsync: Register,
            onErrorAsync: OnError,
            token: Storage.State.LastStreamToken );
    }

    public Task OnDeactivateAsync( DeactivationReason reason, CancellationToken token )
    {
        LogGrainStatus( "Deactivating" );
        return Task.CompletedTask;
    }

    public async Task Register( StreamEvent input, StreamSequenceToken token )
    {
        Logger.LogInformation( "Storing event: {Id}; token {Token}", input.Id, token );
        Storage.State.LastStreamToken = token;
        Storage.State.ReceivedEvents.Add( input.Id );
        await Storage.WriteStateAsync();
    }

    private Task OnError( Exception ex )
    {
        Logger.LogCritical( ex, "Stream failed for grain {Id}", this.GetPrimaryKey() );
        return Task.CompletedTask;
    }

    private void LogGrainStatus( string state ) =>
        Logger.LogInformation(
            "{State} stream grain {Id}; Stream token: {Token}; Events: {Events}",
            state,
            this.GetPrimaryKey(),
            Storage.State.LastStreamToken,
            Storage.State.ReceivedEvents.Count );
}
We are also getting this error during a rolling deploy. We don't have very high-frequency events, so I don't believe we are losing events because of it, but I haven't actually checked.
Would be great to hear some official advice here, as I will likely just disable the error logging from this callback or force a hard rollout when upgrading the cluster; neither seems like a great solution tbh.