confluent-kafka-dotnet
Confluent.Kafka.ConsumeException: Local: Timed out
Description
We are intermittently seeing the error below while consuming messages from Kafka:

```
Confluent.Kafka.ConsumeException: Local: Timed out
   at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
   at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
```
How to reproduce
Our application uses the configuration below:

```csharp
return new ConsumerConfig
{
    BootstrapServers = _consumerConfiguration.BootstrapServers,
    GroupId = _consumerConfiguration.GroupId,
    SessionTimeoutMs = 100000,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true,
    EnableAutoCommit = _consumerConfiguration.EnableAutoCommit
};
```
Checklist
Please provide the following information:
- [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [ ] Confluent.Kafka nuget version.
- [ ] Apache Kafka version.
- [ ] Client configuration.
- [ ] Operating system.
- [ ] Provide logs (with "debug" : "..." as necessary in configuration).
- [ ] Provide broker log excerpts.
- [ ] Critical issue.
i don't have time to trace this through to be 100% sure, but this is due to an op on the fetch queue having this error set, which I would guess is due to a protocol fetch request timing out. so it's indicative of network connectivity / broker issues.
also, this seems to be a case of librdkafka leaking a retryable error (only non-retryable errors should be reported to the user in this way). we need to audit all of this and precisely document / tweak it.
you can safely ignore this (perhaps just log it).
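A minimal sketch of what "log and ignore" might look like in a consume loop (the `cts`, `consumer`, and `logger` names and the loop structure are illustrative, not from this thread):

```csharp
while (!cts.Token.IsCancellationRequested)
{
    try
    {
        var cr = consumer.Consume(cts.Token);
        // ... process cr.Message ...
    }
    catch (ConsumeException ex)
    {
        // retryable errors such as Local: Timed out can surface here;
        // log them and keep polling rather than tearing the consumer down.
        logger.Warn($"Consume error (ignored): {ex.Error.Reason}");
    }
}
```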
We are getting this error when there are broker issues, but the consumer never seems to recover; it just sits there for hours until we find it the next morning. If we restart, it starts consuming again. Admittedly we do need to fix our intermittent broker issues, but I would like to understand how I can be more robust to this error. Currently I am trying to work out how to catch it and automatically rebuild the consumer.
@ed-alertedh - debug logs (set the Debug config property to "all") from around the time the problem starts (and later) will help us a lot here. also, make sure you're using the latest version.
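For reference, enabling full debug output is just a matter of setting the `Debug` property on the client config (a sketch; the other fields are placeholders):

```csharp
var config = new ConsumerConfig
{
    BootstrapServers = "...",
    GroupId = "...",
    Debug = "all" // forwards verbose librdkafka debug logs to the log handler
};
```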
I worked out that because I had called ConsumerBuilder.SetErrorHandler, I needed to check for fatal errors in my error handler. If I don't set an error handler, will I receive the fatal errors as an exception instead?
Anyway, my current implementation no longer sits for long enough to trigger `Confluent.Kafka.ConsumeException: Local: Timed out`; it hits other fatal errors first. My coworker is investigating errors in the broker logs too. For some reason the consumer seems to get into a state where it is only communicating correctly with one of our two brokers, even after I attempt to stop the consumer and create a new one. If I restart the whole program, it immediately starts consuming correctly. So I am wondering if my restart code is not enough to reset the state of everything back to how it is when the program starts up...
I have upgraded to the latest version and will turn on the debug logs to trace things in more detail.
This is the core of the implementation I am using, including my attempt at restart logic:
KafkaSubscriber.cs
```csharp
using Confluent.Kafka;
using System;
using System.IO;
using System.Threading;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Threading.Tasks;
using NLog;

namespace KafkaAdapter
{
    public class KafkaSubscriber
    {
        private KafkaSettings kafkaSettings;
        private Logger logger;

        public KafkaSubscriber(KafkaSettings kafkaSettings)
        {
            this.logger = LoggingFactory.Logger;
            this.kafkaSettings = kafkaSettings;
        }

        protected CancellationTokenSource CancellationTokenSource { get; set; }

        public event EventHandler<ErrorEventArgs> ReceiveError;

        /// The callback is called when a fatal error occurs that stops the Kafka consumer from continuing to work
        protected virtual void OnReceiveError(ErrorEventArgs e)
        {
            ReceiveError?.Invoke(this, e);
        }

        protected Task ConsumerTask { get; set; }

        private IEnumerable<TopicPartitionOffset> partitionsAssignedHandler(IConsumer<Ignore, string> consumer, List<TopicPartition> ps)
        {
            var result = new List<TopicPartitionOffset>();
            foreach (var tp in ps)
            {
                try
                {
                    var watermark = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(20));
                    logger.Debug($"Partition {tp.Partition.Value}, Low watermark: {watermark.Low}, High watermark: {watermark.High}");
                    result.Add(new TopicPartitionOffset(tp, watermark.High > 0 ? watermark.High - 1 : 0));
                }
                catch (Exception e)
                {
                    logger.Error(e);
                }
            }
            return result;
        }

        public async Task Start()
        {
            logger.Info("Starting Kafka consumer");
            if (ConsumerTask != null && ConsumerTask.Status != TaskStatus.RanToCompletion && ConsumerTask.Status != TaskStatus.Canceled && ConsumerTask.Status != TaskStatus.Faulted)
            {
                throw new InvalidOperationException("Cannot start subscriber while it is already running!");
            }
            logger.Debug("Create new ConsumerTask");
            CancellationTokenSource = new CancellationTokenSource();
            IConsumer<Ignore, string> c = null;
            Exception onReceiveErrorException = null;
            try
            {
                c = new ConsumerBuilder<Ignore, string>(kafkaSettings.configuration)
                    .SetLogHandler(this.LogHandler)
                    .SetErrorHandler(this.ErrorHandler)
                    // replay the last message that was published to the topic so that the server populates some items on startup (even if producer is down)
                    .SetPartitionsAssignedHandler(partitionsAssignedHandler)
                    .Build();
                ConsumerTask = Task.Run(() =>
                {
                    logger.Debug("Consumer built, subscribing to topics");
                    c.Subscribe(kafkaSettings.topics);
                    logger.Debug("Entering consume loop");
                    while (true)
                    {
                        var cr = c.Consume(CancellationTokenSource.Token);
                        logger.Debug($"Parsing message at offset {cr.TopicPartitionOffset}:\n{cr.Value}");
                        // <message parsing logic>
                    }
                });
                await ConsumerTask;
            }
            catch (OperationCanceledException)
            {
                if (c != null)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    logger.Info("Disposing consumer");
                    c.Close();
                    c.Dispose();
                }
            }
            catch (Exception e)
            {
                onReceiveErrorException = e;
                logger.Error(e, "Caught exception in consumer loop");
                // for now we don't try to clean up properly because we are assuming we have lost the broker connection,
                // and I suspect it was hanging when I did try to do that
                // this issue is closed but perhaps it can still be triggered in certain cases... https://github.com/confluentinc/confluent-kafka-dotnet/issues/604
            }
            // don't throw until we have left the Task thread - this allows the
            // event handler to call .Stop() or .Restart() if they choose
            // without causing deadlocking
            if (onReceiveErrorException != null)
            {
                logger.Info("Calling OnReceiveError");
                OnReceiveError(new ErrorEventArgs(onReceiveErrorException));
            }
        }

        public async Task Stop()
        {
            logger.Info("Waiting for Kafka consumer to stop...");
            CancellationTokenSource.Cancel();
            // Wait() seems to hang indefinitely in any of the error/cancel states of the task
            if (!ConsumerTask.IsCompleted && !ConsumerTask.IsFaulted && !ConsumerTask.IsCanceled)
            {
                await ConsumerTask;
            }
            logger.Info("Kafka consumer stopped.");
            ConsumerTask = null;
        }

        public async Task Restart()
        {
            logger.Info("Restarting Kafka consumer");
            await Stop();
            await Start();
        }

        private void ErrorHandler(IConsumer<Ignore, string> consumer, Error error)
        {
            logger.Error($"Kafka ErrorHandler: {error.Reason}");
            if (error.IsFatal || error.Code == ErrorCode.Local_TimedOut)
            {
                logger.Info("Throwing fatal error code as exception");
                throw new KafkaException(error);
            }
        }

        private void LogHandler(IConsumer<Ignore, string> consumer, LogMessage message)
        {
            // <forward logs to NLog>
        }
    }
}
```
currently the consumer won't emit any fatal errors (with transactions, that's going to change). fatal errors (the Fatal flag is set on the error object) should be extremely rare and pretty much should never happen under normal operation. they will only happen in some unusual log-truncation scenarios or if there's a bug in kafka.
in your consume loop, you should catch (and log) exceptions emitted by Consume and continue consuming. they aren't fatal. currently some retryable errors do leak out to the application; they should be completely encapsulated inside the consumer, and we are going to fix this.
worth noting that in v1.3, exceptions emitted in handlers will not be propagated to the main loop (instead they will cause the application to terminate). in v1.4 they will be. i don't think you have an issue here however.
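Following that guidance, an error handler that distinguishes fatal from retryable errors might look like the sketch below (the handler body and logger are illustrative, not the library's prescribed pattern):

```csharp
private void ErrorHandler(IConsumer<Ignore, string> consumer, Error error)
{
    if (error.IsFatal)
    {
        // extremely rare (e.g. log-truncation scenarios); signal the owning
        // component to tear down and rebuild the consumer rather than
        // throwing from inside the handler.
        logger.Fatal($"Fatal Kafka error: {error.Reason}");
    }
    else
    {
        // retryable / informational errors; librdkafka recovers on its own.
        logger.Warn($"Kafka error: {error.Reason}");
    }
}
```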
As you say, we probably do have issues in our broker. It has been configured to start discarding old messages when the topic gets too large (we have another consumer ingesting to a DB for persistence), would that have anything to do with log-truncation?
I've attached a debug log file starting from just before a fatal error code is sent to the ErrorHandler. There are a few of my own log lines in there, but they are mostly from Confluent.Kafka, prefixed with [Kafka]. Even on v1.3.0 I still seem to be able to catch the exceptions I throw from the ErrorHandler. I will try going back to catching ConsumeException and logging it rather than giving up. But as you can see from the logs, the fatal error code triggers a restart before any ConsumeException is thrown.
The restart occurs on line 122:
```
2020-01-22 00:10:40.5317 [Debug] Tag updates with timestamp 1/22/2020 12:03:29 AM: :
2020-01-22 00:10:40.6066 [Debug] Tag updates with timestamp 1/22/2020 12:03:29 AM: :
2020-01-22 00:10:42.0315 [Error] Kafka ErrorHandler :
2020-01-22 00:10:43.3568 [Info] Throwing fatal error code as exception :
2020-01-22 00:10:43.5283 [Debug] [Kafka][OFFSET] [thrd:main]: Topic inference [0]: stored offset 305706, committed offset 305706: not including in commit :
2020-01-22 00:10:44.7413 [Error] Caught exception in consumer loop : GroupCoordinator: 1 request(s) timed out: disconnect (after 641890ms in state UP)
2020-01-22 00:10:46.0214 [Debug] [Kafka][OFFSET] [thrd:main]: Topic inference [1]: stored offset 295785, committed offset 295783: setting stored offset 295785 for commit :
2020-01-22 00:10:47.3126 [Info] Calling OnReceiveError :
2020-01-22 00:10:48.5915 [Debug] [Kafka][COMMIT] [thrd:main]: Deferring "cgrp auto commit timer" offset commit for 1 partition(s) in state wait-coord: no coordinator available :
2020-01-22 00:10:49.8728 [Error] Kafka receive error : GroupCoordinator: 1 request(s) timed out: disconnect (after 641890ms in state UP)
2020-01-22 00:10:51.1957 [Debug] [Kafka][COMMIT] [thrd:main]: Group "opcda32_consumer": unable to OffsetCommit in state wait-coord: cgrp auto commit timer: coordinator (10.143.1.20:9092/1) is unavailable: retrying later :
2020-01-22 00:10:52.3742 [Info] Attempting to restart the Kafka subscriber :
2020-01-22 00:10:54.6692 [Info] Restarting Kafka consumer :
2020-01-22 00:10:55.9193 [Info] Waiting for Kafka consumer to stop... :
2020-01-22 00:10:57.1877 [Info] Kafka consumer stopped. :
2020-01-22 00:10:58.6302 [Debug] [Kafka][OFFSET] [thrd:main]: Topic inference [0]: stored offset 305706, committed offset 305706: not including in commit :
2020-01-22 00:10:59.0791 [Info] Starting Kafka consumer :
2020-01-22 00:11:00.3765 [Debug] [Kafka][OFFSET] [thrd:main]: Topic inference [1]: stored offset 295785, committed offset 295783: setting stored offset 295785 for commit :
2020-01-22 00:11:02.0183 [Debug] Create new ConsumerTask :
```
@ed-alertedh, I am seeing a similar issue. Have you found a solution?
After these errors, some consumers don't resume message/record processing even though the partition has records and the consumer is healthy and polling.
- "error":"Local: Timed out","error":"commit error" : This is happening on commit.
- "error":"GroupCoordinator: 1 request(s) timed out: disconnect (after 6847266ms in state UP, 1 identical error(s) suppressed)" socket.timeout.ms is set to 60000ms right now.
We have the same issue in the Producer; the exception looks like this:

```
Confluent.Kafka.ProduceException`2[System.String,System.String]: Local: Message timed out
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
```
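On the producer side, an equivalent "log and retry" pattern might look like the sketch below (the topic name, message contents, and logging are illustrative):

```csharp
try
{
    var result = await producer.ProduceAsync("my-topic",
        new Message<string, string> { Key = "key", Value = "value" });
}
catch (ProduceException<string, string> ex) when (ex.Error.Code == ErrorCode.Local_MsgTimedOut)
{
    // Local: Message timed out means the broker did not acknowledge the message
    // within message.timeout.ms; retrying is safe, though it may produce a
    // duplicate unless EnableIdempotence is set on the producer config.
    logger.Warn($"Delivery timed out, will retry: {ex.Error.Reason}");
}
```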