kafka-connect-storage-cloud
High Consumer Lag of S3 Sink Connector on Single Partition
We are using the Connect S3 sink with 6 tasks distributed across 3 Connect instances, each with 6 GB of memory, of which 4.5 GB is dedicated Java heap.
The sink is configured to consume from 200 topics with 15 partitions each. However, throughput is low to moderate, and many of the topics see no throughput at all. The sink is configured as follows (irrelevant parts omitted):
"s3.part.size":"5242880",
"rotate.interval.ms":"58000",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms":"60000"
"rotate.schedule.interval.ms":"43200000",
"flush.size":"1000,",
"timestamp.extractor":"Record",
"errors.tolerance":"all",
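For completeness, the remaining partitioner-related properties look roughly like this (the values below are representative placeholders, not our exact production settings; path.format, locale, and timezone are required by the TimeBasedPartitioner):
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale":"en-US",
"timezone":"UTC",
"tasks.max":"6",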
Looking at the maximum consumer lag per topic across partitions, we can see that most topics are not lagging at all:
max by(topic) (kafka_burrow_current_partition_lag{group=~"our-group.*"})
The exception is one topic with constant lag. Looking into the lag distribution within this topic, we can see that a single partition is causing it:
kafka_burrow_current_partition_lag{topic="our-problem-topic",group=~"our-group.*"}

In fact, the partitions within this topic are highly unbalanced, since our partner system provides test data with many duplicates of a certain key (all partitions contain approx. 4K records, except partition 4, which holds 60K).
Still, we do not understand why records for the problematic topics are not flushed, since we have a combined flush criterion (every 12 hours in terms of system time, 1K records in the partition buffer, or 1 minute in terms of event time) that should flush quite rigorously.
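For illustration, here is our understanding of how rotate.interval.ms=58000 behaves with the Record timestamp extractor (the timestamps below are made up):
first record arrives, event time 12:00:00  -> new file opened for this encoded partition
record with event time 12:00:30            -> buffered (30 s < 58 s)
record with event time 12:01:05            -> 65 s >= 58 s, the file is rotated and flushed to S3
no further records for this partition      -> nothing rotates until flush.size (1000 records) or the 12 h schedule fires
If that understanding is correct, rotation for a quiet encoded partition is driven entirely by rotate.schedule.interval.ms.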
The memory consumption for all instances looks quite similar and is steadily increasing, up to the point where the heap limit is reached.
(The drop in the graph is due to a restart of this instance.)
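A rough back-of-envelope estimate, assuming the sink keeps one multipart-upload buffer of s3.part.size per open output partition (this per-partition buffering is our assumption):
200 topics x 15 partitions = 3000 topic partitions
3000 buffers x 5 MiB (s3.part.size) ≈ 14.6 GiB in the worst case
14.6 GiB / 3 Connect instances ≈ 4.9 GiB per instance, which is above the 4.5 GB heap
That would at least be consistent with the steady growth we observe as more output partitions are opened over time.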
There is also no broken record that would cause the single partition to lag (dead letter queue empty).
Are we missing some aspect here?
Additional side note:
We have already observed such "irregular" consumption patterns in the past, when we had less rigorous flush policies and many half-full buffers were apparently preventing Connect from consuming further from a topic that was drastically lagging behind.
Generally, it would be desirable to have dynamic buffer sizes that incorporate lag information when determining buffer size. A topic that lags heavily behind, and thus can yield high throughput, is more likely to fulfill the flush criteria in the near future; it should be granted a larger buffer while (possibly unused) buffers of other topics are shrunk.
I am also seeing irregular consumption across partitions, so the lag on certain partitions is higher. What is the reason for this, and how can we fix it?