confluent-kafka-dotnet
System.AccessViolationException when running in .NET 7
Description
After upgrading to .NET 7 we experience regular crashes in the form of System.AccessViolationException and sometimes System.BadImageFormatException.
We are using a basic Kafka consumer poll loop as presented in the examples of this repository (AvroSpecific), with both key and value serialized in Avro via a schema registry, and SASL-based connections. Our Kafka consumers run in separate hosted services, so each of them has its own poll loop and subscribes to a single topic.
We experience the problem both with Confluent.Kafka version 1.9.3 and 2.0.2. Our Kafka cluster runs Kafka version 3.2.1.
Our workloads run containerized in AKS, using the official dotnet/aspnet:7.0 as runtime image. We do not experience any issues when running .NET 6 applications with the dotnet/aspnet:6.0 image.
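For reference, each hosted service runs a loop along these lines (a trimmed-down sketch; the topic name, credentials and the DemoValue record type are placeholders rather than our actual code):

```csharp
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Extensions.Hosting;

public class DemoConsumerService : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Factory.StartNew(() =>
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "***MASKED***:24748",
                GroupId = "demo-dev",
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "...",
                SaslPassword = "...",
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };
            var registryConfig = new SchemaRegistryConfig { Url = "https://schema-registry" };

            using var schemaRegistry = new CachedSchemaRegistryClient(registryConfig);
            // DemoValue stands in for our generated Avro ISpecificRecord class.
            using var consumer = new ConsumerBuilder<long, DemoValue>(consumerConfig)
                .SetKeyDeserializer(new AvroDeserializer<long>(schemaRegistry).AsSyncOverAsync())
                .SetValueDeserializer(new AvroDeserializer<DemoValue>(schemaRegistry).AsSyncOverAsync())
                .Build();

            consumer.Subscribe("dev-topic1-v1");
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var result = consumer.Consume(stoppingToken); // the crash occurs here
                    // ... handle result.Message ...
                }
            }
            catch (OperationCanceledException) { /* shutting down */ }
            finally
            {
                consumer.Close();
            }
        }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
```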
The System.AccessViolationException appears primarily after the first consumer.Consume(cts.Token) call, right after starting the workload:
Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
at System.Runtime.InteropServices.Marshal.PtrToStructureHelper(IntPtr, System.Object, Boolean)
at System.Runtime.InteropServices.Marshal.PtrToStructure[[Confluent.Kafka.Impl.rd_kafka_topic_partition, Confluent.Kafka, Version=2.0.2.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e]](IntPtr)
at Confluent.Kafka.Internal.Util+Marshal.PtrToStructure[[Confluent.Kafka.Impl.rd_kafka_topic_partition, Confluent.Kafka, Version=2.0.2.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e]](IntPtr)
at Confluent.Kafka.Impl.SafeKafkaHandle+<>c__DisplayClass66_0.<GetTopicPartitionOffsetErrorList>b__0(Int32)
at System.Linq.Utilities+<>c__DisplayClass2_0`3[[System.Int32, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[Confluent.Kafka.Impl.rd_kafka_topic_partition, Confluent.Kafka, Version=2.0.2.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].<CombineSelectors>b__0(Int32)
at System.Linq.Enumerable+SelectRangeIterator`1[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].ToList()
at Confluent.Kafka.Impl.SafeKafkaHandle.GetTopicPartitionOffsetErrorList(IntPtr)
at Confluent.Kafka.Consumer`2[[System.Int64, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].RebalanceCallback(IntPtr, Confluent.Kafka.ErrorCode, IntPtr, IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods_Centos6.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.Librdkafka.consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.SafeKafkaHandle.ConsumerPoll(IntPtr)
at Confluent.Kafka.Consumer`2[[System.Int64, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
at Confluent.Kafka.Consumer`2[[System.Int64, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
...
The demo consumer application reads from 10 topics, but we also experience the issue when fewer topics are consumed. Here are roughly the last 30 log messages (with debug="consumer,cgrp,topic,fetch"); I can also share the entire log (~1800 lines) if needed:
{"@t":"2023-02-01T08:35:17.5121257Z","@m":"\"rdkafka#consumer-8\": \"[thrd:main]: Group \\\"demo-dev\\\" changed join state init -> wait-join (state up)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-8","Message":"[thrd:main]: Group \"demo-dev\" changed join state init -> wait-join (state up)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:17.8611816Z","@m":"\"rdkafka#consumer-2\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic2-v1 [0] at offset 551792 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic2-v1 [0] at offset 551792 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8614007Z","@m":"\"rdkafka#consumer-2\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 1/1/1 toppar(s)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 1/1/1 toppar(s)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8755633Z","@m":"\"rdkafka#consumer-5\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic1-v1 [6] at offset 365696 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-5","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic1-v1 [6] at offset 365696 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8757189Z","@m":"\"rdkafka#consumer-5\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 1/1/2 toppar(s)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-5","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 1/1/2 toppar(s)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8989473Z","@m":"\"rdkafka#consumer-3\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic3-v1 [8] at offset 16421 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-3","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic3-v1 [8] at offset 16421 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8992252Z","@m":"\"rdkafka#consumer-3\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic3-v1 [9] at offset 16641 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-3","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic3-v1 [9] at offset 16641 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.8992987Z","@m":"\"rdkafka#consumer-3\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 2/2/3 toppar(s)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-3","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 2/2/3 toppar(s)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.9498300Z","@m":"\"rdkafka#consumer-4\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic4-v1 [0] at offset 12465 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-4","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic4-v1 [0] at offset 12465 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.9500641Z","@m":"\"rdkafka#consumer-4\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic4-v1 [3] at offset 12448 (v4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-4","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch topic dev-topic4-v1 [3] at offset 12448 (v4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:17.9501358Z","@m":"\"rdkafka#consumer-4\": \"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 2/2/2 toppar(s)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-4","Message":"[thrd:sasl_ssl://***MASKED***:24748/1]: sasl_ssl://***MASKED***:24748/1: Fetch 2/2/2 toppar(s)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":["IPv4"]}
{"@t":"2023-02-01T08:35:18.0548545Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: GroupCoordinator/99: Heartbeat for group \\\"demo-dev\\\" generation id 4\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: GroupCoordinator/99: Heartbeat for group \"demo-dev\" generation id 4","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0568379Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\" heartbeat error response in state up (join-state steady, 2 partition(s) assigned): Broker: Group rebalance in progress\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\" heartbeat error response in state up (join-state steady, 2 partition(s) assigned): Broker: Group rebalance in progress","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0569604Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\" is rebalancing (EAGER) in state up (join-state steady) with 2 assigned partition(s): rebalance in progress\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\" is rebalancing (EAGER) in state up (join-state steady) with 2 assigned partition(s): rebalance in progress","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0569985Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\" changed join state steady -> wait-unassign-call (state up)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\" changed join state steady -> wait-unassign-call (state up)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0570296Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\": delegating revoke of 2 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\": delegating revoke of 2 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0570693Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Pausing fetchers for 2 assigned partition(s): rebalance\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Pausing fetchers for 2 assigned partition(s): rebalance","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0571051Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Library pausing 2 partition(s)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Library pausing 2 partition(s)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0571332Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: dev-topic2-v1 [0]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v5\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: dev-topic2-v1 [0]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v5","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0571938Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Pause dev-topic2-v1 [0] (v5)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Pause dev-topic2-v1 [0] (v5)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0572280Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: dev-topic2-v1 [1]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v5\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: dev-topic2-v1 [1]: rd_kafka_toppar_op_pause_resume:2262: new version barrier v5","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0572781Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Pause dev-topic2-v1 [1] (v5)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Pause dev-topic2-v1 [1] (v5)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0573284Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\": clearing group assignment\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\": clearing group assignment","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0573882Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: dev-topic2-v1 [0] received op PAUSE (v5) in fetch-state active (opv4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: dev-topic2-v1 [0] received op PAUSE (v5) in fetch-state active (opv4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0574485Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Pause dev-topic2-v1 [0]: at offset 551792 (state active, v5)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Pause dev-topic2-v1 [0]: at offset 551792 (state active, v5)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0574840Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: dev-topic2-v1 [1] received op PAUSE (v5) in fetch-state active (opv4)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: dev-topic2-v1 [1] received op PAUSE (v5) in fetch-state active (opv4)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0575337Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Pause dev-topic2-v1 [1]: at offset 552946 (state active, v5)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Pause dev-topic2-v1 [1]: at offset 552946 (state active, v5)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0575680Z","@m":"\"rdkafka#consumer-2\": \"[thrd:main]: Group \\\"demo-dev\\\" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-2","Message":"[thrd:main]: Group \"demo-dev\" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
{"@t":"2023-02-01T08:35:18.0582929Z","@m":"\"rdkafka#consumer-4\": \"[thrd:main]: GroupCoordinator/99: Heartbeat for group \\\"demo-dev\\\" generation id 4\"; level: Debug","@i":"ecba9ed9","MessageName":"rdkafka#consumer-4","Message":"[thrd:main]: GroupCoordinator/99: Heartbeat for group \"demo-dev\" generation id 4","MessageLevel":"Debug","SourceContext":"Demo.KafkaConsumerFactory","mask":[]}
As mentioned before, sometimes the consumers also crash with System.BadImageFormatException, but I don't know whether this is a completely different issue:
{"@t":"2023-02-01T12:04:55.4243487Z","@m":"\"topic1-v1-Consumer\"-\"rdkafka-ff28715d-b297-432f-b49f-c69c31068e64\": Unhandled error while consuming message","@i":"1180be2a","@l":"Error","@x":"System.BadImageFormatException: Bad binary signature. (0x80131192)\n
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)\n
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)\n
...
{"@t":"2023-02-01T12:04:55.4243674Z","@m":"\"topic2-v1-Consumer\"-\"rdkafka-936d9162-107b-4578-8d5f-b8b1fb53bf5f\": Unhandled error while consuming message","@i":"1180be2a","@l":"Error","@x":"System.BadImageFormatException: Illegal or unimplemented ELEM_TYPE in signature. The format of the file '/app/Confluent.Kafka.dll' is invalid.\n
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)\n
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)\n
...
Checklist
Please provide the following information:
- [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [x] Confluent.Kafka nuget version.
- [x] Apache Kafka version.
- [ ] Client configuration.
- [x] Operating system.
- [x] Provide logs (with "debug" : "..." as necessary in configuration).
- [ ] Provide broker log excerpts.
- [x] Critical issue.
We are experiencing the same, more or less.
The setup is:
- Confluent.Kafka 1.9.3 (or 2.0.2; the issue is still there).
- .NET 5.0 running on Windows Server (no containers).
- Apache Kafka 2.8.
- Several consumers per topic, partitions are auto-assigned.
- Manual commits: we collect all offsets and pass them to .Commit() periodically, in sync with the consumer (roughly as sketched below).
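For context, the commit pattern looks roughly like this (a simplified sketch assuming EnableAutoCommit = false; the consumer type, interval and error handling are placeholders):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

static void ConsumeWithPeriodicCommit(IConsumer<string, string> consumer, CancellationToken ct)
{
    var pending = new Dictionary<TopicPartition, TopicPartitionOffset>();
    var lastCommit = DateTime.UtcNow;

    while (!ct.IsCancellationRequested)
    {
        var result = consumer.Consume(TimeSpan.FromMilliseconds(500));
        if (result != null)
        {
            // Track the next offset to commit for each partition we consumed from.
            pending[result.TopicPartition] =
                new TopicPartitionOffset(result.TopicPartition, result.Offset + 1);
        }

        // Commit the collected offsets periodically, on the same thread that calls Consume().
        if (pending.Count > 0 && DateTime.UtcNow - lastCommit > TimeSpan.FromSeconds(5))
        {
            consumer.Commit(pending.Values);
            pending.Clear();
            lastCommit = DateTime.UtcNow;
        }
    }
}
```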
Some background.
We came across the AccessViolationException when we tried to monitor consumers. The reason for monitoring is that after hours or days of running, the consumers disappear. In some cases this is related to heartbeats or session timeouts, but sometimes the Kafka broker just drops consumers from the consumer group without an apparent reason. This is especially painful if you have a single consumer for a topic and it is dropped all of a sudden. We decided to check on the consumer objects from a separate thread by reading their Assignment property, which should return the assigned topic partitions. That is when the AccessViolationException shows up. The application crashes, as there is no way to handle it.
Another insight: before the crash, the consumer object becomes unusable; the Assignment getter never returns. So there is definitely something wrong with multithreaded access to the handles.
Interestingly, there have been some discussions in the original librdkafka repo about the same behavior, and it looks like they agree that an ObjectDisposedException should be thrown instead (that would probably shed more light on what's going on).
I'm not able to reproduce this. I tried a sample application with dotnet 7.0 and Kafka 3.2.1, which works fine. Can you provide sample code in which you faced the issues with the client?
Please take a look at the sample code.
It is quite a contrived example: a simple consumer and a monitoring background process that checks the consumer's assignment. Because the consumer is not instantly connected, the monitor detects that the assignment does not return any partitions and proceeds to unsubscribe and dispose.
It fails right from the start.
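A rough reconstruction of what the sample does, based on the description above (broker, group and topic names are placeholders):

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "repro-group",
};

var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("repro-topic");

// Monitoring task: if no partitions have been assigned yet, tear the consumer down.
var monitor = Task.Run(async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(5));
    if (consumer.Assignment.Count == 0)   // touches the handle from another thread
    {
        consumer.Unsubscribe();
        consumer.Dispose();               // disposed while the poll loop is still using it
    }
});

// Poll loop on the main thread; once the monitor disposes the consumer,
// Consume() races against the handle release and can crash the process.
while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
    if (cr != null) Console.WriteLine(cr.Message.Value);
}
```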


Some issue is sometimes crashing my app when trying to Close the Confluent.Kafka.Consumer:
Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
Repeat 2 times:
--------------------------------
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_destroy_flags(IntPtr, IntPtr)
--------------------------------
at Confluent.Kafka.Impl.Librdkafka.destroy_flags(IntPtr, IntPtr)
at Confluent.Kafka.Impl.SafeKafkaHandle.ReleaseHandle()
at System.Runtime.InteropServices.SafeHandle.InternalRelease(Boolean)
at Confluent.Kafka.Impl.SafeKafkaHandle.Dispose(Boolean)
at System.Runtime.InteropServices.SafeHandle.Dispose()
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Dispose(Boolean)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Close()
at WebApi.Kafka.KafkaListener+<StartConsumptionLoopAsync>d__11.MoveNext()
Hello. Our apps have crashed several times with similar exceptions:
CoreCLR Version: 7.0.723.27404
.NET Version: 7.0.7
Description: The process was terminated due to an unhandled exception.
Exception Info: System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
Stack:
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at KafkaFlow.Consumers.Consumer+<ConsumeAsync>d__39.MoveNext()
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[KafkaFlow.Consumers.Consumer+<ConsumeAsync>d__39, KafkaFlow, Version=2.4.1.0, Culture=neutral, PublicKeyToken=null]].ExecutionContextCallback(System.Object)
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[KafkaFlow.Consumers.Consumer+<ConsumeAsync>d__39, KafkaFlow, Version=2.4.1.0, Culture=neutral, PublicKeyToken=null]].MoveNext(System.Threading.Thread)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.__Canon, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[KafkaFlow.Consumers.Consumer+<ConsumeAsync>d__39, KafkaFlow, Version=2.4.1.0, Culture=neutral, PublicKeyToken=null]].MoveNext()
at System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Runtime.CompilerServices.IAsyncStateMachineBox, Boolean)
at System.Threading.Tasks.Task.RunContinuations(System.Object)
at System.Threading.SemaphoreSlim+<WaitUntilCountOrTimeoutAsync>d__31.MoveNext()
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Boolean, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.Threading.SemaphoreSlim+<WaitUntilCountOrTimeoutAsync>d__31, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext(System.Threading.Thread)
at System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Action, Boolean)
at System.Threading.Tasks.Task.RunContinuations(System.Object)
at System.Threading.Tasks.Task`1[[System.Boolean, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].TrySetResult(Boolean)
at System.Threading.Tasks.Task+CancellationPromise`1[[System.Boolean, System.Private.CoreLib, Version=7.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].System.Threading.Tasks.ITaskCompletionAction.Invoke(System.Threading.Tasks.Task)
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool+WorkerThread.WorkerThreadStart()
In our case it was caused by multiple threads trying to stop the consumer simultaneously; see the sketch below for how we guard against that now.
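A minimal sketch of that guard (our own workaround, not library guidance; the string key/value types are placeholders). It only ensures Close()/Dispose() run once; the poll loop should also have exited before Stop() is called:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public sealed class GuardedConsumer : IDisposable
{
    private readonly IConsumer<string, string> _consumer;
    private int _stopped; // 0 = running, 1 = stop already requested

    public GuardedConsumer(IConsumer<string, string> consumer) => _consumer = consumer;

    public void Stop()
    {
        // Only the first caller wins; concurrent callers return immediately,
        // so the native handle is closed and released exactly once.
        if (Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            _consumer.Close();
            _consumer.Dispose();
        }
    }

    public void Dispose() => Stop();
}
```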
I again retried reproducing this with .NET 8 but was unable to do so on Mac and Linux. Can anyone confirm on which OS the System.AccessViolationException is occurring?
An unhandled exception of type 'System.AccessViolationException' occurred in Confluent.Kafka.dll
Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
Windows 10, .NET 8
Using dotnet 8.0.300 and Confluent.Kafka 2.4.0, I ran into a similar issue. I was creating a thread in my subscriber class to consume messages from a topic asynchronously. When I stopped the subscriber, I got the following error:
Error: A task was canceled.
Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
Repeat 2 times:
--------------------------------
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
--------------------------------
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at task2.Services.KafkaBroker+<>c__DisplayClass4_1+<<RecieveMessage>b__0>d.MoveNext()
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]](System.__Canon ByRef)
at System.Runtime.CompilerServices.AsyncVoidMethodBuilder.Start[[System.__Canon, System.Private.CoreLib, Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]](System.__Canon ByRef)
at task2.Services.KafkaBroker+<>c__DisplayClass4_1.<RecieveMessage>b__0()
at System.Threading.Thread.StartCallback()
The issue was solved by moving my ConsumerBuilder object and the consumer.Subscribe(topic) call inside my thread.
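Roughly what the fixed version looks like (identifiers and topic names are illustrative, not the actual code): the consumer is built, subscribed, consumed and closed entirely on the dedicated thread, so no other thread ever touches the native handle.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var cts = new CancellationTokenSource();

var consumeThread = new Thread(() =>
{
    // Built and subscribed inside the thread that will also poll and close it.
    using var consumer = new ConsumerBuilder<string, string>(
        new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "demo-group" }).Build();

    consumer.Subscribe("my-topic");

    try
    {
        while (!cts.IsCancellationRequested)
        {
            var result = consumer.Consume(cts.Token);
            Console.WriteLine(result.Message.Value);
        }
    }
    catch (OperationCanceledException) { /* stop requested */ }
    finally
    {
        consumer.Close(); // same thread as Consume()
    }
});

consumeThread.Start();

// Stopping the subscriber is now just:
//   cts.Cancel();
//   consumeThread.Join();
```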