
[Broadcast] implement the algorithm with a buffering strategy (v1.1)

twittemb opened this issue 2 years ago • 2 comments

This PR is an attempt to implement a broadcast operator following the temporary conclusion of this discussion: https://forums.swift.org/t/swift-async-algorithms-proposal-broadcast-previously-shared/61210/36

This kind of operator becomes tricky to implement once you take back-pressure management and cancellation policies into account.

To ease understanding, I've used two levels of state machines (orthogonal regions, in my book); a rough sketch follows the list:

  • a state machine per iteration/client, handling buffering and cancellation
  • a state machine orchestrating the value-distribution mechanism and restricting access to the upstream sequence
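
Roughly, the split looks like this (a minimal Swift sketch with hypothetical names; the actual state machines in the PR carry more states and transitions):

```swift
// Inner region: one state machine per client/iteration,
// responsible for its own buffering and cancellation.
enum ClientState<Element: Sendable> {
    case buffering([Element])                            // values waiting to be consumed
    case awaiting(CheckedContinuation<Element?, Never>)  // client suspended in next()
    case cancelled
}

// Outer region: orchestrates value distribution and guards
// access to the single upstream iteration.
struct BroadcastStateMachine<Element: Sendable> {
    var clients: [Int: ClientState<Element>] = [:]       // inner regions, keyed by client id
    var upstreamTask: Task<Void, Never>?                 // the one task iterating upstream
    var nextClientID = 0
}
```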

I've used the pattern of a single task for the upstream sequence iteration. This task is started on the first call to next by the first iteration and is suspended between each element.
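
As a hedged illustration of that pattern, assuming hypothetical names like startUpstreamIfNeeded and distribute:

```swift
import Foundation

final class Broadcaster<Base: AsyncSequence & Sendable>: @unchecked Sendable
where Base.Element: Sendable {
    private let base: Base
    private let lock = NSLock()  // guards upstreamTask
    private var upstreamTask: Task<Void, Never>?

    init(_ base: Base) { self.base = base }

    // Called from the first next(); subsequent calls are no-ops.
    func startUpstreamIfNeeded() {
        lock.lock()
        defer { lock.unlock() }
        guard upstreamTask == nil else { return }
        upstreamTask = Task { [base] in
            do {
                for try await element in base {
                    // The real implementation suspends here until every
                    // client has taken (or buffered) the element.
                    await self.distribute(element)
                }
            } catch {
                // Forward the failure to all clients (elided).
            }
        }
    }

    private func distribute(_ element: Base.Element) async {
        // Hand the element to each client state machine (elided).
    }
}
```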

Pending questions (@FranzBusch, @phausler, @etcwilde):

  • For now, each client state machine is created on the first call to "next". That makes it a bit hard to test, because we need some synchronisation between iterations to ensure we don't lose values; my tests are currently flaky because of that. Should we do this in the call to "makeAsyncIterator" instead (see the registration sketch after this list)? (I remember a decision that nothing important should be done in makeAsyncIterator ... broadcast might be an exception to that rule.)

  • For now, I use an unbounded buffer per iteration, which can lead to an uncontrolled memory footprint. Should we offer an "on overflow" policy to drop elements (see the buffering-policy sketch after this list)? We discussed a storage optimisation in the forum based on a single shared buffer with per-client indexes, but sanitising those indexes requires extra computation, so it would also hurt performance.

  • For now, there is no "on cancellation" policy. The task for the upstream sequence remains alive when there are no more clients, waiting for a new one. Should we allow cancelling the upstream task in that case? It would be easy to do, since I keep track of the task in the state machine (see the cancellation sketch after this list).

  • For now, there is a single locking mechanism for all the state machines (the main one and the per-client ones). Should we consider a locking mechanism per client to improve performance by parallelising computations (see the per-client locking sketch after this list)?
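
Regarding the first question, eager registration in makeAsyncIterator could look roughly like this (all names hypothetical; the real state machine would suspend an empty client on a continuation rather than returning nil):

```swift
import Foundation

struct BroadcastSequence<Element: Sendable>: AsyncSequence, Sendable {
    let storage: Storage

    func makeAsyncIterator() -> Iterator {
        // Registration happens here instead of on the first next(),
        // so a freshly created iterator cannot miss values.
        Iterator(id: storage.register(), storage: storage)
    }

    struct Iterator: AsyncIteratorProtocol {
        let id: Int
        let storage: Storage
        func next() async -> Element? { await storage.next(for: id) }
    }

    final class Storage: @unchecked Sendable {
        private let lock = NSLock()
        private var buffers: [Int: [Element]] = [:]
        private var nextID = 0

        func register() -> Int {
            lock.lock(); defer { lock.unlock() }
            let id = nextID
            nextID += 1
            buffers[id] = []  // buffer exists before any next() call
            return id
        }

        func next(for id: Int) async -> Element? {
            // The real implementation suspends on a continuation when the
            // buffer is empty; this sketch just drains what's buffered.
            lock.lock(); defer { lock.unlock() }
            guard var buffer = buffers[id], !buffer.isEmpty else { return nil }
            let element = buffer.removeFirst()
            buffers[id] = buffer
            return element
        }
    }
}
```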
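
Regarding the overflow question, the knob could mirror the case names of AsyncStream's BufferingPolicy; the type below is purely hypothetical:

```swift
enum BroadcastBufferingPolicy {
    case unbounded              // current behaviour: memory can grow without limit
    case bufferingNewest(Int)   // evict the oldest element once the limit is hit
    case bufferingOldest(Int)   // drop incoming elements once the limit is hit
}

// Applying the policy when enqueueing into a client's buffer.
func enqueue<Element>(
    _ element: Element,
    into buffer: inout [Element],
    policy: BroadcastBufferingPolicy
) {
    switch policy {
    case .unbounded:
        buffer.append(element)
    case .bufferingNewest(let limit):
        if buffer.count >= limit { buffer.removeFirst() }
        buffer.append(element)
    case .bufferingOldest(let limit):
        if buffer.count < limit { buffer.append(element) }
    }
}
```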
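
Regarding the cancellation question, since the task is already tracked in the state machine, the policy reduces to a few lines; a hypothetical sketch:

```swift
enum UpstreamCancellationPolicy {
    case keepAlive            // current behaviour: upstream waits for a new client
    case cancelWhenNoClients  // tear the upstream task down with the last client
}

struct UpstreamLifecycle {
    var clients: Set<Int> = []
    var upstreamTask: Task<Void, Never>?
    var policy: UpstreamCancellationPolicy = .keepAlive

    mutating func clientFinished(_ id: Int) {
        clients.remove(id)
        if clients.isEmpty, case .cancelWhenNoClients = policy {
            upstreamTask?.cancel()  // the task is already tracked, so this is cheap
            upstreamTask = nil
        }
    }
}
```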
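
Regarding the locking question, per-client locks might look like the sketch below (hypothetical names; whether the gained parallelism outweighs the extra lock traffic would need measuring):

```swift
import Foundation

// Each client region guards only its own state, so distributing to
// different clients no longer serialises on a single lock.
final class ClientRegion<Element>: @unchecked Sendable {
    private let lock = NSLock()
    private var buffer: [Element] = []

    func enqueue(_ element: Element) {
        lock.lock(); defer { lock.unlock() }
        buffer.append(element)
    }
}

func distribute<Element>(_ element: Element, to clients: [ClientRegion<Element>]) {
    for client in clients {
        client.enqueue(element)  // takes only that client's lock
    }
}
```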

I've not yet implemented a "replay" mechanism, but it will be pretty easy to add thanks to the separation of concerns between state machines: all we have to do is keep track of the history and initialise each new client state machine with that history in its internal buffer.
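
For instance, a minimal hypothetical sketch:

```swift
struct ReplayStorage<Element> {
    var history: [Element] = []
    let replayCount: Int

    // Called for every upstream element; keeps only the last replayCount values.
    mutating func record(_ element: Element) {
        history.append(element)
        if history.count > replayCount { history.removeFirst() }
    }

    // A new client's state machine starts with the history
    // preloaded into its internal buffer.
    func initialBuffer() -> [Element] { history }
}
```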

twittemb avatar Jan 01 '23 14:01 twittemb

Hi, awesome work! What’s the relation between this pull request and #227? Have we decided which one to merge?

fumoboy007 avatar Jan 29 '23 20:01 fumoboy007

Hi,

These are two different implementations of the same operator: one based on a buffering strategy, the other on a suspending strategy.

Neither has been reviewed yet, since the focus is on validating all the existing operators for v1.0.

twittemb avatar Jan 29 '23 21:01 twittemb