
Application crash when producer fails to commit offsets using EXACTLY_ONCE semantics

Open patkivikram opened this issue 4 years ago • 0 comments

The agent crashes when the producer fails to commit transactions. I am using Faust 1.8.1, aiokafka 1.0.6, Python 3.7, and Kafka 1.1.

Traceback (most recent call last):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 167, in _sender_routine
    task.result()
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 342, in _do_add_offsets_to_txn
    return (yield from handler.do(node_id))
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 404, in do
    node_id, req, group=self.group)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 479, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 457, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/client.py", line 417, in _get_conn
    assert broker is not None
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/mode/services.py", line 770, in _execute_task
    await task
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 778, in _commit_handler
    await self.commit()
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 812, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/mode/services.py", line 456, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 846, in force_commit
    commit_tps, start_new_transaction=start_new_transaction)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 864, in _commit_tps
    start_new_transaction=start_new_transaction)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 928, in _commit_offsets
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/consumer.py", line 314, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 581, in commit_transactions
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 748, in commit
    start_new_transaction=start_new_transaction,
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 758, in _commit
    transactional_id, offsets, group_id)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 950, in send_offsets_to_transaction
    yield from asyncio.shield(fut, loop=self._loop)
  File "/pyenv/versions/putmodengine/lib/python3.7/site-packages/aiokafka/producer/sender.py", line 184, in _sender_routine
    raise KafkaError("Unexpected error during batch delivery")
kafka.errors.KafkaError: KafkaError: Unexpected error during batch delivery
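
The two tracebacks appear to show one failure reported twice: an `AssertionError` raised inside the sender's background routine, then re-raised as a generic `KafkaError` that reaches the caller waiting on the commit. Below is a minimal, stdlib-only sketch of that pattern. It is not aiokafka's code: `KafkaError`, `get_conn`, `sender_routine`, `commit`, and `run_commit` are stand-ins named after the frames in the traceback, and the wrapping logic is an assumption inferred from the `During handling of the above exception` line.

```python
import asyncio


class KafkaError(Exception):
    """Stand-in for kafka.errors.KafkaError (illustration only)."""


async def get_conn(broker):
    # Mirrors the failing check in the traceback: no broker is known for the node.
    assert broker is not None
    return broker


async def sender_routine():
    # Assumed behavior: any unexpected failure is converted to a generic error.
    try:
        await get_conn(None)
    except Exception:
        raise KafkaError("Unexpected error during batch delivery")


async def commit():
    # The caller waits on the background task, so the task's error surfaces here.
    task = asyncio.ensure_future(sender_routine())
    await asyncio.shield(task)


def run_commit():
    """Run commit() and return the exception it raises, or None."""
    try:
        asyncio.run(commit())
    except KafkaError as exc:
        return exc
    return None


err = run_commit()
# The original AssertionError is still reachable through the implicit chain.
print(type(err).__name__, "raised while handling", type(err.__context__).__name__)
```

In this sketch the underlying cause stays available on `__context__`, which is why the report contains both tracebacks even though only the `KafkaError` propagates to the commit handler.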

patkivikram, Oct 24 '19 15:10