Orleans stream PersistentStreamPullingAgent: JsonSerializationException for Orleans.Providers.Streams.Common.EventSequenceTokenV2

Open iamsamcoder opened this issue 1 year ago • 6 comments

My app is running Orleans 3.5.1. I'm developing with docker locally and deploying to AKS.

Up to this point, my client and silo have run in the same process and streaming has worked without issue. I recently created an external client to push stream messages from another service. Since this change, my messages are failing in AKS only; they work fine when running locally in Docker.

Here is the exception message:

[{"severityLevel":"Error","outerId":"0","message":"Unable to find a constructor to use for type Orleans.Providers.Streams.Common.EventSequenceTokenV2. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute. Path 'SequenceToken.SequenceNumber', line 1, position 239.","type":"Newtonsoft.Json.JsonSerializationException","id":"35356266",
"parsedStack":[{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw","level":0,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess","level":1,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification","level":2,"line":0},
{"assembly":"Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null","method":"Orleans.Internal.OrleansTaskExtentions+<<ToTypedTask>g__ConvertAsync|4_0>d`1.MoveNext","level":3,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw","level":4,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess","level":5,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification","level":6,"line":0},
{"assembly":"Orleans.Runtime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null","method":"Orleans.Streams.PersistentStreamPullingAgent+<DeliverBatchToConsumer>d__43.MoveNext","level":7,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw","level":8,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess","level":9,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification","level":10,"line":0},
{"assembly":"Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null","method":"Orleans.Internal.AsyncExecutorWithRetries+<ExecuteWithRetriesHelper>d__4`1.MoveNext","level":11,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw","level":12,"line":0},
{"assembly":"Orleans.Core, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null","method":"Orleans.Internal.AsyncExecutorWithRetries+<ExecuteWithRetriesHelper>d__4`1.MoveNext","level":13,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw","level":14,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess","level":15,"line":0},
{"assembly":"System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e","method":"System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification","level":16,"line":0},
{"assembly":"Orleans.Runtime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null","method":"Orleans.Streams.PersistentStreamPullingAgent+<RunConsumerCursor>d__41.MoveNext","level":17,"line":0
}]
}
]

The external client pushes a message without a sequence token:

var streamProvider = _client.GetStreamProvider(_followUpStreamOptions.StreamProviderName);

IAsyncObserver<FollowUpMonitorMessage> observer =
                streamProvider.GetStream<FollowUpMonitorMessage>(grainIdToMessage,
                    _followUpStreamOptions.StreamNamespace);

await observer.OnNextAsync(message);

Even though it shouldn't be necessary, I've tried adding the same packages to both the client and the silo, just in case, to ensure both use the same set. The exceptions are Orleans server, Kubernetes, and event sourcing, which are only in the silo app.

Client:

 <PackageReference Include="Microsoft.Orleans.Client" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Clustering.AzureStorage" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.5.1">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Microsoft.Orleans.Core" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Persistence.AzureStorage" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Streaming.AzureStorage" Version="3.5.1" />

Silo:

<PackageReference Include="Microsoft.Orleans.Client" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Clustering.AzureStorage" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.5.1">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Microsoft.Orleans.Core" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.EventSourcing" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Hosting.Kubernetes" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.OrleansTelemetryConsumers.AI" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Persistence.AzureStorage" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Server" Version="3.5.1" />
    <PackageReference Include="Microsoft.Orleans.Streaming.AzureStorage" Version="3.5.1" />

iamsamcoder avatar Sep 07 '22 23:09 iamsamcoder

@iamsamcoder thank you for opening the issue. Are you able to find a corresponding error log from where the exception is thrown? If you search your logs for that exception, you may come across the one with the source stack trace (e.g., showing JSON.NET stack frames). Otherwise, are you able to show how the silo and client are being configured?

ReubenBond avatar Sep 07 '22 23:09 ReubenBond

@ReubenBond Sorry about that, here is a log of the stack trace for the call.

[21:27:16 WRN] Exception reading message Request S10.244.0.12:11111:400281526*stg/PullingAgentSystemTarget/4a8e5aa5@S4a8e5aa5->S10.244.5.26:11111:400280637*grn/OrleansGrains.GrainImplementations.ProjectionGrain/219c9bd8-5586-43ba-99b2-cf60de056b38@63f9bbb3 #97052 from remote endpoint 10.244.0.12:55448 to local endpoint 10.244.5.26:11111
Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Orleans.Providers.Streams.Common.EventSequenceTokenV2. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute. Path 'SequenceToken.SequenceNumber', line 1, position 239.
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateNewObject(JsonReader reader, JsonObjectContract objectContract, JsonProperty containerMember, JsonProperty containerProperty, String id, Boolean& createdFromNonDefaultCreator)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ResolvePropertyAndCreatorValues(JsonObjectContract contract, JsonProperty containerProperty, JsonReader reader, Type objectType)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObjectUsingCreatorWithParameters(JsonReader reader, JsonObjectContract contract, JsonProperty containerProperty, ObjectConstructor`1 creator, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings)
   at Orleans.Serialization.SerializationManager.FallbackDeserializer(IDeserializationContext context, Type expectedType)
   at Orleans.Serialization.SerializationManager.DeserializeInner[TContext,TReader](SerializationManager sm, Type expected, TContext context, TReader reader)
   at Orleans.Serialization.BuiltInTypes.DeserializeImmutable[T](Type expected, IDeserializationContext context)
   at Orleans.Serialization.SerializationManager.DeserializeInner[TContext,TReader](SerializationManager sm, Type expected, TContext context, TReader reader)
   at Orleans.Serialization.BuiltInTypes.DeserializeInvokeMethodRequest(Type expected, IDeserializationContext context)
   at Orleans.Serialization.SerializationManager.DeserializeInner[TContext,TReader](SerializationManager sm, Type expected, TContext context, TReader reader)
   at Orleans.Runtime.Messaging.MessageSerializer.OrleansSerializer`1.Deserialize(ReadOnlySequence`1 input, T& value)
   at Orleans.Runtime.Messaging.MessageSerializer.TryRead(ReadOnlySequence`1& input, Message& message)
   at Orleans.Runtime.Messaging.Connection.ProcessIncoming()

iamsamcoder avatar Sep 08 '22 04:09 iamsamcoder

Here is the client configuration, which is registered as an IHostedService.

 public OrleansClusterClientHostedService(ILogger<OrleansClusterClientHostedService> logger, IConfiguration configuration)
        {
            _logger = logger;
            
            _logger.LogInformation("OrleansClusterClientHostedService constructor called");

            var connString = configuration["AZURE_STORAGE_CONNECTION_STRING"];

            Client = new ClientBuilder()
                .Configure<ClusterOptions>(options =>
                {
                    options.ClusterId = configuration["OrleansClusterId"];
                    options.ServiceId = configuration["OrleansServiceId"]; 
                })
                .Configure<SerializationProviderOptions>(options =>
                {
                    options.FallbackSerializationProvider = typeof(OrleansJsonSerializer).GetTypeInfo();
                })
                .AddAzureQueueStreams("AzureQueueProvider", opts => opts.Configure<IOptions<ClusterOptions>>((options, dep) =>
                {
                    options.ConnectionString = connString;
                    options.QueueNames = Enumerable.Range(0, 8)
                        .Select(n => $"stream-azurequeueprovider-{n}").ToList();
                }))
                .UseAzureStorageClustering(options => options.ConnectionString = connString)
                .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(IDocumentGrain).Assembly))
                .Build();
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Orleans cluster client hosted service StartAysnc");
            
            await Client.Connect(RetrySiloUnavailable);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await Client.Close();

            Client.Dispose();
        }

iamsamcoder avatar Sep 08 '22 05:09 iamsamcoder

And here's the silo streams configuration:

private static void SetupOrleansStreams(ISiloBuilder siloBuilder, string AZURE_STORAGE_CONNECTION_STRING)
        {
            siloBuilder.AddAzureQueueStreams("AzureQueueProvider", configurator =>
            {
                configurator.ConfigureAzureQueue(builder => builder.Configure(options =>
                {
                    options.ConnectionString = AZURE_STORAGE_CONNECTION_STRING;

                    options.QueueNames = Enumerable.Range(0, 8)
                        .Select(n => $"stream-azurequeueprovider-{n}").ToList();
                }));
                configurator.ConfigureCacheSize(1024);
                configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
                {
                    options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
                    options.BatchContainerBatchSize = 256;
                }));
            });
            siloBuilder.AddAzureBlobGrainStorage("PubSubStore",
                options => { options.ConnectionString = AZURE_STORAGE_CONNECTION_STRING; });
        }

And the silo serialization config:


siloBuilder.Configure<SerializationProviderOptions>(options =>
                {
                    options.FallbackSerializationProvider = typeof(OrleansJsonSerializer).GetTypeInfo();
                });

iamsamcoder avatar Sep 08 '22 05:09 iamsamcoder

That serialization configuration is the problem here. I suggest writing a custom serialization provider that uses JSON only for types in your own assemblies. You can copy the OrleansJsonSerializer or delegate to an instance of it, but in your IsSupportedType(Type) method, check the type's assembly to make sure it comes from one of your DLLs.

ReubenBond avatar Sep 08 '22 16:09 ReubenBond

Thanks @ReubenBond! I may not understand correctly, but I've only registered the OrleansJsonSerializer as the fallback serializer. Wouldn't the type in question, Orleans.Providers.Streams.Common.EventSequenceTokenV2, have an automatically generated serializer from Orleans and not use the fallback?

iamsamcoder avatar Sep 08 '22 23:09 iamsamcoder

Resolved by creating a custom serializer per Reuben's suggestion.

iamsamcoder avatar Sep 26 '22 23:09 iamsamcoder
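
For readers who land here with the same exception, the suggestion in the thread (a serializer that delegates to OrleansJsonSerializer but only claims types from your own assemblies) might look roughly like the sketch below. This is not the code the original poster ended up with, which was never shared. The class name AppAssemblyJsonSerializer is hypothetical, and the sketch assumes FollowUpMonitorMessage lives in the assembly that holds your application types and that the Orleans 3.x IExternalSerializer interface is in use.

```csharp
using System;
using System.Reflection;
using Orleans.Serialization;

// Hypothetical name; a sketch of the approach suggested in the thread,
// assuming the Orleans 3.x IExternalSerializer interface.
public sealed class AppAssemblyJsonSerializer : IExternalSerializer
{
    // Assumption: all application message types live in this one assembly.
    private static readonly Assembly AppAssembly = typeof(FollowUpMonitorMessage).Assembly;

    private readonly OrleansJsonSerializer _inner;

    public AppAssemblyJsonSerializer(IServiceProvider services)
    {
        // Delegate the actual JSON work to the built-in serializer.
        _inner = new OrleansJsonSerializer(services);
    }

    // Only claim types defined in the application assembly, so Orleans
    // types such as EventSequenceTokenV2 are never routed through JSON.
    public bool IsSupportedType(Type itemType) => itemType.Assembly == AppAssembly;

    public object DeepCopy(object source, ICopyContext context) =>
        _inner.DeepCopy(source, context);

    public void Serialize(object item, ISerializationContext context, Type expectedType) =>
        _inner.Serialize(item, context, expectedType);

    public object Deserialize(Type expectedType, IDeserializationContext context) =>
        _inner.Deserialize(expectedType, context);
}
```

Under the same assumptions, it would be registered on both the client and the silo in place of the fallback setting, for example with options.SerializationProviders.Add(typeof(AppAssemblyJsonSerializer).GetTypeInfo()) inside the existing Configure<SerializationProviderOptions> call. Note that the assembly check above does not match closed generic types such as List<FollowUpMonitorMessage>, since those are defined in the framework assembly, so the check may need widening for your own types.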