Deserialization error
Error : A response could not be decoded for the node - Kafka.Protocol.ProtocolException: The record batch says it has length that stops at 1049715 but the list of all batches stop at 1048641.
Throws : Could not deserialize a message for topic xyz / partition 2. Because our configuration is set to 'ConsumerErrorStrategy == Discard', we will now read from latest offset.
In my consumer I am seeing the above error intermittently, and whenever it occurs a large number of messages from that partition is discarded.
For example, if the error is raised on partition 2 at offset 5100, consumption later resumes from the latest offset on that partition, say 9000. The roughly 4000 messages in between are discarded and I lose them.
Can you help me understand what may be wrong, what the library does in this case, and how to avoid this scenario?
Hello. The consumer is not able to deserialize the batch of messages because the declared batch length does not match the data received. This should not happen. How are those messages produced? In such a case it is usually not a good idea to retry from the culprit offset: if the batch stored on the server is permanently broken, you would read the failing batch in a loop. The current Discard strategy is quite basic: since the driver does not know how many messages are corrupted, it simply discards all of them and goes to the latest offset.
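If skipping to the latest offset loses too much data for you, one workaround is to track the last offset you successfully processed per partition and restart consumption from there yourself. A minimal sketch, assuming the client exposes a `Consume(topic, partition, offset)` overload and the `Messages` observable as in the snippets below (treat the names as assumptions to check against your version of the library):

```csharp
// Sketch only: remember the last good offset per partition so that,
// after a Discard, we can resume just past it instead of jumping to
// the latest offset and losing the messages in between.
var lastSeen = new System.Collections.Concurrent.ConcurrentDictionary<int, long>();

clusterClient.Messages
    .Where(kr => kr.Topic == "mytopic")
    .Subscribe(kr => lastSeen[kr.Partition] = kr.Offset);

// After an error is reported on partition 2, resume manually:
if (lastSeen.TryGetValue(2, out var offset))
{
    // Hypothetical call; verify the exact overload in your client version.
    clusterClient.Consume("mytopic", 2, offset + 1);
}
```

Note that if the batch stored on the broker really is corrupted, resuming from the culprit offset will just hit the same error again, so you would still need to skip past the bad batch.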
This is probably because of the following optimization:
the server is allowed to return a partial message at the end of the message set
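Concretely, a record batch starts with a `baseOffset` (int64) followed by a `batchLength` (int32) that counts the bytes after the length field, so a decoder must verify the declared length fits in the bytes actually received and drop a truncated trailing batch instead of failing. A minimal illustrative check (not the library's actual code; in the reported error the batch declared an end at byte 1049715 while the response only contained data up to byte 1048641, i.e. a partial trailing batch):

```csharp
using System;
using System.Buffers.Binary;

// Illustrative sketch of the record-batch wire layout: baseOffset
// (8 bytes) then batchLength (4 bytes, big-endian), where batchLength
// counts the bytes that FOLLOW the length field. A fetch response may
// end with a truncated batch, so the decoder must check completeness.
static class PartialBatchCheck
{
    const int HeaderPrefix = 8 + 4; // baseOffset + batchLength fields

    public static bool IsComplete(byte[] buffer, int offset)
    {
        // Not even a full header prefix left: definitely partial.
        if (buffer.Length - offset < HeaderPrefix)
            return false;

        int batchLength = BinaryPrimitives.ReadInt32BigEndian(
            buffer.AsSpan(offset + 8, 4));

        // Complete only if all declared bytes are present in the buffer.
        return buffer.Length - offset - HeaderPrefix >= batchLength;
    }
}
```

A decoder that skips the incomplete trailing batch (rather than throwing) handles the server-side optimization quoted above.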
This was fixed internally. We need to make a new release.
@verdie-g when can I expect the new release? I hope it will be soon. Please fill me in on that.
I just need to merge #47
The partial-message case was already handled when deserializing message sets. Was the new problem introduced when adding support for record batches?
Precisely. Should be ok with the fix.
Hi,
when I subscribe to a topic, K# throws an exception like the one below:
[2020-07-24 14:45:48] ERROR A response could not be decoded for the node (Id:2 Host:localhostt Port:9092): Kafka.Protocol.UncompressException: Invalid compressed data.
---> System.IO.InvalidDataException: Input is not a valid snappy-compressed block
at Snappy.SnappyCodec.Uncompress(Byte[] input, Int32 offset, Int32 length, Byte[] output, Int32 outOffset)
at Kafka.Protocol.Basics.Uncompress(ReusableMemoryStream uncompressed, Byte[] body, Int32 offset, Int32 length, CompressionCodec codec)
--- End of inner exception stack trace ---
at Kafka.Protocol.Basics.Uncompress(ReusableMemoryStream uncompressed, Byte[] body, Int32 offset, Int32 length, CompressionCodec codec)
at Kafka.Protocol.FetchPartitionResponse.LazyDeserializeMessageSet(ReusableMemoryStream stream, Int32 messageSetSize, Tuple`2 deserializers)+MoveNext()
at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)
at System.Collections.Generic.List`1.AddRange(IEnumerable`1 collection)
at Kafka.Protocol.FetchPartitionResponse.DeserializeMessageSet(ReusableMemoryStream stream, Tuple`2 deserializers)
at Kafka.Protocol.FetchPartitionResponse.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
at Kafka.Protocol.TopicData`1.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
at Kafka.Protocol.Basics.DeserializeArrayExtra[TData](ReusableMemoryStream stream, Object extra, ApiVersion version)
at Kafka.Protocol.CommonResponse`1.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
at Kafka.Protocol.FetchResponse.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
at Kafka.Cluster.Node.Serialization.DeserializeResponse[TResponse](Int32 correlationId, ReusableMemoryStream data, ApiVersion version)
at Kafka.Cluster.Node.ProcessFetchResponse(Int32 correlationId, ReusableMemoryStream responseData, IBatchByTopic`1 originalRequest, ApiVersion version)
[2020-07-24 14:45:48] ERROR Could not deserialize a message for topic mytopic / partition 0. Because our configuration is set to 'ConsumerErrorStrategy == Discard', we will now read from latest offset.
Here are my configs.
Producer:
var serializationConfig = new SerializationConfig()
{
SerializeOnProduce = true
};
var serializer = new StringSerializer();
serializationConfig.SetDefaultSerializers(serializer, serializer);
var clusterClient = new ClusterClient(new Configuration
{
Seeds = _kafkaBrokerOptions.Servers,
SerializationConfig = serializationConfig,
ClientRequestTimeoutMs = (int)_kafkaBrokerOptions.RequestTimeout.TotalMilliseconds,
ClientId = $"K#-{Guid.NewGuid()}__producer"
}, new ConsoleLogger());
clusterClient.Produce("mytopic", "hello world");
Consumer:
var serializationConfig = new SerializationConfig()
{
SerializeOnProduce = true
};
var deserializer = new StringDeserializer();
serializationConfig.SetDefaultDeserializers(deserializer, deserializer);
var clusterClient = new ClusterClient(new Configuration
{
ErrorStrategy = ErrorStrategy.Retry,
Seeds = _kafkaBrokerOptions.Servers,
SerializationConfig = serializationConfig,
ClientRequestTimeoutMs = (int)_kafkaBrokerOptions.RequestTimeout.TotalMilliseconds,
ClientId = $"K#-{Guid.NewGuid()}__consumer"
}, new ConsoleLogger());
clusterClient.Messages.Where(kr => kr.Topic == "mytopic")
.Subscribe(kr => Console.WriteLine("{0}/{1} {2}: {3}", kr.Topic, kr.Partition, kr.Offset, kr.Value as string));
var consConf = new ConsumerGroupConfiguration
{
SessionTimeoutMs = 30000,
RebalanceTimeoutMs = 20000,
DefaultOffsetToReadFrom = Offset.Latest,
AutoCommitEveryMs = 5000
};
clusterClient.Subscribe("test-cgid", new[] { "mytopic" }, consConf);