Missed element in replaysubject
You can miss an element in replaysubject if on_next is called between lines 167 and 170 of the file https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp We should lock a mutex between on_next and add in replay_observer in order to avoid that. PS: It can be difficult to reproduce, but it is easy to see the issue here. You can see an ugly fix here: https://github.com/diorcety/RxCpp/commit/3fdfd9bf0bbc0c08af81e7c5f54fb6fc33fbfd8e
thank you for the report!
yes, you are correct: values arriving during that window in subscribe are lost for that subscriber.
this is a case of tradeoffs.
The current code drops events from the middle, which is unexpected. The expectation is to drop only from the front.
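To make the window concrete, here is a minimal single-threaded sketch. It is a toy model, not the RxCpp source: `toy_replay`, `between` and `values_seen_with_race` are hypothetical names, and the `between` callback stands in for another thread calling on_next inside the window during subscribe.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Toy model of a replay subject (NOT the RxCpp implementation).
struct toy_replay {
    std::vector<int> buffer;                          // values kept for replay
    std::vector<std::function<void(int)>> observers;  // live subscribers

    void on_next(int v) {
        buffer.push_back(v);
        for (auto& o : observers) o(v);
    }

    // `between` simulates a concurrent on_next landing after the replay
    // values were copied but before the new observer is registered.
    void subscribe(std::function<void(int)> o, const std::function<void()>& between) {
        std::vector<int> snapshot = buffer;   // copy the values to replay
        for (int v : snapshot) o(v);          // replay them to the new observer
        between();                            // the window: a value arrives here
        observers.push_back(std::move(o));    // only now registered for live values
    }
};

// Values seen by a subscriber when 2 arrives inside the window.
std::vector<int> values_seen_with_race() {
    toy_replay subject;
    std::vector<int> seen;
    subject.on_next(1);
    subject.subscribe([&seen](int v) { seen.push_back(v); },
                      [&subject] { subject.on_next(2); });
    subject.on_next(3);
    return seen;  // 1 is replayed, 3 arrives live, 2 is never delivered
}
```

In this model the subscriber sees 1 and 3 but not 2, so the loss is in the middle of the sequence rather than at the front.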
Using a mutex to prevent new items would block all subscriptions receiving incoming values until all new subscriptions caught up. (potentially forever in the case of slow consumers and large replay buffers and many new subscriptions)
Using a queue per subscription would buffer values that arrive before all the pending values are consumed. (potentially infinite queueing in the case of slow consumers and large replay buffers and many new subscriptions)
replay is often used to limit queueing and so a queueing solution would not be expected any more than the current dropping of the events from the middle.
replay is a subject and therefore it is at times used to feed output values back in as input. the mutex option would potentially make this a deadlock.
perhaps another option would be to interleave replay values with live values. I expect that this violates the expectations as well.
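As a rough illustration of the queue-per-subscription option, here is a single-threaded sketch. All names (`queued_subscription`, `toy_queued_replay`, `values_seen_with_queue`) are hypothetical and this is not how RxCpp is implemented. It assumes the snapshot and the registration happen atomically; a real concurrent version would also need synchronization around the drain and the `catching_up` flag.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Per-subscriber state (hypothetical, for illustration only).
struct queued_subscription {
    std::function<void(int)> deliver;
    std::deque<int> pending;   // unbounded: grows while the consumer is slow
    bool catching_up = true;   // true while replayed values are being delivered
};

struct toy_queued_replay {
    std::vector<int> buffer;
    std::vector<std::shared_ptr<queued_subscription>> subs;

    void on_next(int v) {
        buffer.push_back(v);
        for (auto& s : subs) {
            if (s->catching_up) s->pending.push_back(v);  // park the live value
            else s->deliver(v);
        }
    }

    // `between` simulates a live value arriving while the replay is in progress.
    void subscribe(std::function<void(int)> o, const std::function<void()>& between) {
        auto s = std::make_shared<queued_subscription>();
        s->deliver = std::move(o);
        std::vector<int> snapshot = buffer;  // assumed atomic with the next line
        subs.push_back(s);
        for (int v : snapshot) s->deliver(v);
        between();
        while (!s->pending.empty()) {        // drain what arrived during replay
            int v = s->pending.front();
            s->pending.pop_front();
            s->deliver(v);
        }
        s->catching_up = false;
    }
};

std::vector<int> values_seen_with_queue() {
    toy_queued_replay subject;
    std::vector<int> seen;
    subject.on_next(1);
    subject.subscribe([&seen](int v) { seen.push_back(v); },
                      [&subject] { subject.on_next(2); });
    subject.on_next(3);
    return seen;
}
```

Nothing is dropped in this model, but `pending` has no upper bound, which is the infinite-queueing concern raised above.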
This will require some time to check with other implementations of replay to see how they are implemented and pick the best outcome for rxcpp.
I would appreciate more opinions and options from anyone.
Maybe I am using the wrong tool then. I'm using a subject that emits replies from network communication. The listener's subscription can occur after the first reply is received. I need a tool that can ensure that all received replies will be handled (unlike replaysubject here) and cached until processed.
usually Rx expressions are structured so that the request is not made until the listener has subscribed.
one way to structure the expression to allow the listener to subscribe later would be to reduce all the replies into a container, like a vector<> and then, after all the replies are complete, emit the final vector. another option is to use scan instead of reduce then the whole container would be emitted every time a reply was added (this works best with an immutable collection, not a std::vector).
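The difference between the two shapes can be modelled in plain C++. This sketch does not call the rxcpp reduce or scan operators; `reduce_model`, `scan_model` and the `replies` alias are hypothetical names that only mimic what each operator would emit for a finite list of replies.

```cpp
#include <string>
#include <vector>

using replies = std::vector<std::string>;

// reduce-style: a single emission, the full container, produced only
// after the source has completed.
replies reduce_model(const replies& source) {
    replies acc;
    for (const auto& r : source) acc.push_back(r);
    return acc;
}

// scan-style: one emission per reply, each holding everything received
// so far. Every emission here is a full copy of the container, which is
// why an immutable (structurally shared) collection fits better.
std::vector<replies> scan_model(const replies& source) {
    std::vector<replies> emissions;
    replies acc;
    for (const auto& r : source) {
        acc.push_back(r);
        emissions.push_back(acc);
    }
    return emissions;
}
```

With three replies, `reduce_model` yields one container of three elements, while `scan_model` yields three containers of sizes 1, 2 and 3, the last of which equals the reduce result.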
I may have found a difference between rxpy and rxcpp. https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp#L166 https://github.com/ReactiveX/RxPY/blob/master/rx/subjects/replaysubject.py#L68
In the case of rxpy, if the on_completed event has already been received from the source and a replay observer subscribes after it, the replay subject replays all the on_next events and then the on_completed event.
In rxcpp it only sends the on_completed event.
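For reference, the rxpy behaviour described above can be modelled like this. It is a toy sketch with hypothetical names (`toy_completing_replay`, `late_subscriber_log`), covers only late subscribers, and is not taken from either library.

```cpp
#include <functional>
#include <string>
#include <vector>

// Toy model: keeps the values and remembers whether the source completed.
struct toy_completing_replay {
    std::vector<int> buffer;
    bool completed = false;

    void on_next(int v) { if (!completed) buffer.push_back(v); }
    void on_completed() { completed = true; }

    // Late subscription: replay every buffered value first, then the
    // completion if the source has already finished.
    void subscribe(const std::function<void(int)>& next,
                   const std::function<void()>& done) const {
        for (int v : buffer) next(v);
        if (completed) done();
    }
};

// Log of what a subscriber sees when it subscribes after completion.
std::vector<std::string> late_subscriber_log() {
    toy_completing_replay subject;
    subject.on_next(1);
    subject.on_next(2);
    subject.on_completed();

    std::vector<std::string> log;
    subject.subscribe(
        [&log](int v) { log.push_back("next " + std::to_string(v)); },
        [&log] { log.push_back("completed"); });
    return log;
}
```

In this model the late subscriber receives both values followed by the completion, instead of the completion alone.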