fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

Add support for mergePreferred

Open shagoon opened this issue 2 years ago • 3 comments

This PR adds support for mergePreferred like akka provides.

Internally two bounded CE Queues are used, mainly because they provide tryTake for peeking the queue of the preferred stream. If no data is available on the preferred stream, a race is started on both queues. The fibre of the looser will be reused when retrieving the next element.

Testing is kind of hard because race() is not deterministic. Since tests are missing atm, I created this as a draft PR. It's ready for review, but not ready to be merged. Please comment.

shagoon avatar Jun 02 '23 14:06 shagoon

Testing is kind of hard because race() is not deterministic.

If you use the mock runtime in TestControl.executeEmbed(...) then they will be deterministic in the sense that all sleeps will be precise e.g. in race(sleep(1.milli), sleep(2.millis)) the left side will always win. Ties will be non-deterministic.

armanbilge avatar Jun 02 '23 14:06 armanbilge

TestControl.executeEmbed

I added some tests, atm without TestControl.executeEmbed. test("mergePreferred prefers") actually cheats by delaying the non-preferred stream. Without the delay, when fetching the first element, a race will be started (since preferredQueue probably would have nothing to offer at that moment), resulting in non-deterministic behaviour (i.e. non-preferred might win the race).

shagoon avatar Jun 02 '23 15:06 shagoon

I adjusted the test of the behaviour of mergePreferred to work without any tweaks of timing (i.e. no delay, metered or TestControl.executeEmbed, etc.). It tolerates some (currently 2%) of the elements of the resulting stream to originate from the non preferred stream. This can happen, if the preferred stream did not yet offer an element to its queue, resulting in a race() on both queues, which the non preferred queue might win.

shagoon avatar Jun 05 '23 06:06 shagoon