Application crash when producer fails to commit offsets using EXACTLY_ONCE semantics
The agent crashes when the producer fails to commit a transaction; the full traceback is below. I am using Faust 1.8.1, aiokafka 1.0.6, Python 3.7, and Kafka 1.1.
```
Traceback (most recent call last):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 167, in _sender_routine
    task.result()
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 342, in _do_add_offsets_to_txn
    return (yield from handler.do(node_id))
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 404, in do
    node_id, req, group=self.group)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 479, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 457, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 417, in _get_conn
    assert broker is not None
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/mode/services.py", line 770, in _execute_task
    await task
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 778, in _commit_handler
    await self.commit()
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 812, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/mode/services.py", line 456, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 846, in force_commit
    commit_tps, start_new_transaction=start_new_transaction)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 864, in _commit_tps
    start_new_transaction=start_new_transaction)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 928, in _commit_offsets
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 314, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 581, in commit_transactions
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 748, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 758, in _commit
    transactional_id, offsets, group_id)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 950, in send_offsets_to_transaction
    yield from asyncio.shield(fut, loop=self._loop)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 184, in _sender_routine
    raise KafkaError("Unexpected error during batch delivery")
kafka.errors.KafkaError: KafkaError: Unexpected error during batch delivery
```
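
For anyone reading the traceback: the two halves look like implicit exception chaining, where an `AssertionError` raised during the broker lookup is caught and re-raised as a generic `KafkaError`. The sketch below reproduces only that pattern and does not use aiokafka. `KafkaError` here is a local stand-in class, and `get_conn`, `sender_routine`, and `commit` are hypothetical names that mirror the frames above; this is my assumption about the shape of the failure, not the library's actual code.

```python
import asyncio


class KafkaError(Exception):
    """Local stand-in for kafka.errors.KafkaError (illustration only)."""


async def get_conn(brokers, node_id):
    # Mirrors the failing frame: the node id is not in the known brokers.
    broker = brokers.get(node_id)
    assert broker is not None
    return broker


async def sender_routine(brokers, node_id):
    try:
        return await get_conn(brokers, node_id)
    except Exception:
        # Raising inside `except` without `from` chains the original error
        # implicitly, which prints "During handling of the above exception,
        # another exception occurred".
        raise KafkaError("Unexpected error during batch delivery")


async def commit(brokers, node_id):
    # Hypothetical caller: catch the generic error and report its root cause
    # instead of letting it propagate and crash the task.
    try:
        await sender_routine(brokers, node_id)
    except KafkaError as exc:
        return type(exc.__context__).__name__
    return None


# Empty broker map, so the lookup for node 1 fails.
result = asyncio.run(commit({}, node_id=1))
print(result)
```

If this reading is right, the `KafkaError` message is only a wrapper, and the root cause to investigate is why the broker lookup for that node id returned nothing.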