confluent-kafka-dotnet
Consumer refetching/rewinding the offset when a new consumer joins
Description
The consumer is refetching the same messages, which inflates the Outgoing Messages count in the Event Hubs metrics. I have a C# consumer that pulls messages and stores them in a list: since confluent-kafka-dotnet does not support batch message processing, I handle batching in my application by reading all the messages from the local queue into an in-memory list and then processing them. To control the fetch frequency, as suggested by @mhowlett, I consume the first message with Consume(1000) and subsequent messages with Consume(TimeSpan.Zero).
Before I started the consumer, 9861 messages had accumulated in the topic I am consuming from. The topic has 3 partitions.
How to reproduce
Sample Code to reproduce
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace IMS.Kafka.Package.Testbed
{
public class ConsumeFetchControl : IHostedService
{
private readonly ILogger _logger;
IConsumer<Null, string> consumer;
CancellationTokenSource cts = new();
public ConsumeFetchControl(ILogger logger) => _logger = logger;
public async Task Consumer(CancellationToken cancellationToken)
{
string bootstrapServers = "vishnu-1tu-test.servicebus.windows.net:9093";
string password = "<REPLACE_PASSWORD_HERE>";
string topic = "TestTopic";
string consumerGroup = "mygrp"; // Replace with your consumer group name
List<ConsumeResult<Null, string>> batchConsumeMessages = new();
var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = consumerGroup,
ClientId = consumerGroup + DateTime.UtcNow.Ticks,
AutoOffsetReset = AutoOffsetReset.Earliest, // Start consuming from the beginning of the topic
EnableAutoOffsetStore = true,
EnableAutoCommit = false, // Disable auto-commit to have more control over the consumer's progress
AutoCommitIntervalMs = 10000,
EnablePartitionEof = true,
ConnectionsMaxIdleMs = 180000,
MaxPartitionFetchBytes = 1048576,
QueuedMaxMessagesKbytes = 10240,
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
IsolationLevel = IsolationLevel.ReadUncommitted,
SocketNagleDisable = true,
SocketKeepaliveEnable = true,
MetadataMaxAgeMs = 180000,
SessionTimeoutMs = 30000,
MaxPollIntervalMs = 300000,
CancellationDelayMaxMs = 200,
SaslUsername = "$ConnectionString",
SaslPassword = password,
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
Debug = "consumer,cgrp,topic,fetch"
};
consumer = new ConsumerBuilder<Null, string>(config)
.SetLogHandler((c, logHandler) =>
{
_logger!.LogInformation($"Consumer Log Handler : {logHandler.Level.ToString().ToUpper()}|{DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}|{logHandler.Facility}|{logHandler.Name}|{logHandler.Message}");
})
.SetPartitionsAssignedHandler((c, partitions) =>
{
string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
if (!string.IsNullOrWhiteSpace(topicName))
{
string formattedAssigned = string.Join(',', partitions!.Select(p => p.Partition.Value).OrderBy(p => p));
var finalAssignment = c.Assignment.Concat(partitions!)?.Select(p => p.Partition.Value).OrderBy(p => p);
string formattedRemaining = string.Join(',', finalAssignment!);
_logger!.LogInformation($"{topicName} Newly assigned partition(s) : {formattedAssigned}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
}
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
if (!string.IsNullOrWhiteSpace(topicName))
{
string formattedRevoked = string.Join(',', partitions!.Select(p => p.Partition.Value).OrderBy(p => p));
var finalAssignment = c.Assignment.Where(atp => partitions!.Where(rtp => rtp.TopicPartition == atp).Count() == 0).Select(p => p.Partition.Value).OrderBy(p => p);
string formattedRemaining = string.Join(',', finalAssignment);
_logger!.LogInformation($"{topicName} Revoked partition(s) : {formattedRevoked}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
}
})
.SetPartitionsLostHandler((c, partitions) =>
{
string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
if (!string.IsNullOrWhiteSpace(topicName))
{
string formattedLost = string.Join(',', partitions?.Select(p => p.Partition.Value).OrderBy(p => p));
var finalAssignment = c.Assignment.Where(atp => partitions!.Where(rtp => rtp.TopicPartition == atp).Count() == 0).Select(p => p.Partition.Value).OrderBy(p => p);
string formattedRemaining = string.Join(',', finalAssignment);
_logger!.LogInformation($"{topicName} Lost partition(s) : {formattedLost}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
}
})
.Build();
consumer.Subscribe(topic);
bool isFirstFetchDone = false;
int batchMessageCount = 0;
int maxMessagePerBatchToCommit = 500;
while (!cts.IsCancellationRequested)
{
try
{
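//First call: Consume(1000) blocks for up to 1 second so the background fetcher can fill the local queue.
//Subsequent calls: Consume(TimeSpan.Zero) returns immediately with a queued message, or null if the queue is empty.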
var consumeResult = isFirstFetchDone ? consumer.Consume(TimeSpan.Zero) : consumer.Consume(1000);
if (consumeResult == null) { continue; }
if (consumeResult.IsPartitionEOF)
{
_logger.LogInformation("Reached End of Partition.");
//DO THE MESSAGE PROCESSING LOGIC HERE
if (batchConsumeMessages.Count > 0)
{
consumer.Commit(batchConsumeMessages.Last()); //Assuming all processed successfully; an EOF result carries no message to commit
batchConsumeMessages.Clear();
}
isFirstFetchDone = false;
batchMessageCount = 0;
continue; //Message is null on an EOF result, so skip the handling below
}
if (consumeResult.Message.Value is string)
{
isFirstFetchDone = true;
batchConsumeMessages.Add(consumeResult);
batchMessageCount++;
}
if (maxMessagePerBatchToCommit == batchMessageCount)
{
//DO THE MESSAGE PROCESSING LOGIC HERE
if (true) //If all succeeded
{
consumer.Commit(consumeResult); //Assuming all processed successfully and committing
batchConsumeMessages.Clear(); //Clear the batch; otherwise a later Seek would rewind to the first message ever buffered
isFirstFetchDone = false;
}
else //In case of any failure in the batch. A transactional producer is not considered since Event Hubs does not support that feature.
{
consumer.Seek(batchConsumeMessages.First().TopicPartitionOffset); //Rewind so the whole batch is redelivered
batchConsumeMessages.Clear();
}
batchMessageCount = 0;
}
}
catch (KafkaException ex) when (ex.Message == "Broker: Specified group generation id is not valid" || ex.Message == "Local: Broker handle destroyed")
{
if (ex.Error.IsFatal)
{
// https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
break;
}
//INTENTIONALLY NOT LOGGING ANYTHING HERE
}
catch (ConsumeException e) when (e.Error.Reason.Contains("Application maximum poll interval"))
{
//DO NOT REMOVE THIS BLOCK and DO NOT THROW AN EXCEPTION.
//THROWING HERE STOPS THE MESSAGE CONSUMPTION.
_logger.LogWarning($"Received ConsumeException with Application maximum poll interval exceeded reason for the topic {topic}. Error Code : {e.Error.Code}, Reason : {e.Error.Reason}");
}
catch (KafkaException e) when (e.Error.Reason.Contains("Application maximum poll interval"))
{
//DO NOT REMOVE THIS BLOCK and DO NOT THROW AN EXCEPTION.
//THROWING HERE STOPS THE MESSAGE CONSUMPTION.
_logger.LogWarning($"Received KafkaException with Application maximum poll interval exceeded reason for the topic {topic}. Error Code : {e.Error.Code}, Reason : {e.Error.Reason}");
}
catch (OperationCanceledException ex)
{
_logger.LogWarning($"OperationCanceledException caught in the topic {topic}: Message={ex.Message}, Source={ex.Source}, InnerException={ex.InnerException}, StackTrace:{ex.StackTrace}");
}
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await Consumer(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
cts.Cancel();
if (consumer != null && (consumer?.Subscription?.Any() ?? false))
{
consumer?.Close();
consumer?.Dispose();
}
}
catch (KafkaException ex)
{
_logger!.LogWarning($"Handled Kafka Exception in {nameof(StopAsync)}. Message={ex.Message}");
}
catch (Exception ex)
{
_logger!.LogWarning($"Exception caught in {nameof(StopAsync)} and handled. Message={ex.Message}");
}
}
}
}
Checklist
Please provide the following information:
- [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. Pasted the code into the How to reproduce section.
- [x] Confluent.Kafka nuget version. 2.1.1
- [x] Apache Kafka version. - Using the Kafka enabled Azure EventHub
- [x] Client configuration.
"bootstrap.servers" : "vishnu-1tu-bug-test.servicebus.windows.net:9093", "group.id" : "derotest-bug-23092023-4", "client.id":"derotest-bug-23092023-4638310463137519262", "auto.offset.reset":"earliest" "enable.auto.offset.store": "True" "enable.auto.commit": "False" "auto.commit.interval.ms": "10000" "enable.partition.eof: "True" "connections.max.idle.ms: "180000" "max.partition.fetch.bytes": "1048576" "queued.max.messages.kbytes":"10240" "partition.assignment.strategy": "cooperative-sticky" "isolation.level": "read_uncommitted" "socket.nagle.disable": "True" "socket.keepalive.enable": "True" "metadata.max.age.ms": "180000" "session.timeout.ms": "30000" "max.poll.interval.ms": "300000" "dotnet.cancellation.delay.max.ms": "200" "sasl.username": "$ConnectionString" "sasl.password"<PASSWORD>" "sasl.mechanism":"PLAIN" "security.protocol": "sasl_ssl" "debug": "consumer,cgrp,topic,fetch" "allow.auto.create.topics": "false" - [x] Operating system. Running the application in AKS with 3 replicas in the POD
- [x] Provide logs (with "debug" : "..." as necessary in configuration). Attached the log file, captured with "consumer,cgrp,topic,fetch" debug enabled, in CSV format.
- [x] Provide broker log excerpts.
- [x] Critical issue. Yes
@mhowlett @edenhill Could you please tell me what is happening here?
The Event Hubs metrics show that the number of outgoing messages is far higher than the 9861 messages available to consume. From MS Support I got the information that the client is re-reading the same messages. I am not sure how to interpret the actual debug log messages.
Edited: Each message in the consumed topic is around 4.24 KB.
I see the lines below in the log. Is this causing the issue? If yes, how can I control it?
9/23/2023, 6:11:59.174 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:11:59.1747870|FETCH|derotest-bug-23092023-4638310463137519149#consumer-10|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [0] in state active at offset 0 (leader epoch 0) (2506/100000 msgs, 10643/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded
9/23/2023, 6:12:00.078 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:00.0781550|FETCH|derotest-bug-23092023-4638310463137519262#consumer-13|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [1] in state active at offset 0 (leader epoch 0) (2522/100000 msgs, 10711/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded
9/23/2023, 6:12:00.223 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:00.2238018|FETCH|derotest-bug-23092023-4638310463137519410#consumer-12|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [2] in state active at offset 0 (leader epoch 0) (2513/100000 msgs, 10672/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded
9/23/2023, 6:12:08.062 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:08.0628556|FETCH|derotest-bug-23092023-4638310463137519149#consumer-10|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [0] in state active at offset 500 (leader epoch 0) (2366/100000 msgs, 10048/10240 kb queued, opv 6) is not fetchable: queued.max.messages.kbytes exceeded
Even though I have set QueuedMaxMessagesKbytes to 10240, how did it pull more than that threshold? And is the excess getting purged in this case?
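For what it is worth, my working assumption (based on my reading of the librdkafka configuration docs, not confirmed) is that queued.max.messages.kbytes is a soft limit: it is only checked before the next fetch request is issued, so a fetch response already in flight can push the queue past the limit, and the excess is not purged but simply sits in the local queue until consumed. The numbers fit: at ~4.24 KB per message, the "10643/10240 kb queued" in the log is roughly 2510 messages, which matches the "2506/100000 msgs" shown. If that reading is right, tighter fetch sizes should bound the overshoot; a minimal sketch (all values illustrative, not a recommendation):

using Confluent.Kafka;

// Sketch: bound librdkafka's prefetch queue more tightly (assumed behavior, values illustrative).
var config = new ConsumerConfig
{
    BootstrapServers = "vishnu-1tu-test.servicebus.windows.net:9093", // placeholder from the repro
    GroupId = "mygrp",
    QueuedMaxMessagesKbytes = 10240, // soft limit on the local queue, in KB; only gates new fetch requests
    QueuedMinMessages = 2000,        // librdkafka tries to keep at least this many messages queued per partition
    FetchMaxBytes = 1048576,         // caps one whole fetch response, which bounds the possible overshoot
    MaxPartitionFetchBytes = 262144  // caps per-partition bytes within one fetch response
};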
Hi @mhowlett & @edenhill,
I have a couple of questions.
- Once the Consume() method is called with either a cancellation token or a timeout > 0, will a background thread start polling messages from the broker and buffering them into the local queue? If yes, do we have any option to control it?
- Within an infinite while loop, if the first Consume call uses a timeout > 0 or a CancellationToken and every subsequent call uses Consume(TimeSpan.Zero), will the background thread stop polling for messages?
- If I pause the consumer by calling the Pause() method, will it stop the background thread from polling messages from the broker? I am aware that pausing the consumer will purge the local queue. (A rough sketch of what I have in mind follows below.)
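To make the third question concrete, here is a rough sketch of the flow I have in mind (my assumption about how Pause()/Resume() might be used; the broker address, topic, and batch size are placeholders):

using System.Collections.Generic;
using Confluent.Kafka;

// Sketch only: pause the assigned partitions while a batch is processed, then resume.
// Whether Pause() also stops the background prefetcher is exactly what I am asking above.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "mygrp",
    EnableAutoCommit = false
};
using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("TestTopic");

var batch = new List<ConsumeResult<Null, string>>();
while (true)
{
    var cr = consumer.Consume(1000);
    if (cr?.Message == null) continue; // null result on timeout
    batch.Add(cr);
    if (batch.Count >= 500)
    {
        consumer.Pause(consumer.Assignment);  // stop delivery while processing (this purges the local queue)
        // ... process the batch here ...
        consumer.Commit(batch[^1]);           // commit the last message of the processed batch
        batch.Clear();
        consumer.Resume(consumer.Assignment); // fetching continues from the next unconsumed offset
    }
}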