
[WIP] Broadcast algorithm

[Open] phausler opened this issue · 12 comments

This algorithm allows for the broadcasting of values to multiple consumers from one source.

Multiple instances of the same broadcast share the values produced by iterating the base. They also share thrown failures and non-throwing termination. Cancellation is handled on a per-instance basis, but once all iterations have been cancelled and no references to the base iteration remain, the base iteration itself is cancelled.

There is room for improvement: currently each side keeps its own buffer of the elements it has yet to consume. This could be changed to an index into a shared buffer, but the algorithm to share that is quite complex and could be tricky to implement. So, as an initial draft, storage is left at O(N * M), where N is the number of active iterations and M is the number of elements that have yet to be consumed. In degenerate cases this works out to O(N^2) storage, while in the ideal case it is O(1). A shared buffer would bring the degenerate case down to O(N) and keep the normal case at O(1).
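As a rough sketch of the intended shape (the `broadcast()` spelling and behavior shown here are illustrative of this WIP, not final API):

```swift
import AsyncAlgorithms

// Illustrative only: `broadcast()` is the operator proposed in this PR;
// the name and signature may change before merging.
let base = (1...3).async          // any AsyncSequence
let shared = base.broadcast()     // one upstream iteration, many consumers

// Consumers attached while elements are produced each receive them;
// cancelling one consumer does not cancel the others.
let first = Task { for await value in shared { print("first:", value) } }
let second = Task { for await value in shared { print("second:", value) } }
await first.value
await second.value
```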

phausler avatar Oct 14 '22 17:10 phausler

Hi @phausler

Happy we are beginning to introduce this kind of operator for a future version :-).

I have a few questions regarding this draft though:

  • It looks like an equivalent of Combine's share operator (multicast + auto-start), meaning that future consumers will miss the initial values. Is that the intent?
  • Shouldn't we think about a family of "sharing" operators with options (like replaying values)? Some could use buffers; some could rely on AsyncChannel to force all consumers to consume before requesting the next element.
  • From what I understand, the base is iterated in a dedicated task regardless of demand from the consumers. Am I right? Isn't that odd?
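For comparison, Combine's `share()` shows the "late subscribers miss earlier values" behaviour mentioned in the first bullet (a sketch; the subject here just stands in for any live upstream):

```swift
import Combine

// A PassthroughSubject stands in for a live upstream;
// `share()` multicasts a single subscription to it.
let subject = PassthroughSubject<Int, Never>()
let shared = subject.share()

let early = shared.sink { print("early:", $0) }
subject.send(1)                               // only `early` receives 1

let late = shared.sink { print("late:", $0) }
subject.send(2)                               // both subscribers receive 2
```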

Thanks.

twittemb avatar Oct 15 '22 10:10 twittemb

So it is the intent that prior values are NOT buffered for later consumption when new downstreams attach (otherwise the best-case storage would be O(N^2), which imho is unexpected; and if you need that behavior, you can chain a buffer onto it).

This is by no means the only algorithm in this category: I'm sure there are either composable variants or other distinct algorithms in the same "multicast" family. This is just the one I see most commonly requested/used.

I think (with some extra complexity) I can modify this to respond to per-element demand to pump the iterator along; that is likely tied to the buffer consolidation I mentioned in the PR description. This is still a WIP, so my next steps are to make demand incremental and the buffers shared (and measure the impact on complexity and storage).
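To make the "slap a buffer on it" idea concrete, a consumer that wants to decouple itself from shared demand could chain the package's buffering operator onto the proposed broadcast (`broadcast()` is hypothetical, from this PR; `buffer(policy:)` is assumed to be the package's existing buffering API):

```swift
import AsyncAlgorithms

// `broadcast()` is the hypothetical operator from this PR;
// `buffer(policy:)` is the package's buffering operator.
let events = (0..<100).async
let shared = events.broadcast()

// A slow consumer pays for its own buffer rather than the broadcast
// paying for everyone's; other consumers remain demand-coupled.
let decoupled = shared.buffer(policy: .unbounded)
```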

phausler avatar Oct 15 '22 18:10 phausler

Nice start! I agree that we should get rid of the Task in here, and it ought to be possible. This is definitely one of the more interesting algorithms implementation-wise :D

FranzBusch avatar Oct 19 '22 19:10 FranzBusch

@tcldr this might be of interest, a slightly different approach.

phausler avatar Nov 11 '22 20:11 phausler

@phausler Yes, this served as the inspiration for the work on shared! I originally intended to see if I could adapt this by adding history and a disposing iterator and then fell down the rabbit hole. :) (I borrowed your original test cases, too.)

The thing that began to interest me is how this would be used with something like AsyncChannel. Since this algorithm uses internal buffers, the utility of AsyncChannel is lost. I realised that if back pressure is to be transmitted from the site of consumption to the site of production, it needs to be maintained throughout the pipeline.

The other thing I realised is that while a back-pressure-supporting algorithm can be made to work in a "depressurised" way (by chaining a buffer), the inverse isn't possible.

So I thought: maybe if I create a back-pressure-supporting multicast algorithm, it could serve as a primitive for both use cases: those where back pressure is a requirement as well as those where it isn't (by chaining buffers).
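As a small illustration of the back pressure AsyncChannel already provides (this is the package's real API; the producer suspends in `send` until a consumer takes the element):

```swift
import AsyncAlgorithms

// Inside an async context:
let channel = AsyncChannel<Int>()

// The producer suspends inside `send` until a consumer resumes it,
// so production never outpaces consumption.
Task {
    for value in 0..<3 {
        await channel.send(value)
    }
    channel.finish()
}

for await value in channel {
    print(value)
}
```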

tcldr avatar Nov 11 '22 23:11 tcldr

I would love to use this soon. Do you plan to continue working on this in January, @phausler? :)

Also, what is the relationship between this PR and https://github.com/apple/swift-async-algorithms/pull/227 ?

Sajjon avatar Dec 29 '22 12:12 Sajjon

How does this PR relate to https://github.com/apple/swift-async-algorithms/pull/242 ?

Sajjon avatar Jan 04 '23 14:01 Sajjon

They are both proposals for how to solve this. After 1.0 of this library has shipped, we need to take another look and discuss which approach we want.

FranzBusch avatar Jan 10 '23 14:01 FranzBusch

@FranzBusch almost 18 months later, any update? :) I think multicast is a crucial missing piece in Async Swift.

Sajjon avatar May 15 '24 18:05 Sajjon

@FranzBusch it's a really useful feature 🙏

dehrom avatar Jun 24 '24 16:06 dehrom