faust
faust copied to clipboard
faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError()
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the
master
branch of Faust.
Steps to reproduce
Kafka (bitnami/kafka:2.7.0) config:
MESSAGE_MAX_BYTES=6291456 MAX_REQUEST_SIZE=6291456 MAX_PARTITION_FETCH_BYTES=6291456
Faust app config:
consumer_max_fetch_size=6291456 producer_max_request_size=6291456 producer_request_timeout=300
Prepare Kafka + Zookeeper stack + 2 services:
- Service for downloading html page
- Service for scraping URLs and passing to 1st service
Let start the stack. Produce input messages (such amount it'll run at least for 30 minutes). Messages max size 6MB (for sure bigger than default 1MB).
Expected behavior
Service is operating without any interruption like PrducerSendError
Actual behavior
After run facing
faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError()
Full traceback
[2021-08-02 09:16:52,586] [19] [ERROR] Task exception was never retrieved
future: <Task finished name='Task-62580' coro=<Topic.send() done, defined at /usr/local/lib/python3.9/site-packages/faust/topics.py:155> exception=ProducerSendError('Error while sending: KafkaTimeoutError()')>
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1053, in send
return cast(Awaitable[RecordMetadata], await producer.send(
File "/usr/local/lib/python3.9/site-packages/aiokafka/producer/producer.py", line 319, in send
fut = await message_accumulator.add_message(
File "/usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py", line 341, in add_message
raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/faust/topics.py", line 184, in send
return await self._send_now(
File "/usr/local/lib/python3.9/site-packages/faust/channels.py", line 300, in _send_now
return await self.publish_message(
File "/usr/local/lib/python3.9/site-packages/faust/topics.py", line 413, in publish_message
fut2 = cast(asyncio.Future, await producer.send(
File "/usr/local/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1062, in send
raise ProducerSendError(f'Error while sending: {exc!r}') from exc
faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError()
Versions
- Python version 3.9.2
- Faust version: faust[rocksdb]==1.10.4
- Operating system: docker image python:3.9.2-slim-buster
- Kafka version 2.7.0 (Commit:448719dc99a19793)
- RocksDB version (if applicable)
This project appears to have been abandoned.
You might want to check out the fork of this project - https://github.com/faust-streaming/faust
It has a bunch of fixes merged for problems that were in the base project.