Offset committed for second message when first is still being processed
## Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the `master` branch of Faust.
## Steps to reproduce
- Create a new Python 3.8 virtualenv.
- Install Faust:
  `pip install git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e`
- Create a worker with an agent (with 2 actors) and a task that adds messages to a topic. Add a monitor that prints offset commits. Save it as `example.py`:
```python
import asyncio

import faust
from faust.sensors.monitor import Monitor


class PrintCommitMonitor(Monitor):
    def on_tp_commit(self, tp_offsets):
        print(f'Commit offsets {tp_offsets}')


app = faust.App(
    'test-app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
    Monitor=PrintCommitMonitor,
)

test_topic = app.topic('test')


@app.agent(test_topic, concurrency=2)
async def process(stream):
    async for msg in stream:
        print(f'Start processing {msg} - by {stream.concurrency_index}')
        if msg == b'Message i=0':
            await asyncio.sleep(20)
        else:
            await asyncio.sleep(3)
        print(f'Finish processing {msg} - by {stream.concurrency_index}')


@app.task(on_leader=True)
async def on_started():
    print('Add messages')
    for i in range(10):
        msg = f'Message i={i}'
        await test_topic.send(key=msg, value=msg)
```
- Start the worker:
  `faust -A example worker -l info --without-web`
  The worker will create the Kafka topic, and the task will write 10 messages to it.
- Two actors will start processing messages. The first actor will take the message with offset 0, the second actor the message with offset 1.
- After the second actor finishes processing the message with offset 1 (but before the first actor finishes processing the message with offset 0), offset 1 is committed.
## Expected behavior
The worker commits an offset only after all messages with smaller offsets have been processed.
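For illustration, here is a minimal sketch (hypothetical, not Faust's actual commit code) of the commit rule I would expect: an offset becomes eligible for commit only once every smaller offset has been acknowledged.

```python
# Hypothetical sketch of the expected commit rule, not Faust's implementation.

def highest_committable(committed, acked):
    """Highest offset safe to commit: every smaller offset must already be acked.

    `committed` is the last committed offset (-1 if nothing was committed yet),
    `acked` is the set of offsets whose processing has finished.
    """
    offset = committed
    while offset + 1 in acked:
        offset += 1
    return offset


# Offset 1 is done but offset 0 is still in flight: nothing new to commit.
assert highest_committable(-1, {1}) == -1
# Once offset 0 finishes as well, offsets 0 and 1 can both be committed.
assert highest_committable(-1, {0, 1}) == 1
```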
## Actual behavior
The worker commits offset 1 after the message with offset 1 has been processed by one actor, while the message with offset 0 is still being processed by the other actor.
## Versions
- Python version - 3.8.6
- Faust version - git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e
- Operating system - Ubuntu 20.04.1
- Kafka version - 2.4.1
Concurrency processes messages out of order. You won't see this behavior with `concurrency=1`.
I was also curious about this behavior. I was expecting messages to be processed out of order, but offsets to be committed in order. The current design could clearly lead to data loss in the case of a failure and restart.
If a service crashes and then restarts, the committed topic-partition offset does not correspond to the message that made the agent crash.
This is a very important issue with Faust's concurrency: even if the event-processing logic is built entirely with out-of-order processing in mind, the out-of-order offset commits will cause data loss whenever the consumer process is interrupted.
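As a possible workaround (a sketch only; it assumes that with `stream.take()` offsets only advance once the loop moves past a whole batch, which I have not verified against the commit internals), the fan-out can be done inside a single actor, so the stream does not move on until every message in the batch has finished:

```python
import asyncio

import faust

app = faust.App('test-app', broker='kafka://localhost:9092', value_serializer='raw')
test_topic = app.topic('test')


async def handle(msg):
    print(f'Start processing {msg}')
    await asyncio.sleep(3)
    print(f'Finish processing {msg}')


@app.agent(test_topic)  # note: no concurrency=2
async def process(stream):
    # Take up to 2 messages at a time and process them concurrently,
    # but only continue the loop once the whole batch is done.
    async for batch in stream.take(2, within=1.0):
        await asyncio.gather(*(handle(msg) for msg in batch))
```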