librdkafka
A single slow-I/O broker could block us from publishing messages to all other brokers!
We noticed a serious problem over the past few days: a single slow-I/O broker could block us from publishing messages to all other brokers!
Here is a brief example showing how it happens:
- The Kafka cluster has 5 brokers: broker1, broker2, broker3, broker4 and broker5 (suppose broker5 has a slow disk).
- Use 1 topic with 5 partitions and replication_factor=3 (Kafka configured with min.insync.replicas=2).
- The replicas might be:
  - partition1: leader[broker1], replicas[broker1, broker2, broker3], isr[broker1, broker2, broker3]
  - partition2: leader[broker2], replicas[broker2, broker3, broker4], isr[broker2, broker3, broker4]
  - partition3: leader[broker3], replicas[broker3, broker4, broker5], isr[broker3, broker4]
  - partition4: leader[broker4], replicas[broker4, broker5, broker1], isr[broker4, broker1]
  - partition5: leader[broker5], replicas[broker5, broker1, broker2], isr[broker5, broker1, broker2]
Then we keep publishing messages to these 5 partitions. Since the leader of partition5 (i.e., broker5) is much slower than the others, eventually all the messages in the buffering queue are for partition5! Unfortunately, there are only "global" limits on the queue (i.e., queue.buffering.max.messages/queue.buffering.max.kbytes), so the producer is blocked and unable to send messages to the other partitions as well. In other words, a single slow broker slows down publishing to all other brokers too!
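The effect can be reproduced with a small tick-based model. This is a hypothetical sketch, not librdkafka code: each partition leader sends a fixed number of messages per tick (the partition led by broker5 is given a lower rate), the only cap is a shared message count standing in for queue.buffering.max.messages, and the application produces a round-robin stream and simply retries the same message on the next tick whenever the shared queue is full. The function name `simulate_global_limit` and all rates and sizes are assumptions chosen for illustration.

```python
from itertools import cycle


def simulate_global_limit(drain_rates, global_max, produce_per_tick, ticks):
    """Hypothetical tick-based model of a producer limited only by a global cap.

    drain_rates:      messages each partition leader can send per tick
    global_max:       shared cap, standing in for queue.buffering.max.messages
    produce_per_tick: messages the application tries to produce per tick
    Returns (messages still queued, messages delivered), keyed by partition.
    """
    partitions = list(drain_rates)
    queued = {p: 0 for p in partitions}
    delivered = {p: 0 for p in partitions}
    stream = cycle(partitions)      # application produces round-robin
    pending = next(stream)          # next message the application wants to enqueue
    for _ in range(ticks):
        for _ in range(produce_per_tick):
            if sum(queued.values()) >= global_max:
                break               # queue full: blocked, whatever the partition
            queued[pending] += 1
            pending = next(stream)  # only advance once the message is accepted
        # Each leader sends up to its per-tick rate from its own partition queue.
        for p, rate in drain_rates.items():
            sent = min(queued[p], rate)
            queued[p] -= sent
            delivered[p] += sent
    return queued, delivered


rates = {"p1": 10, "p2": 10, "p3": 10, "p4": 10, "p5": 1}  # p5 is led by slow broker5
queued, delivered = simulate_global_limit(rates, global_max=20, produce_per_tick=10, ticks=100)
print(queued)
print(delivered)
```

With these assumed numbers, p5 delivers 100 messages, its backlog ends up occupying most of the 20 shared slots, and each fast partition delivers only a little more than p5 despite having capacity for 1,000 messages over the same 100 ticks: the slow leader sets the pace for everyone.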
So, could we get a configuration option to limit the buffering queue per broker (or per partition)?
So you would like something like partition.queue.buffering.max.messages?
Or perhaps a percentage of how much of the total queue size a single partition queue may occupy?
Yes, exactly: either per-broker or per-partition limits would work! 😃
Also, it doesn't matter much whether we choose a percentage of the total queue size that a single partition queue may occupy or partition.queue.buffering.max.messages: they are equivalent, since we could calculate one from the other, right?
Queues are per partition, so per broker would not work.
Having partition.queue.buffering.max.messages/bytes is the easiest, since it doesn't require synchronization with the global queue counters.
There is going to be one gotcha, though: any partitioned messages produced before the number of partitions is known, and thus not enqueued on the partition queue, will not be accounted for and may therefore exceed the configured per-partition limit.
This should be okay since that only happens on initial startup.
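The proposed accounting can be sketched as follows. This is a hypothetical model, not librdkafka's implementation, and partition.queue.buffering.max.messages is only the option name proposed in this thread. Assumptions: `QueueLimits` and its methods are made-up names, `global_max` stands in for queue.buffering.max.messages, `partition_max` for the proposed per-partition limit, and `partition=None` represents a message produced before the partition count is known. Such a message is charged only to the global counter, and moving it to its partition later skips the per-partition check, which is exactly the startup gotcha.

```python
from collections import Counter
from typing import Optional


class QueueLimits:
    """Hypothetical accounting for a global cap plus a per-partition cap.

    global_max    ~ queue.buffering.max.messages
    partition_max ~ the proposed partition.queue.buffering.max.messages
    """

    def __init__(self, global_max: int, partition_max: int) -> None:
        self.global_max = global_max
        self.partition_max = partition_max
        self.total = 0
        self.unassigned = 0              # produced before the partition count is known
        self.per_partition = Counter()   # local counter per partition queue

    def try_enqueue(self, partition: Optional[int]) -> bool:
        """Return False (i.e. "queue full") instead of enqueueing."""
        if self.total >= self.global_max:
            return False                 # global limit still applies to everyone
        if partition is None:
            self.unassigned += 1         # not charged to any partition yet
        elif self.per_partition[partition] >= self.partition_max:
            return False                 # only this partition is rejected
        else:
            self.per_partition[partition] += 1
        self.total += 1
        return True

    def assign_unassigned(self, partition: int) -> None:
        """Move one early message onto its partition queue once metadata arrives.

        No limit check happens here, so the partition may exceed partition_max.
        """
        if self.unassigned == 0:
            raise ValueError("no unassigned messages")
        self.unassigned -= 1
        self.per_partition[partition] += 1

    def delivered(self, partition: int) -> None:
        """Release the slots of one message that was sent to its partition."""
        self.per_partition[partition] -= 1
        self.total -= 1


q = QueueLimits(global_max=100, partition_max=10)
for _ in range(10):
    q.try_enqueue(5)                     # the slow partition fills its own share
print(q.try_enqueue(5), q.try_enqueue(1))
```

In this model a full slow partition only rejects its own messages while other partitions keep accepting. Note that an application which blocks on the rejected message and stops producing altogether would still stall every partition, so it would need to handle the per-partition rejection (for example by retrying that partition later) to benefit.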
Yes, no need to make things complicated; a simple solution with a partition.queue.buffering.max.messages/bytes configuration option would be best. 🥂