pykafka icon indicating copy to clipboard operation
pykafka copied to clipboard

use_rdkafka=True pykafka.exceptions.RdKafkaException: Configuration property "queued.max.messages.kbytes" value 102400000 is outside allowed range 1..2097151

Open mkmoisen opened this issue 5 years ago • 4 comments

PyKafka Version 2.8.0 on Linux librdkafka Version librdkafka1-1.1.0_confluent5.3.1-1.el7.x86_64 on Linux Kafka Version 2.12-2.3.0 on Windows or Linux

Using topic.get_simple_consumer(use_rdkafka=True) results in the following error:

pykafka.exceptions.RdKafkaException: Configuration property "queued.max.messages.kbytes" value 102400000 is outside allowed range 1..2097151

I have tried this with the Kafka broker running on both Windows and on Linux, with the consumer connecting to it from Linux. Both result in this error.

from pykafka import KafkaClient
client = KafkaClient('localhost:9092')
topic = client.topics['test'] 
# pykafka.exceptions.RdKafkaException: Configuration property "queued.max.messages.kbytes" value 102400000 is outside allowed range 1..2097151
consumer = topic.get_simple_consumer(use_rdkafka=True)

I took a look in rdkafka/simple_consumer.py where the configs were set up, and then attempted to instantiate a rdkafka.RdKafkaSimpleConsumer directly, passing in fetch_message_max_bytes.

Unfortunately, there is no legal value. Setting fetch_message_max_bytes=21475 or greater results in: pykafka.exceptions.RdKafkaException: Configuration property "queued.max.messages.kbytes" value 2097167 is outside allowed range 1..2097151

Setting fetch_message_max_bytes=21474 or less than results in:

pykafka.exceptions.RdKafkaException: `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512

And I was not able to find fetch.max.bytes in rdkafka/simple_consumer.py (even though fetch.min.bytes is available)

from pykafka import KafkaClient, rdkafka
client = KafkaClient('localhost:9092')
topic = client.topics['test'] 
# pykafka.exceptions.RdKafkaException: Configuration property "queued.max.messages.kbytes" value 2097167 is outside allowed range 1..2097151
consumer = rdkafka.RdKafkaSimpleConsumer(topic=topic, cluster=topic._cluster, fetch_message_max_bytes=21475)
# pykafka.exceptions.RdKafkaException: `receive.message.max.bytes` must be >= `fetch.max.bytes` + 512
consumer = rdkafka.RdKafkaSimpleConsumer(topic=topic, cluster=topic._cluster, fetch_message_max_bytes=21474)

mkmoisen avatar Sep 15 '19 20:09 mkmoisen

I've also found this issue. Would be great if this could get fixed.

tropas2001 avatar Dec 19 '19 11:12 tropas2001

@tropas2001 I've switched over to confluent_kafka which seems to be better.

mkmoisen avatar Dec 19 '19 20:12 mkmoisen

Thanks, I’ve played with confluence but found it didn’t behave well when trying to integrate with a websocket server. Think there was some going on re: threading. It’s so frustrating pykafka is a great client with some really small niggerly defects.

tropas2001 avatar Dec 19 '19 22:12 tropas2001

Attempting to use librdkafka, because the python-only code has a bug in an edge case (gets stuck in a loop). However, at this point we hit this problem. Planning to switch to confluent kafka as well.

abdullin avatar Feb 04 '20 08:02 abdullin