Crash and duplicated events on rebalance with processing_guarantee="exactly_once"

Status: Open. sauerburger opened this issue 2 years ago • 3 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have one Faust app that sends enumerated tick messages to a topic (test.faust.eo.ticks) with 10 partitions, in a Kafka cluster with 3 brokers.
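
The issue report does not include the first stage's code; a minimal sketch consistent with the description and the log output below might look like the following (the app id, timer interval, and key scheme are assumptions inferred from the ~3-second spacing and the key/id pairs in the logs):

import os
import faust

BROKER = "kafka://" + os.environ["KAFKA_BOOTSTRAP_SERVER"]

ticker_app = faust.App("faust-exactlyonce-ticker",   # hypothetical app id
                       broker=BROKER,
                       processing_guarantee="exactly_once")

ticks_topic = ticker_app.topic("test.faust.eo.ticks",
                               value_serializer='json',
                               key_serializer='raw',
                               partitions=10)

tick_id = 0

@ticker_app.timer(interval=3.0)   # ~3 s between ticks, matching the log timestamps below
async def send_tick():
    global tick_id
    key = str(tick_id % 10)
    # Enumerated tick; the key cycles over the 10 partitions.
    await ticks_topic.send(key=key, value={"key": key, "id": tick_id})
    tick_id += 1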

A second set of apps/agents processes the tick messages. For the sake of this example, the agent only adds metadata about the agent (process number, hostname, and timestamp) and sends the augmented message to another topic (test.faust.eo.pipe). The workers are deployed to Kubernetes, so that I can easily scale the number of workers up and down.

import os
import socket
from datetime import datetime

import faust

BROKER = "kafka://" + os.environ["KAFKA_BOOTSTRAP_SERVER"]

app = faust.App("faust-exactlyonce-repeater",
                broker=BROKER,
                store='rocksdb://',
                processing_guarantee="exactly_once")

topic = app.topic("test.faust.eo.ticks",
                  value_serializer='json',
                  key_serializer='raw')

repeat_topic = app.topic("test.faust.eo.pipe",
                         value_serializer='json',
                         key_serializer='raw')

pid = os.getpid()
hostname = socket.gethostname()

@app.agent(topic, sinks=[repeat_topic])
async def repeater(ticks):
    async for key, tick in ticks.items():
        tick["pid"] = pid
        tick["hostname"] = hostname
        tick["time"] = datetime.utcnow().isoformat()
        await repeat_topic.send(value=tick, key=key)

In the last stage, an app reads the augmented topic (test.faust.eo.pipe) and prints the contents to the terminal.
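
The report does not show this last stage either; a minimal sketch (the app id is an assumption, topic name and serializers follow the setup above) would be:

import logging
import os
import faust

BROKER = "kafka://" + os.environ["KAFKA_BOOTSTRAP_SERVER"]
log = logging.getLogger(__name__)

logger_app = faust.App("faust-exactlyonce-logger",   # hypothetical app id
                       broker=BROKER,
                       processing_guarantee="exactly_once")

pipe_topic = logger_app.topic("test.faust.eo.pipe",
                              value_serializer='json',
                              key_serializer='raw')

@logger_app.agent(pipe_topic)
async def printer(events):
    async for event in events:
        # Produces the WARNING lines shown under "Actual behavior" below.
        log.warning(event)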

All apps use processing_guarantee="exactly_once". To trigger the issue, I scale the number of nodes in the second stage up or down.

Expected behavior

No duplicated messages should appear in test.faust.eo.pipe when worker nodes join or leave and trigger a rebalance, and workers that are not deliberately killed should not crash after the rebalance.

Actual behavior

After the rebalance, I see duplicated messages in the test.faust.eo.pipe topic printed by the logger stage. For example, here id=27 occurs twice.

[2022-07-02 16:10:49,681] [1] [WARNING] {'key': '3', 'id': 23, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-kdlnj', 'time': '2022-07-02T16:10:49.661269'}  
[2022-07-02 16:10:52,667] [1] [WARNING] {'key': '4', 'id': 24, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-kdlnj', 'time': '2022-07-02T16:10:52.658649'}  
[2022-07-02 16:10:55,690] [1] [WARNING] {'key': '5', 'id': 25, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-kdlnj', 'time': '2022-07-02T16:10:55.670966'}  
[2022-07-02 16:10:58,884] [1] [WARNING] {'key': '6', 'id': 26, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-vvjj7', 'time': '2022-07-02T16:10:58.867819'}  
[2022-07-02 16:11:01,679] [1] [WARNING] {'key': '7', 'id': 27, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-vvjj7', 'time': '2022-07-02T16:11:01.671352'}  
[2022-07-02 16:11:03,188] [1] [WARNING] {'key': '7', 'id': 27, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-vvjj7', 'time': '2022-07-02T16:11:03.177538'}  
[2022-07-02 16:11:04,694] [1] [WARNING] {'key': '8', 'id': 28, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-vvjj7', 'time': '2022-07-02T16:11:04.676443'}  
[2022-07-02 16:11:07,695] [1] [WARNING] {'key': '9', 'id': 29, 'pid': 1, 'hostname': 'faust-repeater-5d8559c5d4-vvjj7', 'time': '2022-07-02T16:11:07.678648'}

The duplicated messages are accompanied by error messages from the worker, which then crashes.

[2022-07-02 16:11:02,703] [1] [WARNING] Heartbeat failed for group faust-exactlyonce-repeater because it is rebalancing 
[2022-07-02 16:11:02,764] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-0-0 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,767] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-0 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,775] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-1 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,778] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-2 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,784] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-3 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,786] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-4 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,792] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-5 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,794] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-6 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,796] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-7 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,805] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-8 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION 
[2022-07-02 16:11:02,807] [1] [WARNING] Exception in begin_transaction for transaction faust-exactlyonce-repeater-1-9 exception Invalid state transition TransactionState.IN_TRANSACTION -> TransactionState.IN_TRANSACTION

Full traceback

[2022-07-02 16:11:08,908] [1] [WARNING] ProducerFenced ProducerFenced: There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms) 
[2022-07-02 16:11:08,910] [1] [ERROR] [^-App]: Crashed reason=ProducerFenced('There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms)') 
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/faust/transport/consumer.py", line 339, in commit
    await producer.commit_transactions(
  File "/usr/local/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 1211, in commit_transactions
    await transaction_producer.send_offsets_to_transaction(
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 561, in send_offsets_to_transaction
    await asyncio.shield(fut)
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 154, in _sender_routine
    task.result()
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 315, in _do_add_offsets_to_txn
    return (await handler.do(node_id))
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 379, in do
    retry_backoff = self.handle_response(resp)
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 538, in handle_response
    raise ProducerFenced()
aiokafka.errors.ProducerFenced: ProducerFenced: There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms)
[2022-07-02 16:11:09,992] [1] [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception... 
 OK ^
[2022-07-02 16:11:10,095] [1] [ERROR] Future exception was never retrieved
future: <Future finished exception=ProducerFenced('There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms)')> 
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/mode/worker.py", line 69, in exiting
    yield
  File "/usr/local/lib/python3.10/site-packages/mode/worker.py", line 290, in execute_from_commandline
    self.stop_and_shutdown()
  File "/usr/local/lib/python3.10/site-packages/mode/worker.py", line 302, in stop_and_shutdown
    self._shutdown_loop()
  File "/usr/local/lib/python3.10/site-packages/mode/worker.py", line 331, in _shutdown_loop
    raise self.crash_reason from self.crash_reason
  File "/usr/local/lib/python3.10/site-packages/faust/transport/consumer.py", line 339, in commit
    await producer.commit_transactions(
  File "/usr/local/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 1211, in commit_transactions
    await transaction_producer.send_offsets_to_transaction(
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 561, in send_offsets_to_transaction
    await asyncio.shield(fut)
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 154, in _sender_routine
    task.result()
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 315, in _do_add_offsets_to_txn
    return (await handler.do(node_id))
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 379, in do
    retry_backoff = self.handle_response(resp)
  File "/usr/local/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 538, in handle_response
    raise ProducerFenced()
aiokafka.errors.ProducerFenced: ProducerFenced: There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms)

Versions

  • Python version: 3.10.1
  • Faust version: 0.8.5 and master (db6a3ae28ace1112132ef5dc07f4cf50afcdc427)
  • Operating system: Python 3.10.1 docker image, i.e. Debian 11.2
  • Kafka version: 3.1.0 (on Kubernetes with Strimzi 0.28.0)
  • RocksDB version (if applicable)

sauerburger · Jul 02 '22 16:07

I cannot claim to understand the internal workings of Faust, but my suspicion is that the issue is caused by stale transactions, started before the rebalance, clashing with the transactional ids of other workers.

In transport/consumer.py, I've spotted

transactional_id_format = "{group_id}-{tpg.group}-{tpg.partition}"

(in two places). The resulting transactional ids are unique across a balanced set of workers (because each partition is assigned to exactly one worker), but when a partition is reassigned, two workers might try to use the same transactional id.
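
To illustrate the collision (a toy reconstruction, not actual Faust internals; the group and partition numbers are taken from the warnings above): after a reassignment, the previous owner and the new owner of a topic-partition group compute the same transactional id.

from types import SimpleNamespace

transactional_id_format = "{group_id}-{tpg.group}-{tpg.partition}"

# Hypothetical topic-partition group that gets reassigned during the rebalance.
tpg = SimpleNamespace(group=1, partition=3)

# Both the previous and the new owner of this partition derive the same id,
# matching "faust-exactlyonce-repeater-1-3" from the warnings above.
tid = transactional_id_format.format(group_id="faust-exactlyonce-repeater", tpg=tpg)
print(tid)  # faust-exactlyonce-repeater-1-3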

I've locally changed the transactional id format to include the hostname and the process id, so that the transactional id is unique across all workers (see the sketch below). With this change I could prevent the crash of the worker due to the ProducerFenced exception. However, the "Exception in begin_transaction ..." warnings persist and duplicated events still appear in the logging output. Does this mean that a worker processes an input event and writes it to the output topic within a transaction that fails to commit due to the rebalance, and that after the rebalance the same transactional id is reused (causing the warning about the illegal state transition), the event is processed a second time, and the final commit includes the output event twice?
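
For reference, the kind of local change meant above is roughly the following (a sketch only, not the actual patch to faust/transport/consumer.py):

import os
import socket

# Append host name and PID so the transactional id can never collide
# between two workers, even when a partition is reassigned.
transactional_id_format = (
    "{group_id}-{tpg.group}-{tpg.partition}-"
    + socket.gethostname() + "-" + str(os.getpid())
)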

sauerburger · Jul 03 '22 11:07

I would not rely on exactly_once, but instead use the default at-least-once semantics and make your application process messages idempotently.

While there is an implementation of exactly_once, it is incorrect as far as I know.
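
As an illustration of the suggested at-least-once + idempotent approach (a sketch only, reusing topic, repeat_topic, pid, hostname, and datetime from the repeater example above, and assuming the app is configured without processing_guarantee="exactly_once"; the table name and the per-key high-water-mark scheme are assumptions that only work because the ticks are enumerated and consistently keyed):

# Dedup table, keyed by the same key as the input stream (co-partitioned),
# storing the highest tick id processed so far for that key.
last_seen = app.Table("tick-high-water-mark", default=lambda: -1)

@app.agent(topic, sinks=[repeat_topic])
async def idempotent_repeater(ticks):
    async for key, tick in ticks.items():
        if tick["id"] <= last_seen[key]:
            continue                      # redelivered after a rebalance: skip
        last_seen[key] = tick["id"]
        tick["pid"] = pid
        tick["hostname"] = hostname
        tick["time"] = datetime.utcnow().isoformat()
        await repeat_topic.send(value=tick, key=key)

Note that under at-least-once delivery the output send, the table update, and the offset commit are not atomic, so this reduces rather than fully eliminates duplicates; complete deduplication would also require the downstream consumer to discard repeats, e.g. by tick id.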

jkgenser · Jul 03 '22 14:07

Thanks @jkgenser for the reply. This was, however, not the answer I was hoping for :D

Besides the issue with the transactional_id, exactly-once semantics with dynamic partition assignment seems to be broken already at the aiokafka level.

sauerburger · Jul 14 '22 21:07