librdkafka

A single slow-io-broker could block us from publishing messages towards all other brokers!

Open kennethjia opened this issue 5 years ago • 4 comments

We noticed a serious problem over the past few days: a single broker with slow I/O can block us from publishing messages to all the other brokers!

Here is a brief example showing how it happens:

  1. The Kafka cluster has 5 brokers: broker1, broker2, broker3, broker4, broker5 (suppose broker5 has a slow disk).

  2. Use 1 topic with 5 partitions and replication_factor=3 (Kafka is configured with min.insync.replicas=2).

  3. The replicas might be:

    • partition1: leader[broker1], replicas[broker1, broker2, broker3], isr[broker1, broker2, broker3]

    • partition2: leader[broker2], replicas[broker2, broker3, broker4], isr[broker2, broker3, broker4]

    • partition3: leader[broker3], replicas[broker3, broker4, broker5], isr[broker3, broker4]

    • partition4: leader[broker4], replicas[broker4, broker5, broker1], isr[broker4, broker1]

    • partition5: leader[broker5], replicas[broker5, broker1, broker2], isr[broker5, broker1, broker2]

  4. We then keep publishing messages to these 5 partitions. Since the leader of partition5 (i.e., broker5) is much slower than the others, eventually all the messages in the buffering queue are for partition5. Unfortunately, the only limits on that queue are global (i.e., queue.buffering.max.messages / queue.buffering.max.kbytes), so once they are reached the producer is blocked and cannot send messages to the other partitions either. In other words, a single slow broker slows down publishing to all the other brokers!
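The scenario in step 4 can be reproduced with a toy model. The sketch below is not librdkafka code: `simulate`, `slow_every`, and the "fast leaders drain immediately" rule are assumptions made purely for illustration. It keeps one queue per partition, charges every enqueue against a single global budget (standing in for queue.buffering.max.messages), and lets the slow leader acknowledge only one message every `slow_every` produce attempts.

```python
from collections import deque


def simulate(steps=2000, global_limit=100, slow_partition=5, slow_every=50):
    """Toy model: five partition queues sharing one global message budget."""
    queues = {p: deque() for p in range(1, 6)}
    accepted = {p: 0 for p in queues}
    rejected = {p: 0 for p in queues}

    for step in range(steps):
        p = step % 5 + 1  # round-robin over partition1..partition5

        # Only a global check exists: total buffered messages vs. the limit.
        if sum(len(q) for q in queues.values()) >= global_limit:
            rejected[p] += 1  # models a "queue full" produce failure
        else:
            queues[p].append(step)
            accepted[p] += 1

        # Assumption: healthy leaders acknowledge everything right away.
        for other, q in queues.items():
            if other != slow_partition:
                q.clear()

        # Assumption: the slow leader acknowledges one message per
        # `slow_every` produce attempts.
        if step % slow_every == slow_every - 1 and queues[slow_partition]:
            queues[slow_partition].popleft()

    depth = {p: len(q) for p, q in queues.items()}
    return depth, accepted, rejected


if __name__ == "__main__":
    depth, accepted, rejected = simulate()
    print("queue depth per partition:", depth)
    print("accepted per partition:   ", accepted)
    print("rejected per partition:   ", rejected)
```

In this model the queue for partition5 grows until it consumes the whole global budget, after which produce attempts for partitions 1 to 4 are rejected as well, even though their leaders are healthy.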

So, could we get a configuration option to limit the buffering queue per broker (or per partition)?

kennethjia avatar Jan 08 '20 02:01 kennethjia

So you would like something like partition.queue.buffering.max.messages? Or perhaps a percentage of how much of the total queue size a single partition queue may occupy?

edenhill avatar Feb 06 '20 09:02 edenhill

> So you would like something like partition.queue.buffering.max.messages? Or perhaps a percentage of how much of the total queue size a single partition queue may occupy?

Yes, exactly: either per-broker or per-partition limits would work! 😃 Besides, whether we choose a percentage of how much of the total queue size a single partition queue may occupy or partition.queue.buffering.max.messages doesn't matter much. They are equivalent, since we could calculate one from the other, right?

kennethjia avatar Feb 06 '20 09:02 kennethjia

Queues are per partition, so per broker would not work.

Having partition.queue.buffering.max.messages/bytes is the easiest since it doesn't require synchronization with the global queue counters. There is going to be one gotcha, though: any partitioned messages produced before the number of partitions is known, and thus not enqueued on a partition queue, will not be accounted for, so the configured per-partition limit may be exceeded. This should be okay since that only happens on initial startup.
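To illustrate why a per-partition cap isolates the slow leader, here is a minimal sketch of the same kind of toy model with an extra check. Everything in it is hypothetical: `simulate_with_partition_cap` and `partition_limit` stand in for the option proposed in this thread, which is not an existing librdkafka setting, and the slow leader is assumed to acknowledge nothing at all (the worst case). The startup gotcha described above is not modeled.

```python
from collections import deque


def simulate_with_partition_cap(steps=2000, global_limit=100,
                                partition_limit=20, slow_partition=5):
    """Toy model: global budget plus a hypothetical per-partition cap."""
    queues = {p: deque() for p in range(1, 6)}
    rejected = {p: 0 for p in queues}

    for step in range(steps):
        p = step % 5 + 1  # round-robin over partition1..partition5

        total = sum(len(q) for q in queues.values())
        # A message is refused if either the global budget or the
        # partition's own (hypothetical) cap is exhausted.
        if total >= global_limit or len(queues[p]) >= partition_limit:
            rejected[p] += 1
        else:
            queues[p].append(step)

        # Assumption: healthy leaders acknowledge everything right away,
        # while the slow leader acknowledges nothing.
        for other, q in queues.items():
            if other != slow_partition:
                q.clear()

    depth = {p: len(q) for p, q in queues.items()}
    return depth, rejected


if __name__ == "__main__":
    depth, rejected = simulate_with_partition_cap()
    print("queue depth per partition:", depth)
    print("rejected per partition:   ", rejected)
```

With the cap in place, only produce attempts for partition5 are refused in this model; the other four partitions keep flowing because partition5 can never occupy more than `partition_limit` slots of the global budget.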

edenhill avatar Feb 06 '20 09:02 edenhill

Yes, there's no need to make things complicated; a simple solution that makes a partition.queue.buffering.max.messages/bytes configuration available would be best. 🥂

kennethjia avatar Feb 06 '20 09:02 kennethjia