Orleans.Streams.Kafka

Deserialize string value from external topic

Open szingg opened this issue 5 years ago • 1 comments

I'm producing string values like this:

new ProducerBuilder<string, string>(config).Build().ProduceAsync("my-topic", new Message<string, string> { Key = "my-key", Value = "Hello" });

The stream is consumed inside a grain.

Using version 2.0.2, messages are not delivered. While debugging, I get a System.InvalidCastException: 'Object must implement IConvertible.' in KafkaExternalBatchContainer, line 53.
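For context, here is a minimal sketch of one way this exact message can arise in plain .NET. It assumes (this is only a guess, since the code at line 53 is not shown in this issue) that the raw Kafka payload reaches a `Convert.ChangeType` call as a `byte[]`. The class and method names `ConvertDemo` and `TryChangeType` are hypothetical and not part of Orleans.Streams.Kafka.

```csharp
using System;
using System.Text;

public static class ConvertDemo
{
    // Hypothetical helper: attempts Convert.ChangeType and returns either
    // "ok" or the message of the InvalidCastException that was thrown.
    public static string TryChangeType(object value, Type target)
    {
        try
        {
            Convert.ChangeType(value, target);
            return "ok";
        }
        catch (InvalidCastException ex)
        {
            return ex.Message;
        }
    }

    public static void Main()
    {
        // Assumption: the payload arrives as the UTF-8 bytes of "Hello".
        byte[] payload = Encoding.UTF8.GetBytes("Hello");

        // byte[] does not implement IConvertible, so the conversion is rejected.
        Console.WriteLine(TryChangeType(payload, typeof(string)));

        // A string already is a string, so this conversion succeeds.
        Console.WriteLine(TryChangeType("Hello", typeof(string)));
    }
}
```

If this is what happens, the payload would need to be decoded (for example with `Encoding.UTF8.GetString`) before any type conversion is attempted.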

Using version 3.0.0-dev-93, I get the following exception:

Exception while retrying the 2th time reading from queue test-topic_0-322024159
Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: H. Path '', line 1, position 1.
   at Newtonsoft.Json.JsonTextReader.ReadStringValue(ReadType readType)
   at Newtonsoft.Json.JsonTextReader.ReadAsString()
   at Newtonsoft.Json.JsonReader.ReadForType(JsonContract contract, Boolean hasConverter)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Orleans.Streams.Utils.Serialization.JsonExternalStreamDeserializer.Deserialize(QueueProperties queueProps, Type type, Byte[] data)
   at Orleans.Streams.Kafka.Consumer.ConsumeResultExtensions.ToBatchContainer(ConsumeResult`2 result, SerializationContext serializationContext, QueueProperties queueProperties)
   at Orleans.Streams.Kafka.Core.KafkaAdapterReceiver.PollForMessages(Int32 maxCount, CancellationTokenSource cancellation)
   at Orleans.Streams.PersistentStreamPullingAgent.ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver rcvr, Int32 maxCacheAddCount) in D:\build\agent\_work\25\s\src\Orleans.Runtime\Streams\PersistentStream\PersistentStreamPullingAgent.cs:line 435
   at Orleans.AsyncExecutorWithRetries.ExecuteWithRetriesHelper[T](Func`2 function, Int32 callCounter, Int32 maxNumSuccessTries, Int32 maxNumErrorTries, TimeSpan maxExecutionTime, DateTime startExecutionTime, Func`3 retryValueFilter, Func`3 retryExceptionFilter, IBackoffProvider onSuccessBackOff, IBackoffProvider onErrorBackOff) in D:\build\agent\_work\25\s\src\Orleans.Core\Async\AsyncExecutorWithRetries.cs:line 143
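The JsonReaderException is consistent with the raw bytes of Hello being handed to a JSON parser: a bare Hello is not valid JSON, whereas the quoted form "Hello" is. The sketch below reproduces that behaviour with Newtonsoft.Json alone. It does not use the library's JsonExternalStreamDeserializer; `JsonPayloadDemo` and `DeserializeString` are hypothetical names that only imitate what such a deserializer presumably does.

```csharp
using System;
using System.Text;
using Newtonsoft.Json;

public static class JsonPayloadDemo
{
    // Hypothetical stand-in for a JSON-based deserializer:
    // decode the payload as UTF-8, then parse it as a JSON string.
    public static string DeserializeString(byte[] data)
    {
        string text = Encoding.UTF8.GetString(data);
        return JsonConvert.DeserializeObject<string>(text);
    }

    public static void Main()
    {
        // Payload as produced in this issue: the bare characters H-e-l-l-o.
        byte[] raw = Encoding.UTF8.GetBytes("Hello");
        try
        {
            DeserializeString(raw);
        }
        catch (JsonReaderException ex)
        {
            Console.WriteLine("Raw payload failed: " + ex.Message);
        }

        // Possible workaround on the producer side: JSON-encode the value
        // first, so the bytes on the topic include the surrounding quotes.
        byte[] quoted = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject("Hello"));
        Console.WriteLine("Quoted payload parsed as: " + DeserializeString(quoted));
    }
}
```

Under that assumption, a producer-side workaround would be to send `JsonConvert.SerializeObject("Hello")` as the message value. That only helps when the producer can be changed, which may not be the case for a truly external topic.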

szingg • Aug 06 '19 09:08

This issue may even occur for all primitive types.

szingg • Aug 06 '19 11:08