rdkafka-dotnet
rdkafka-dotnet copied to clipboard
producer.Dispose is stuck
Hi, I try to use the SimpleProducer example. After the produce action, it's look like the producer.Dispose enters an infinite loop.
public async Task<bool> ProduceMessage(string message)
{
try
{
using (Producer producer = new Producer(_kafkaOptions.Uri))
{
using (Topic topic = producer.Topic(_kafkaOptions.Topic))
{
byte[] data = Encoding.UTF8.GetBytes(message);
await topic.Produce(data);
return true;
}
}
}
catch (RdKafkaException ex)
{
return false;
}
}
Am I doing something wrong? thanks!
Hi, is the message delivered successfully? Producer.Dispose will block until all messages in flight have either been delivered or timed out. Try enabling config["debug"] = "all" to see what's going on.
Also creating a Producer is expensive, so it's a good idea to keep it alive between calls to ProduceMessage.
Hi, the message delivered successfully, but still the problem persists. If the message delivered successfully (only one message), right afterwards the dispose is called? Can it help if I change the message timeout? thanks
Which version are you using? In 0.9.1, there are some bugs related to conf, eg if you have a group.id in your producer config, dispose will hang. Be sure to not use any consumer config. There were also several bugs whch are corrected in latest librdkafka 0.9.2. You can try the preview of rdkafka-dotnet to bind to it
Changing timeout should not change anything giving your code (And yes, try keeping a producer/topic open only once, recreating it every time will very highly impact your throughput as you won't used internal queue for batching)
Observing the same issue with 0.9.2-ci-181 (prerelease) though from an integration test, when using await
.
.Wait()
or .Result
works fine. Could be a synchronization context issue.
Edit: Actually seeing the same with 0.9.1 as well.
@amccague What is your exact code?
I have the same problem. If there are messages in the queue and the broker then becomes unavailable, the Dispose
method enters an unending loop due to it waiting for the queue to become clear, which it never will be due to the broker being unavailable.
The problematic code is here:
if (disposing)
{
// Wait until all outstanding sends have completed
while (OutQueueLength > 0)
{
handle.Poll((IntPtr) 100);
}
handle.Dispose();
}
One solution would be to add a timeout or max retry size, something to ensure that the loop at some point terminates.
May be related to #58.
I think @amccague is on to something. I ran into this issue as well and found that the problem only happens when I use async/await.
I've created integration tests that demonstrate this using the v0.92-ci-186 NuGet package. The asynchronous version hangs forever in producer.Dispose. The synchronous version using .Result correctly finishes the unit test with an exception due to the cluster being down and no brokers available.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RdKafka;
using System.Configuration;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace RdKafkaDotNet
{
[TestClass]
public class RdKafkaTests
{
private const string TopicName = "test";
private const string TestMessage = "TestMessage";
[TestMethod]
public async Task AsyncTest()
{
using (var producer = CreateProducer())
{
using (var topic = CreateTopic(producer, TopicName))
{
var result = await topic.Produce(Encoding.UTF8.GetBytes(TestMessage));
Assert.IsNotNull(result);
Assert.IsTrue(result.Offset > 0);
}
}
}
[TestMethod]
public void SyncTest()
{
using (var producer = CreateProducer())
{
using (var topic = CreateTopic(producer, TopicName))
{
var result = topic.Produce(Encoding.UTF8.GetBytes(TestMessage)).Result;
Assert.IsNotNull(result);
Assert.IsTrue(result.Offset > 0);
}
}
}
private static Producer CreateProducer()
{
var config = new Config
{
["debug"] = "all",
Logger = (handle, level, fac, buf) => Debug.WriteLine($"[Logger]: {handle}, {level}, {fac}, {buf}")
};
var producer = new Producer(config, ConfigurationManager.AppSettings["kafka.brokerurl"]);
producer.OnStatistics += (sender, s) => Debug.WriteLine("[OnStatistics]: " + s);
producer.OnError += (sender, args) => Debug.WriteLine("[OnError]: " + args.Reason + " (" + args.ErrorCode + ")");
return producer;
}
private static Topic CreateTopic(Producer producer, string topic)
{
var config = new TopicConfig
{
["message.timeout.ms"] = "15000"
};
return producer.Topic(topic, config);
}
}
}