Orleans.Streams.Kafka
Deserialize string value from external topic
I'm producing string values like this:
await new ProducerBuilder<string, string>(config).Build().ProduceAsync("my-topic", new Message<string, string> { Key = "my-key", Value = "Hello" });
The stream is consumed inside a grain.
Using version 2.0.2, messages are not delivered.
While debugging, I get a System.InvalidCastException: 'Object must implement IConvertible.' in KafkaExternalBatchContainer, line 53.
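I have not confirmed what line 53 actually does, so the following is only a sketch of one common way this exact message arises in .NET: calling Convert.ChangeType on an object that does not implement IConvertible, such as a raw byte[] payload. The assumption that the container converts the raw payload this way is mine and is not taken from the library source.

```csharp
using System;
using System.Text;

class Program
{
    static void Main()
    {
        // Raw UTF-8 bytes, as a Kafka consumer would typically hand them over.
        byte[] payload = Encoding.UTF8.GetBytes("Hello");

        try
        {
            // byte[] does not implement IConvertible, so this conversion throws.
            // Assumption: something similar happens inside the batch container.
            Convert.ChangeType(payload, typeof(string));
        }
        catch (InvalidCastException ex)
        {
            Console.WriteLine(ex.Message);
        }

        // Decoding the bytes explicitly avoids the conversion altogether.
        Console.WriteLine(Encoding.UTF8.GetString(payload));
    }
}
```

If this is the cause, decoding the payload by its known encoding instead of converting it generically would avoid the exception.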
Using version 3.0.0-dev-93, I get the following exception:
Exception while retrying the 2th time reading from queue test-topic_0-322024159
Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: H. Path '', line 1, position 1.
   at Newtonsoft.Json.JsonTextReader.ReadStringValue(ReadType readType)
   at Newtonsoft.Json.JsonTextReader.ReadAsString()
   at Newtonsoft.Json.JsonReader.ReadForType(JsonContract contract, Boolean hasConverter)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Orleans.Streams.Utils.Serialization.JsonExternalStreamDeserializer.Deserialize(QueueProperties queueProps, Type type, Byte[] data)
   at Orleans.Streams.Kafka.Consumer.ConsumeResultExtensions.ToBatchContainer(ConsumeResult`2 result, SerializationContext serializationContext, QueueProperties queueProperties)
   at Orleans.Streams.Kafka.Core.KafkaAdapterReceiver.PollForMessages(Int32 maxCount, CancellationTokenSource cancellation)
   at Orleans.Streams.PersistentStreamPullingAgent.ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver rcvr, Int32 maxCacheAddCount) in D:\build\agent\_work\25\s\src\Orleans.Runtime\Streams\PersistentStream\PersistentStreamPullingAgent.cs:line 435
   at Orleans.AsyncExecutorWithRetries.ExecuteWithRetriesHelper[T](Func`2 function, Int32 callCounter, Int32 maxNumSuccessTries, Int32 maxNumErrorTries, TimeSpan maxExecutionTime, DateTime startExecutionTime, Func`3 retryValueFilter, Func`3 retryExceptionFilter, IBackoffProvider onSuccessBackOff, IBackoffProvider onErrorBackOff) in D:\build\agent\_work\25\s\src\Orleans.Core\Async\AsyncExecutorWithRetries.cs:line 143
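The JsonReaderException can be reproduced outside Orleans. The sketch below assumes only the Newtonsoft.Json package and calls JsonConvert directly rather than the library's JsonExternalStreamDeserializer, so it illustrates the parsing behaviour and not the library's exact code path. The plain text Hello is not valid JSON, whereas the JSON-encoded string "Hello" (with quotes) is.

```csharp
using System;
using Newtonsoft.Json;

class Program
{
    static void Main()
    {
        // The raw value as it was produced to the topic: plain text, not JSON.
        string raw = "Hello";

        try
        {
            JsonConvert.DeserializeObject<string>(raw);
        }
        catch (JsonReaderException ex)
        {
            // The parser rejects the unquoted 'H' at position 1.
            Console.WriteLine(ex.Message);
        }

        // A JSON-encoded string carries its own quotes and round-trips cleanly.
        string json = JsonConvert.SerializeObject("Hello");
        string value = JsonConvert.DeserializeObject<string>(json);
        Console.WriteLine(value);
    }
}
```

As a possible workaround, which I have not tested against the stream provider, the producer could send JsonConvert.SerializeObject("Hello") as the message value instead of the bare string.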
This issue may occur for all primitive types, not only strings.