gen_stage
gen_stage copied to clipboard
LIFO ordering for queued sync_info messages
Problem
- Populate a producer's buffer
- Queue multiple
sync_info/async_infomessages - Drain the buffer
- sync_info messages are delivered - but they are delivered LIFO instead of FIFO
Example If I add the following test, it fails:
test "delivers multiple info to producer in FIFO order" do
{:ok, producer} = Counter.start_link({:producer, self(), buffer_size: 3})
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
# Fill buffer so info messages get queued
Counter.sync_queue(producer, [:a, :b, :c])
GenStage.sync_info(producer, :first)
GenStage.sync_info(producer, :second)
GenStage.sync_info(producer, :third)
refute_received :first
refute_received :second
refute_received :third
# Subscribe and consume events
GenStage.sync_subscribe(consumer, to: producer)
assert_receive {:consumed, [:a, :b, :c]}
# Skip subscription messages
assert_receive {:consumer_subscribed, _}
assert_receive {:producer_subscribed, _}
messages =
for n <- 1..3 do
receive do
msg -> msg
after
100 -> flunk("Did not receive expected number of messages (got #{n - 1})")
end
end
assert messages == [:first, :second, :third]
end
1) test info delivers multiple info to producer in FIFO order (GenStageTest)
test/gen_stage_test.exs:1064
Assertion with == failed
code: assert messages == [:first, :second, :third]
left: [:third, :second, :first]
right: [:first, :second, :third]
stacktrace:
test/gen_stage_test.exs:1093: (test)
I assume FIFO here is intended behavior? If so, happy to find root cause and fix.
This was Claude's first suspicion, I can look with fresh eyes:
Yes, correct, info messages should always be in FIFO. But I am not sure if reverse is correct. We may need to queue some events after the sync_info as well and guarantee they are correct too.