
Buffer Usage Calculation and ISB Writing Race Condition: Potential Data Loss in High-Throughput Pipelines

Status: Open · yhl25 opened this issue 11 months ago · 5 comments

https://github.com/numaproj/numaflow/issues/1551 https://github.com/numaproj/numaflow/issues/1551#issuecomment-1991223266

yhl25 avatar Mar 12 '24 15:03 yhl25

Moving to 1.3 because we have a workaround.

vigith avatar Mar 19 '24 21:03 vigith

I was thinking of something. Can't we modify the discard policy in the buffer creation right here? https://github.com/numaproj/numaflow/blob/f1e5ba0eb222edf3e5a5593769efc3626b092c1b/pkg/isbsvc/jetstream_service.go#L117

We hard-code the DiscardOld policy there, which, according to the JetStream docs, is why we encounter data loss instead of a write error.

The Discard Policy sets how messages are discarded when limits set by LimitsPolicy are reached. The DiscardOld option removes old messages making space for new, while DiscardNew refuses any new messages.
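
To make the suggestion concrete, here is a minimal, untested sketch with the nats.go client of the kind of stream configuration that line sets up; the stream name, subject, and exact limits are placeholders, not Numaflow's actual values:

```go
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Roughly what the hard-coded buffer stream config does today:
	// Limits retention with DiscardOld, so hitting MaxMsgs silently
	// removes the oldest messages instead of failing the write.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:      "example-isb-buffer",                // placeholder name
		Subjects:  []string{"example-isb-buffer.data"}, // placeholder subject
		Retention: nats.LimitsPolicy,
		Discard:   nats.DiscardOld,
		MaxMsgs:   100_000,        // the 100k limit discussed below
		MaxAge:    72 * time.Hour, // the 72h limit discussed below
	})
	if err != nil {
		log.Fatal(err)
	}

	// The idea in this comment: switch to DiscardNew so a full buffer
	// returns a write error instead of silently dropping old data.
	// Discard: nats.DiscardNew,
}
```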

QuentinFAIDIDE avatar Apr 01 '24 16:04 QuentinFAIDIDE

DiscardNew only works for us with the WorkQueue policy; we cannot use the Limits policy with DiscardNew in our use case. But when I was testing DiscardNew with the WorkQueue policy, I found some messages stuck in the stream that were never delivered to the consumer. I regret forgetting to update the issue with my findings. Since we had a workaround, I haven't spent much time on this, but I will look into it when I find some time.

yhl25 avatar Apr 01 '24 16:04 yhl25

Could you take a minute to elaborate on why it can't work? I got all the tests passing in a draft PR and have started torture-testing Numaflow with this change, with no issues so far. Sorry for being persistent; I'm a curious person and will leave you in peace once I understand :laughing: It feels like it would make your lives easier not to live with a buffer that silently drops data when it's full, but to have it error out instead :)

QuentinFAIDIDE avatar Apr 01 '24 17:04 QuentinFAIDIDE

When the stream is configured with the Limits policy and DiscardOld, JetStream automatically deletes old messages once either the maxMsgs limit of 100k or the maxAge limit of 72h is reached. However, with DiscardNew and Limits, reaching the maxMsgs limit of 100k makes the stream return an error on every attempt to write a new message, and old messages are never deleted from the stream, so the pipeline gets stuck. On the other hand, DiscardNew with WorkQueue works fine because JetStream deletes messages once they are consumed and acked.
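
To illustrate the write-side behavior, here is a rough, untested sketch with the nats.go client; the stream name, subject, and limits are placeholders. With DiscardNew the publish call itself starts failing once the limit is hit, and whether that condition ever clears depends on the retention policy:

```go
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder stream: WorkQueue retention with DiscardNew, and a small
	// MaxMsgs so the limit is easy to reach in a test.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:      "example-wq",
		Subjects:  []string{"example-wq.data"},
		Retention: nats.WorkQueuePolicy,
		Discard:   nats.DiscardNew,
		MaxMsgs:   1000,
	}); err != nil {
		log.Fatal(err)
	}

	for i := 0; i < 2000; i++ {
		if _, err := js.Publish("example-wq.data", []byte(fmt.Sprintf("msg-%d", i))); err != nil {
			// With Limits retention and DiscardNew, this error would keep
			// recurring because the server does not remove old messages to
			// make room (the "pipeline gets stuck" case described above).
			// With WorkQueue retention and DiscardNew, acked messages are
			// deleted, so the error only shows up when the consumer falls
			// behind by more than MaxMsgs (here there is no consumer at all,
			// so it appears once 1000 messages accumulate).
			log.Printf("publish %d rejected, buffer full: %v", i, err)
			break
		}
	}
}
```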

yhl25 avatar Apr 02 '24 01:04 yhl25