
Offset committed for second message when first is still being processed

Open bohdantan opened this issue 5 years ago • 4 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

  • Create a new Python 3.8 virtualenv
  • Install Faust: pip install git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e
  • Create a worker with an agent (with 2 actors) and a task that adds messages to the topic. Add a monitor that prints offset commits. Save it as example.py:
import asyncio

import faust
from faust.sensors.monitor import Monitor


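class PrintCommitMonitor(Monitor):
    # Called by Faust when offsets are committed for topic-partitions.
    def on_tp_commit(self, tp_offsets):
        super().on_tp_commit(tp_offsets)  # keep the Monitor's own bookkeeping
        print(f'Commit offsets {tp_offsets}')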


app = faust.App(
    'test-app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
    Monitor=PrintCommitMonitor,
)

test_topic = app.topic('test')


@app.agent(test_topic, concurrency=2)
async def process(stream):
    async for msg in stream:
        print(f'Start  processing {msg} - by {stream.concurrency_index}')
        if msg == b'Message i=0':
            # The first message is slow, so message 1 finishes before it.
            await asyncio.sleep(20)
        else:
            await asyncio.sleep(3)
        print(f'Finish processing {msg} - by {stream.concurrency_index}')


@app.task(on_leader=True)
async def on_started():
    print('Add messages')
    for i in range(10):
        msg = f'Message i={i}'
        await test_topic.send(key=msg, value=msg)
  • Start the worker: faust -A example worker -l info --without-web. The worker will create the Kafka topic, and the task will write 10 messages to it.
  • Two actors start processing messages. The first actor takes the message with offset 0; the second actor takes the message with offset 1.
  • After the second actor finishes processing the message with offset 1 (but before the first actor finishes the message with offset 0), offset 1 is committed.

Expected behavior

The worker commits an offset only after all messages with smaller offsets have been processed.
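The expected behavior amounts to advancing the commit point only across a gap-free run of processed offsets. A minimal sketch, assuming a hypothetical per-partition helper `ContiguousOffsetTracker` (not part of Faust's API) and using "committed" to mean "this offset and every smaller one are done", as in the wording of this issue:

```python
from typing import Optional, Set


class ContiguousOffsetTracker:
    """Hypothetical helper (not Faust's API): tracks processed offsets for one
    topic-partition and only moves the commit point across a gap-free run."""

    def __init__(self, last_committed: int = -1) -> None:
        # Highest offset such that it and every smaller offset are processed.
        self.last_committed = last_committed
        # Offsets that finished out of order, waiting for earlier gaps to close.
        self._pending: Set[int] = set()

    def mark_done(self, offset: int) -> None:
        if offset > self.last_committed:
            self._pending.add(offset)

    def committable(self) -> Optional[int]:
        """Return a new offset that is safe to commit, or None if the commit
        point cannot move yet (an earlier offset is still in flight)."""
        start = self.last_committed
        while self.last_committed + 1 in self._pending:
            self.last_committed += 1
            self._pending.discard(self.last_committed)
        return self.last_committed if self.last_committed != start else None


if __name__ == "__main__":
    tracker = ContiguousOffsetTracker()
    tracker.mark_done(1)          # offset 1 finishes first
    print(tracker.committable())  # None: offset 0 is still being processed
    tracker.mark_done(0)
    print(tracker.committable())  # 1: offsets 0 and 1 are both done
```

Note that Kafka itself stores the committed position as the next offset to read, so a real consumer would send `last_committed + 1` to the broker.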

Actual behavior

The worker commits offset 1 as soon as the message with offset 1 is processed by one actor, while the message with offset 0 is still being processed by the other actor.
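The ordering in the report can be reproduced without Kafka. The sketch below is a hypothetical asyncio model: `concurrency` workers pull offsets from one shared queue (loosely mimicking agent actors; this is not how Faust schedules actors internally), and the same finish order is fed to a "commit the highest acked offset" policy, standing in for the reported behavior, and to a gap-aware policy. Durations are shortened from the 20 s / 3 s in example.py.

```python
import asyncio
from typing import List


async def simulate(durations: List[float], concurrency: int) -> List[int]:
    """Process offsets 0..len(durations)-1 with `concurrency` workers and
    return the offsets in the order they finished."""
    queue: asyncio.Queue = asyncio.Queue()
    for offset in range(len(durations)):
        queue.put_nowait(offset)
    finished: List[int] = []

    async def worker() -> None:
        while True:
            try:
                offset = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            await asyncio.sleep(durations[offset])  # simulated processing time
            finished.append(offset)

    await asyncio.gather(*(worker() for _ in range(concurrency)))
    return finished


def naive_commits(finish_order: List[int]) -> List[int]:
    # Hypothetical model of the reported behavior: commit the highest acked offset.
    out, highest = [], -1
    for offset in finish_order:
        highest = max(highest, offset)
        out.append(highest)
    return out


def safe_commits(finish_order: List[int]) -> List[int]:
    # Commit only the highest offset with no unfinished offsets below it (-1 = none).
    done, committed, out = set(), -1, []
    for offset in finish_order:
        done.add(offset)
        while committed + 1 in done:
            committed += 1
        out.append(committed)
    return out


if __name__ == "__main__":
    durations = [0.2, 0.03, 0.03, 0.03]  # offset 0 is the slow message
    for concurrency in (1, 2):
        order = asyncio.run(simulate(durations, concurrency))
        print(concurrency, order, naive_commits(order), safe_commits(order))
```

With `concurrency=2` the finish order is 1, 2, 3, 0, so the naive policy commits 1 while offset 0 is still running; with `concurrency=1` both policies agree, consistent with the first reply below.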

Versions

  • Python version - 3.8.6
  • Faust version - git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e
  • Operating system - Ubuntu 20.04.1
  • Kafka version - 2.4.1

bohdantan, Nov 04 '20 10:11

With concurrency, messages are processed out of order. You won't see this behavior with concurrency=1.

patkivikram, Nov 14 '20 23:11

I was also curious about this behavior. I was expecting messages to be processed out of order, but offsets to be committed in order. The current design could clearly lead to data loss in the case of a failure and restart.

ghost, Nov 16 '20 02:11

If the service crashes and then restarts, the committed topic-partition offset is not the offset of the message that made the agent crash.

zailaib, Dec 15 '20 11:12

This is a very important issue with Faust's concurrency: even if the event-processing logic is built entirely with out-of-order processing in mind, out-of-order offset commits will cause data loss when the consumer process is interrupted.

wsmlby, Aug 03 '22 21:08
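The data-loss scenario raised in the comments can be modeled in a few lines. This is a hypothetical sketch, assuming "committed" means the highest offset recorded as processed (the wording of this issue) and that a restarted consumer resumes at `last_committed + 1`; the function names are illustrative, not Faust or Kafka APIs.

```python
from typing import List, Set


def redelivered_after_restart(total: int, last_committed: int) -> List[int]:
    # Offsets a restarted consumer reads again: everything after the commit point.
    return list(range(last_committed + 1, total))


def lost_messages(total: int, last_committed: int, processed: Set[int]) -> List[int]:
    # Offsets that were never fully processed and will not be redelivered.
    redelivered = set(redelivered_after_restart(total, last_committed))
    return [o for o in range(total) if o not in processed and o not in redelivered]


if __name__ == "__main__":
    # Crash while offset 0 is in flight and only offset 1 has finished.
    print(lost_messages(10, last_committed=1, processed={1}))   # [0]
    print(lost_messages(10, last_committed=-1, processed={1}))  # []
```

Holding the commit point at -1 avoids the loss but redelivers offset 1, which was already processed, so gap-aware commits give at-least-once delivery and the agent should tolerate duplicates.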