
KeyError when trying to commit offsets using Transactional Producer

Open marcosschroh opened this issue 8 months ago • 2 comments

Describe the bug

When committing offsets with a transactional producer via await producer.send_offsets_to_transaction(offsets, group_id), there appears to be a race condition in which two background tasks try to perform the same commit.

In my application I always use the same transactional producer with the same transactional_id. Commits are performed one at a time rather than in batches.

Traceback

03/11/2025 02:48:03 PM Unexpected error in sender routine
Traceback (most recent call last):
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 176, in _sender_routine
    task.result()
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 367, in _do_txn_offset_commit
    await handler.do(node_id)
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 413, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 643, in handle_response
    offset = self._offsets[tp].offset
             ~~~~~~~~~~~~~^^^^
KeyError: TopicPartition(topic='local--kstreams-json', partition=0)

I added some logging to the sender.py module and can see that Sender._do_txn_offset_commit is called twice, trying to commit the same offset for the same TopicPartition (offset 62 in this example). As the logs below show, offsets is a dict with id 4345275008 that is shared by both tasks.

The first call to TxnOffsetCommitHandler.handle_response works fine. When it is called a second time, the offsets dict is already empty, so the code crashes at offset = self._offsets[tp].offset.

Beging Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008 

Ending Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008 

Beging Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008 

Ending Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008 

Handle response in TxnOffsetCommitHandler.handle_response for offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008 

03/11/2025 03:00:00 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 62} for group my-group
Handle response in TxnOffsetCommitHandler.handle_response for offsets {}. Offsets id 4345275008 

03/11/2025 03:00:00 PM Unexpected error in sender routine
Traceback (most recent call last):
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 176, in _sender_routine
    task.result()
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 366, in _do_txn_offset_commit
    await handler.do(node_id)
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 412, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 642, in handle_response
    offset = self._offsets[tp].offset
             ~~~~~~~~~~~~~^^^^
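
The failure mode in the logs above can be reproduced in isolation. The following is a minimal sketch, not aiokafka code: FakeCommitHandler, run_duplicate_commits and the string partition key are hypothetical stand-ins, and the only assumption is that two tasks share one offsets dict and each removes an entry once its response is handled.

```python
import asyncio


class FakeCommitHandler:
    """Hypothetical stand-in for a handler that shares an offsets dict."""

    def __init__(self, offsets: dict, committed: list) -> None:
        self._offsets = offsets      # shared by reference, not copied
        self._committed = committed

    async def do(self) -> None:
        requested = list(self._offsets)  # partitions placed in the "request"
        await asyncio.sleep(0)           # yield, like awaiting a broker response
        self.handle_response(requested)

    def handle_response(self, partitions: list) -> None:
        for tp in partitions:
            offset = self._offsets[tp]   # KeyError if another task removed tp
            del self._offsets[tp]        # mark the offset as committed
            self._committed.append((tp, offset))


async def run_duplicate_commits() -> list:
    pending = {"local--kstreams-json-0": 62}
    committed: list = []
    # Two tasks are started for the same pending commit, as in the logs.
    tasks = [
        asyncio.create_task(FakeCommitHandler(pending, committed).do())
        for _ in range(2)
    ]
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(run_duplicate_commits())
print(results)
```

The first task finishes normally and the second one fails with a KeyError for the same key, which matches the pattern seen in the traceback.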

Expected behaviour

Committing offsets with a transactional producer should always work: a new commit should wait for the previous one to finish before it starts.

Environment (please complete the following information):

  • aiokafka version: 0.12.0
  • Kafka Broker version: 3.7

marcosschroh, Mar 11 '25 14:03

Related to https://github.com/aio-libs/aiokafka/issues/1004 and https://github.com/aio-libs/aiokafka/issues/920

marcosschroh, Mar 11 '25 14:03

After more debugging, I can see that there are orphan asyncio Tasks in Sender, which makes it prone to race conditions and unexpected behavior.

_maybe_do_transactional_request is scheduled as a task, and it in turn creates 4 other asyncio.Tasks that are never awaited. No references to them are kept, so it is not possible to determine their results. This is a perfect setup for race conditions, because the tasks manipulate shared resources.
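
As a generic illustration of keeping background tasks observable, the sketch below holds a reference to every spawned task and records failures in a done callback. It is not taken from aiokafka: spawn, flaky_request, background_tasks, failures and the request names are all hypothetical.

```python
import asyncio

background_tasks: set = set()   # strong references to in-flight tasks
failures: list = []             # exceptions raised by background tasks


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    # Retrieve the exception so it is recorded instead of silently lost.
    if not task.cancelled() and task.exception() is not None:
        failures.append(task.exception())


def spawn(coro) -> asyncio.Task:
    """Create a task, keep a reference to it, and observe its outcome."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def flaky_request(name: str, fail: bool) -> str:
    await asyncio.sleep(0)  # simulate I/O
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


async def main() -> None:
    spawn(flaky_request("add_partitions", fail=False))
    spawn(flaky_request("txn_offset_commit", fail=True))
    # Wait for everything that is still tracked before returning.
    await asyncio.gather(*list(background_tasks), return_exceptions=True)
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run


asyncio.run(main())
print(failures)
```

With the references kept, the caller can tell which background request failed and can decide whether a new one may be started.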

There is a kind of lock around _maybe_do_transactional_request, but it is not enough: previous calls to _maybe_do_transactional_request have created orphan tasks that keep running in the background and try to do the same thing. A proper way to solve this is to use structured concurrency or to keep references to the tasks.

class Sender:
    def __init__(...):
        # Reference to the in-flight offset commit task, if any
        self.offset_commit_task: asyncio.Task | None = None

    def _maybe_do_transactional_request(self):
        txn_manager = self._txn_manager
        ...

        # check the task reference in line 321
        no_commit_in_flight = (
            self.offset_commit_task is None or self.offset_commit_task.done()
        )
        if commit_data is not None and no_commit_in_flight:
            offsets, group_id = commit_data
            self.offset_commit_task = create_task(
                self._do_txn_offset_commit(offsets, group_id)
            )
            return self.offset_commit_task
        ...

This is not a criticism, but it seems to me that much of this code follows a Java-style approach and should be refactored into idiomatic Python with proper asyncio.

Any thoughts?

marcosschroh, Mar 12 '25 14:03