confluent-kafka-dotnet
Object reference null while trying to produce to topic on higher loads during perf testing
Description
We are doing performance testing, producing around 24 million messages to a topic with a partition count of 36 and a replication factor of 3. We call the ProduceAsync method to push messages. At higher loads we get the exception below:
System.NullReferenceException: Object reference not set to an instance of an object.
at Confluent.Kafka.Impl.SafeKafkaHandle.marshalHeaders(IEnumerable`1 headers)
at Confluent.Kafka.Impl.SafeKafkaHandle.Produce(String topic, Byte[] val, Int32 valOffset, Int32 valLength, Byte[] key, Int32 keyOffset, Int32 keyLength, Int32 partition, Int64 timestamp, IEnumerable`1 headers, IntPtr opaque)
at Confluent.Kafka.Producer`2.ProduceImpl(String topic, Byte[] val, Int32 valOffset, Int32 valLength, Byte[] key, Int32 keyOffset, Int32 keyLength, Timestamp timestamp, Partition partition, IEnumerable`1 headers, IDeliveryHandler deliveryHandler)
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
How to reproduce
Checklist
- Confluent.Kafka 1.7.0.
Looks like a problem marshaling the headers. I don't see where an NPE could be happening there (apparently I'm blind, because apparently it is). Are your headers meaningfully different in the different produce requests? A line number would help; you could get that by making your test part of the confluent-kafka-dotnet solution.
Are your headers meaningfully different in the different produce requests? No, the header is the same for all the requests.
Below is the code snippet:
private readonly IKafkaProducer<Null, someObj> _producer;
// code logic
var headers = new Headers()
{
    new Header("header1", Encoding.UTF8.GetBytes(header1Value.ToString())),
};
var msg = _mapper.Map<AuthTrxn, CustomObj>(someObj);
var kafkaMessage = new Message<Null, CustomObj> { Value = msg, Headers = headers };
await _producer.ProduceAsync(topicName, kafkaMessage);
What is the IKafkaProducer object? Is it doing anything to the header except pass it through to ProduceAsync (I assume not)?
IKafkaProducer is just a wrapper which uses IProducer<TKey, TValue> from the Confluent Kafka library.
Is it doing anything to the header except pass it through to ProduceAsync? No.
@CwPiyush I tried to reproduce the issue locally with 25M messages having headers, partition count = 36, and replication factor = 3, using the Benchmark test, and didn't run into any trouble in the producer.
To help the investigation further, could you please:
- Provide the sample code for your producer, with the configuration you are testing with.
- Try running the Benchmark test as well and see whether it errors out.
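For the first point, a standalone producer along the following lines would probably be enough to share. This is only a sketch under assumptions: the broker address, topic name, message count, batch size, and the string payload are all placeholders (the real test uses a custom value type and an IKafkaProducer wrapper that are not shown in this thread), and any settings from the actual ProducerConfig would need to be added.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class HeaderProduceRepro
{
    // All constants are placeholders, not values taken from the report.
    private const string BootstrapServers = "localhost:9092";
    private const string TopicName = "perf-test-topic";
    private const int MessageCount = 1_000_000;
    private const int BatchSize = 10_000;

    public static async Task Main()
    {
        // Add whatever settings the real test uses here.
        var config = new ProducerConfig { BootstrapServers = BootstrapServers };
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        var pending = new List<Task<DeliveryResult<Null, string>>>(BatchSize);
        for (var i = 0; i < MessageCount; i++)
        {
            // A fresh Headers instance per message, as in the snippet above.
            var headers = new Headers
            {
                new Header("header1", Encoding.UTF8.GetBytes(i.ToString())),
            };
            var message = new Message<Null, string>
            {
                Value = $"payload-{i}",
                Headers = headers,
            };

            // Start the produce without awaiting it, so many requests are in flight.
            pending.Add(producer.ProduceAsync(TopicName, message));

            // Wait for each batch so the local producer queue does not fill up.
            if (pending.Count == BatchSize)
            {
                await Task.WhenAll(pending);
                pending.Clear();
            }
        }

        await Task.WhenAll(pending);
        producer.Flush(TimeSpan.FromSeconds(30));
    }
}
```

If the NullReferenceException shows up with something this small, the unhandled exception output together with the exact config would give a much better starting point than the wrapped call in the application.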
We went with other alternatives. I am closing the ticket for now.