confluent-kafka-dotnet

Confluent.Kafka.KafkaException: 'Local: Queue full' when running BeginProduce example

Open gmeena opened this issue 6 years ago • 13 comments

Description

I am getting the error "Confluent.Kafka.KafkaException: 'Local: Queue full'" when trying to run the basic producer example (with p.BeginProduce). All settings are default.

It works until the for loop reaches about 100k iterations; after that it starts throwing this error.

How to reproduce

  1. Install Kafka & ZooKeeper
  2. Start servers
  3. Produce message to topic 'test' using simple producer example and consume through kafka-console-consumer.

Checklist

Please provide the following information:

  • [ ] Confluent.Kafka nuget version:
  • [ ] Apache Kafka version: 2.1.0
  • [ ] Client configuration:
  • [ ] Operating system: Windows 10, 16 GB RAM, 64-bit OS
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

gmeena avatar Dec 15 '18 18:12 gmeena

Can you increase message.timeout.ms and retries to higher values?

srinathrangaramanujam avatar Dec 18 '18 09:12 srinathrangaramanujam

you're trying to send messages faster than librdkafka can get them delivered to kafka and the local queue is filling up. you can increase the queue size (queue.buffering.max.messages / queue.buffering.max.kbytes), but if you sustain the same rate of producing (i.e. as fast as you can), the local queue is still going to fill up, just a bit later.

what you should do if you get this exception is catch it, then wait for some time before continuing. if you get the exception, you can be sure the message will not be sent to kafka, so you can be confident that re-trying will not result in duplicate messages (due to this exception at least).
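The catch-wait-retry pattern described above can be sketched as a small helper. This is a Python sketch (the thread later moves to the Python client, where a full local queue raises `BufferError`); the helper name and parameters are hypothetical, and `produce` stands in for any client call that raises on a full local queue:

```python
import time

def produce_with_retry(produce, msg, retries=5, backoff_s=0.5):
    """Call produce(msg); on a full local queue, wait and retry.

    A full-queue error means the message was never enqueued locally,
    so retrying this path cannot create duplicates.
    """
    for _ in range(retries):
        try:
            produce(msg)
            return True            # message accepted into the local queue
        except BufferError:
            time.sleep(backoff_s)  # give the client time to drain the queue
    return False                   # still full after all retries
```

In the .NET client the same idea applies, catching KafkaException and checking its error code for the local queue-full condition instead of BufferError.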

marked as an enhancement as a reminder to note this in the readme.

mhowlett avatar Dec 18 '18 16:12 mhowlett

And you probably want to set linger.ms to 100 (or so, try different values) to promote batching (== increased performance)

edenhill avatar Dec 18 '18 16:12 edenhill

yes, good point. i've found linger.ms = 5 to be a good general purpose setting - you get nearly the max throughput but minimal latency impact.
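For reference, the settings mentioned in this thread collected into one config sketch. The values are illustrative, not recommendations, and the broker address is an assumption:

```python
# Illustrative producer settings discussed in this thread
# (tune for your own workload; broker address is an assumption):
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "linger.ms": 5,                          # small batching delay: near-max throughput, minimal latency
    "queue.buffering.max.messages": 500000,  # grow the local queue (librdkafka default: 100000)
    "queue.buffering.max.kbytes": 1048576,   # cap on total queued message bytes
}
```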

mhowlett avatar Dec 18 '18 17:12 mhowlett

I encountered a similar error while using confluent-kafka-python. I inject network faults in my test; with packet loss over 10%, when the error 'BufferError: Local: Queue full' appears, most of the messages are lost. Can this be solved by adjusting the parameters? The network condition needs to be checked, I suppose.

woohan avatar Oct 14 '19 16:10 woohan

Is there a way to get the current queue usage? I am producing messages from Flask requests, and it would be nice to block and flush when the queue is full instead of getting errors. Flushing at every write seems too slow.

Currently I'm doing this:


try:
    producer.produce(
        topic=my_topic,
        value=data,
    )
except BufferError:
    logger.warning('Buffer error, the queue must be full! Flushing...')
    producer.flush()

    logger.info('Queue flushed, will write the message again')
    producer.produce(
        topic=my_topic,
        value=data,
    )

jacopofar avatar Jan 13 '20 11:01 jacopofar

are you calling producer.poll regularly? in python you need to do this (this is automatic in .net), otherwise the internal queue will fill up with delivery reports, and they'll never get removed. you shouldn't generally call flush except on shutdown.

assuming you are doing this, it's intended that when you get a Queue full error, you should just wait a bit (no need to call flush) and try again. you should also configure the queue size so that this is not expected under normal operation.

the statistics callback will tell you internal queue sizes i believe.
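For the queue-usage question, here is a sketch of reading queue depth from the statistics payload. You enable statistics with `statistics.interval.ms` and a `stats_cb` entry in the producer config; the callback receives a JSON string. The top-level `msg_cnt` / `msg_max` field names are taken from librdkafka's statistics documentation, so verify them against your client version:

```python
import json

def queue_usage(stats_json):
    """Return (queued_messages, queue_capacity) from a librdkafka
    statistics JSON string, as delivered to the stats_cb callback."""
    stats = json.loads(stats_json)
    return stats["msg_cnt"], stats["msg_max"]
```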

mhowlett avatar Jan 13 '20 17:01 mhowlett

I'm not, the documentation says "Polls the producer for events" but I don't understand what it means.

I expect events to come only from the consumer, does it refer to "acknowledge" events?

jacopofar avatar Jan 13 '20 21:01 jacopofar

callbacks (delivery notification, global error notification, statistics) are called as a side effect to calling poll. more info is in this blog post: https://www.confluent.io/blog/kafka-python-asyncio-integration/

each pending callback has an associated event inside librdkafka, and these will accumulate if poll is not being called.
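The accumulation effect can be illustrated with a toy model. This is not the real client, just a sketch of the behaviour described above: un-polled delivery events fill a bounded queue until produce fails:

```python
import queue

class SketchProducer:
    """Toy stand-in for the real producer: each produce() leaves a
    delivery event behind; poll() drains them. If poll() is never
    called, the bounded event queue fills up and produce() raises
    BufferError, mirroring 'Local: Queue full'."""

    def __init__(self, max_events=3):
        self._events = queue.Queue(maxsize=max_events)

    def produce(self, msg):
        try:
            self._events.put_nowait(("delivery-report", msg))
        except queue.Full:
            raise BufferError("Local: Queue full")

    def poll(self, timeout=0):
        served = 0
        while True:
            try:
                self._events.get_nowait()  # "fire" the pending callback
                served += 1
            except queue.Empty:
                return served
```

Producing without ever polling eventually raises BufferError; a single poll() empties the event queue and producing works again.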

mhowlett avatar Jan 13 '20 21:01 mhowlett

Also I noticed only now that this is the issue tracker for the dotnet library -_-

jacopofar avatar Jan 14 '20 09:01 jacopofar

Thanks for the advice. Yes, I did call the poll() method in the producer. The code, simplified, is as follows:

for i in range(totalMsgNumber):
    msg = str('some message contents...')
    try:
        producer.poll(0)
        producer.produce(topicName, bytes(msg, 'utf-8'))
    except BufferError as bfer:
        logger2.error(bfer)
        producer.poll(0.1)
producer.flush()

Say I set totalMsgNumber=10000: will the producer accumulate all the messages in the buffer and flush them after the loop ends? I just want to emulate a scenario where the producer sends messages continuously.

woohan avatar Jan 14 '20 18:01 woohan

as soon as you call produce, messages are queued to be sent to the broker - and this will happen automatically. In practice, messages will be sent to the broker almost immediately with default settings, though you can control this with linger.ms for better batching (performance under high load).

you only need to call poll to service delivery notifications. messages will get through to the broker even if you don't call this. however, if you don't call it, the internal queue will eventually fill up, and you'll be unable to send messages due to that.
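One caveat with the loop quoted above: the message that hit BufferError is never re-sent, so it is silently dropped. A hedged rework that retries it might look like this (the helper and its names are hypothetical; `producer` is assumed to expose confluent-kafka-style produce/poll/flush methods):

```python
def produce_all(producer, topic, messages):
    """Send every message, retrying any that hit a full local queue."""
    for msg in messages:
        while True:
            try:
                producer.poll(0)        # serve pending delivery reports
                producer.produce(topic, msg)
                break                   # queued successfully; next message
            except BufferError:
                producer.poll(0.5)      # wait for the queue to drain, then retry
    producer.flush()                    # only at the end, not per message
```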

mhowlett avatar Jan 14 '20 18:01 mhowlett

And I have a question: can increasing the number of partitions avoid the queue-full problem?

cvenwu avatar Aug 22 '22 12:08 cvenwu