confluent-kafka-dotnet
What is the best approach to consume messages without overloading the system with a lot of task processing threads?
We have an application in production that consumes messages from a Kafka topic and spawns a new task processor thread for each message. After spawning the thread, it commits the offset without confirming whether the task processor thread was able to save to the database. This causes a couple of issues: when a lot of messages from the producer are waiting to be picked up by the consumer and the task processor threads run into a problem, the consumer keeps picking up new messages and spawning new threads even though the earlier threads have not finished their work. So if 6000 messages are waiting on the broker at some point, there can be 6000 active task processor threads doing database inserts, driving database server CPU utilization to 100%.
Is there a better way to throttle the message processing and commit from the task processor threads? The topic has 5 partitions in our environment, and our consumer application runs on 3 clustered servers. What would be the best approach to commit messages only after the task processor has finished processing them? And is there a way to batch the pick-up so that we fetch only 100 or 200 messages from the broker even if 6000 are pending, and fetch the next batch only once the current one has been processed?
Our current code is as follows:
private void Run_Poll(bool isLog)
{
    try
    {
        // If cancellation is requested, do nothing.
        if (_cancellationToken.IsCancellationRequested)
            return;

        Dictionary<string, object> consumerConfiguration = ConstructConsumerConfig();

        // Note: All event handlers are called on the main thread.
        using (var consumer = new Consumer<string, string>(consumerConfiguration, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
        {
            // If cancellation is requested, do nothing.
            if (_cancellationToken.IsCancellationRequested)
                return;

            consumer.OnError += Consumer_OnError;
            consumer.OnConsumeError += Consumer_OnConsumeError;
            consumer.OnOffsetsCommitted += Consumer_OnOffsetsCommitted;
            consumer.OnStatistics += Consumer_OnStatistics;
            consumer.OnPartitionsAssigned += Consumer_OnPartitionsAssigned;
            consumer.OnPartitionsRevoked += Consumer_OnPartitionsRevoked;

            if (isLog)
            {
                consumer.OnMessage += Consumer_OnLogMessage;
                consumer.Subscribe(new string[] { _kafkaConfig.KafkaObjectLogsTopic });
            }
            else
            {
                consumer.OnMessage += Consumer_OnMessage;
                consumer.Subscribe(new string[] { _kafkaConfig.KafkaObjectDetailsTopic });
            }

            string subscribedlogMsg = $"Subscribed to: [{string.Join(", ", consumer.Subscription)}]";
            ErrTagHandler.GetInstance().PublishRequiredLogInfo(subscribedlogMsg, 51210);

            while (!_cancellationToken.IsCancellationRequested)
            {
                consumer.Poll(TimeSpan.FromMilliseconds(_kafkaConfig.PollBlockIntervalInMilliSec));
            }
        }
    }
    catch (Exception ex)
    {
        ErrTagHandler.GetInstance().PublishExceptionInfo("RunPollError", ex, "Unexpected error occurred in KAFKA Consumer Run Poll thread.");
    }
}
private void Consumer_OnMessage(object sender, Message<string, string> message)
{
    // If cancellation is requested, don't start the task.
    if (_cancellationToken.IsCancellationRequested)
        return;

    if (_kafkaConfig.DebugPausePolling) return; // person debugging has requested a pause in the polling

    if (_kafkaConfig.EnableDebugLog)
    {
        string logMessage = $"Topic: {message.Topic} | Partition:{message.Partition} | Offset:{message.Offset} | Message={message.Value}";
        ErrTagHandler.GetInstance().PublishLogInfo(logMessage, 51200);
    }

    try
    {
        // Invoke the Action delegate to handle the Object detail message arrival (this is where we spawn a new thread).
        System.Threading.Tasks.Task.Factory.StartNew(() => _objectArrivalHandler?.Invoke(message));
    }
    catch (Exception ex)
    {
        ErrTagHandler.GetInstance().PublishExceptionInfo("ObjectSaveTaskStartError", ex);
    }

    try
    {
        // If cancellation is requested, don't try to commit the offset.
        if (_cancellationToken.IsCancellationRequested)
            return;

        if (sender is Consumer<string, string> consumer)
        {
            if (message.Offset % _kafkaConfig.OffsetCommitBatchSize == 0)
            {
                // If cancellation is requested, don't try to commit the offset.
                if (_cancellationToken.IsCancellationRequested)
                    return;

                var committedOffsets = consumer.CommitAsync(message).Result;
            }
        }
    }
    catch (Exception ex)
    {
        ErrTagHandler.GetInstance().PublishExceptionInfo("Consumer_OnOffsetsCommitError", ex, "Unexpected error occurred while committing offset.");
    }
}
How to reproduce
Checklist
Please provide the following information:
- [x] Confluent.Kafka nuget version: 0.11.4.0
- [x] Apache Kafka version: 0.11.02
- [ ] Client configuration:
- [ ] Operating system:
- [ ] Provide logs (with "debug" : "..." as necessary in configuration)
- [ ] Provide broker log excerpts
- [ ] Critical issue
I have done something similar, but using the 1.0 beta 2 version of Confluent.Kafka (Poll is removed in that build). I know it's quite a bit of work, but I strongly recommend you first upgrade to the latest version (I have yet to upgrade to beta 3).
My design is: the program retrieves n messages (configurable) and starts a task to process the batch. If anything fails, those messages are added to a retry topic (same payload, but another consumer of mine tries to process them after a certain delay), and the program continues processing the rest of the messages.
Hope this helps.
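For illustration, a rough sketch of the retry-topic part of that design using the 1.0 ConsumerBuilder/ProducerBuilder API might look like the following (simplified to one message at a time; the broker address, group id, topic names, and ProcessMessage are placeholders, not actual code from this issue):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class RetryTopicConsumer
{
    public static void Run(CancellationToken cancellationToken)
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",     // placeholder broker address
            GroupId = "object-details-consumers",    // placeholder group id
            EnableAutoCommit = false                 // commit manually, only after the message is handled
        };
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
        using (var producer = new ProducerBuilder<string, string>(producerConfig).Build())
        {
            consumer.Subscribe("object-details");    // placeholder main topic

            while (!cancellationToken.IsCancellationRequested)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null)
                    continue;                        // nothing arrived within the timeout

                try
                {
                    ProcessMessage(result.Message.Value);   // placeholder: synchronous database insert
                }
                catch (Exception)
                {
                    // On failure, forward the same payload to a retry topic; a separate
                    // consumer re-processes those messages after some delay.
                    producer.Produce("object-details-retry",
                        new Message<string, string> { Key = result.Message.Key, Value = result.Message.Value });
                }

                // Commit only after the message has been saved or redirected,
                // so an unprocessed message is never marked as consumed.
                consumer.Commit(result);
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            consumer.Close();
        }
    }

    static void ProcessMessage(string value) { /* database insert goes here */ }
}
```

Because EnableAutoCommit is false and Commit is called only after the message has been handled, the consumer never gets ahead of the work that has actually been completed.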
Thank you!!! We will try this approach with the newer beta 3 version. Thanks again for the suggestion.
A big difference moving to the 1.0 API is that partition EOF notification comes via the ConsumeResult, not via an event, and by default it is turned off. This allows for much cleaner consume loop logic.
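To make that concrete, a minimal sketch of such a consume loop with the 1.0 API might look like this, committing offsets only after a batch of messages has been processed. The broker address, group id, topic name, batch size, and ProcessBatch are assumptions for the example, not code from this issue:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;

class BatchConsumeLoop
{
    public static void Run(CancellationToken cancellationToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder
            GroupId = "object-details-consumers",  // placeholder
            EnableAutoCommit = false,              // offsets are committed only after the batch is processed
            EnablePartitionEof = true,             // opt in: EOF is then reported on the ConsumeResult
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            consumer.Subscribe("object-details");  // placeholder topic

            const int batchSize = 100;             // cap how much unprocessed work is held at once
            var batch = new List<ConsumeResult<string, string>>(batchSize);

            while (!cancellationToken.IsCancellationRequested)
            {
                var result = consumer.Consume(TimeSpan.FromMilliseconds(500));
                if (result == null || result.IsPartitionEOF)
                    continue;                      // timeout, or reached the end of a partition

                batch.Add(result);
                if (batch.Count < batchSize)
                    continue;

                ProcessBatch(batch);               // placeholder: synchronous database inserts for the whole batch

                // Commit the position after the last processed message of each partition.
                var offsets = batch
                    .GroupBy(r => r.TopicPartition)
                    .Select(g => new TopicPartitionOffset(g.Key, g.Max(r => r.Offset.Value) + 1));
                consumer.Commit(offsets);

                batch.Clear();                     // only now is the next batch picked up
            }

            consumer.Close();
        }
    }

    static void ProcessBatch(List<ConsumeResult<string, string>> batch) { /* ... */ }
}
```

Because the loop does not accumulate a new batch until ProcessBatch has returned and the offsets are committed, at most batchSize messages are in flight per consumer instance, which corresponds to the "pick up only 100 or 200 messages at a time" requirement from the original question.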
I've marked this issue as an enhancement, since something like this would be a great use case to provide an example for.
BTW: if you are only loading data into a database, have you looked into Kafka Connect & off-the-shelf connectors?
Any update on this?