BroadcastHub swallows initial elements for new consumers
This issue is in regard to Akka Streams.
How to reproduce:
- Attach a source to a BroadcastHub, and materialise the hub
- Attach consumers to the hub
- Start emitting elements through the BroadcastHub
Expected behaviour:
- All consumers should receive all emitted elements
Actual behaviour:
- Some consumers may receive only some, or none, of the emitted elements (non-deterministically). Some or all of the initial elements may be swallowed.
The following test fails (most of the time):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

"BroadcastHub" should "broadcast all elements to all consumers" in {
  val blockingSource = Source.queue[Int](1, OverflowStrategy.fail) // blocks the stream until we complete it
  val (queue, broadcast) = blockingSource
    .concat(Source(1 to 10)) // emitted once blockingSource completes
    .toMat(BroadcastHub.sink)(Keep.both)
    .run()
  val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
  val resultTwo = broadcast.runWith(Sink.seq)
  queue.complete() // only now are the elements emitted
  Await.result(resultOne, 1.second) should be(1 to 10) // fails
  Await.result(resultTwo, 1.second) should be(1 to 10) // fails
}
Clearly this is a consequence of concurrency, but in some use cases it's important to be able to guarantee deterministic behaviour, i.e. that all consumers receive all elements.
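As a sanity check that this is purely a registration race: inserting an artificial delay before completing the queue makes the test above pass consistently in practice. This is a diagnostic, not a fix:

  // Diagnostic only: give both consumers time to register with the hub
  // before the first element is emitted.
  Thread.sleep(500)
  queue.complete()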
I see that this is actually noted in a few places in the test cases for BroadcastHub: https://github.com/akka/akka/blob/be67fb2ce5c14ae4e495251d89db35e15f75b531/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala#L201-L202
Would it be possible to make the subscription of new consumers deterministic until the first element is emitted? Once the flow is running, I can see that it's by definition non-deterministic, but I'm hoping for a way to set consumers up beforehand.
It would then need something similar to startAfterNrOfConsumers in PartitionHub:

* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
*   This is only used initially when the operator is starting up, i.e. it is not honored when consumers have
*   been removed (canceled).
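For reference, a minimal sketch of how that knob looks in PartitionHub (which routes each element to a single consumer rather than broadcasting, but demonstrates the startup semantics; assumes Akka 2.6, where an implicit ActorSystem provides the materializer):

  import akka.actor.ActorSystem
  import akka.stream.scaladsl.{PartitionHub, Sink, Source}

  implicit val system: ActorSystem = ActorSystem("hub-demo")

  // Elements are buffered until two consumers have attached, so neither misses
  // the head of the stream; once started, the guarantee no longer applies.
  val hub = Source(1 to 10)
    .runWith(PartitionHub.sink[Int](
      (size, elem) => elem % size, // pick one consumer per element
      startAfterNrOfConsumers = 2,
      bufferSize = 8))

  hub.runWith(Sink.foreach(e => println(s"consumer 1 got $e")))
  hub.runWith(Sink.foreach(e => println(s"consumer 2 got $e")))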
We have this problem in cases where new consumers can be added at any time, i.e. waiting for a certain number of initial subscriptions wouldn't fix anything.
Our use case is as follows:
- A publisher allows subscribing to its state changes
- Subscribers can subscribe at any time (they come and go over time)
- When a new subscriber subscribes, a snapshot of the state is emitted (to all subscribers), followed by incremental changes

Losing either the snapshot or any of the subsequent incremental updates is not acceptable; if that happens, the subscriber cancels the stream and re-subscribes.
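A minimal model of that protocol (type and field names are hypothetical, only for illustration):

  // Hypothetical message protocol for the use case above.
  sealed trait Update
  // Full state, emitted whenever a new subscriber attaches.
  final case class Snapshot(state: Map[String, String]) extends Update
  // Incremental change, emitted on every state mutation.
  final case class Delta(key: String, value: String) extends Update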
This is roughly what gets executed in the publisher actor when a new subscriber subscribes:
import akka.actor.Actor
import akka.stream.{Materializer, SinkRef}
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}

class SomePublisher extends Actor {
  implicit val mat: Materializer = Materializer(context) // Akka 2.6; use ActorMaterializer() on 2.5

  // Materialise the hub once; `queue` feeds it, `broadcast` is attached per subscriber.
  val (queue, broadcast) = Source
    .queue[String](10, ???) // overflow strategy elided in the original
    .toMat(BroadcastHub.sink[String](16))(Keep.both) // hub buffer size must be a power of two
    .run()

  def receive = {
    case Subscribe(sink: SinkRef[String]) =>
      broadcast.dropWhile(_ != "state snapshot").runWith(sink.sink)
      queue.offer("state snapshot")
  }
}
As the above doesn't work, our workaround is to schedule a message that will trigger sending the snapshot:
import scala.concurrent.duration._
import context.dispatcher // ExecutionContext for the scheduler

def receive = {
  case Subscribe(sink: SinkRef[String]) =>
    broadcast.dropWhile(_ != "state snapshot").runWith(sink.sink)
    // Hope 500 ms is enough for the hub to register the new consumer.
    context.system.scheduler.scheduleOnce(500.millis, self, SendSnapshot)
  case SendSnapshot => queue.offer("state snapshot")
}
However, this is not satisfactory: it is unreliable, more painful to test, and slow.
We'd have no problem if the subscription to the broadcast hub stayed asynchronous, but we would need a reliable way to know when the subscription becomes effective, for example via the following signature for creating a hub: BroadcastHub.sink[T](bufferSize: Int): Sink[T, Source[T, Future[Done]]]. The materialized value of the materialized source would be a Future that completes once the subscription is effective. We could then use it as follows:
import akka.pattern.pipe
import context.dispatcher

def receive = {
  case Subscribe(sink: SinkRef[String]) =>
    broadcast
      .dropWhile(_ != "state snapshot")
      .toMat(sink.sink)(Keep.left) // keep the hub source's hypothetical Future[Done]
      .run()
      .map(_ => SendSnapshot) // completes once the subscription is effective
      .pipeTo(self)
  case SendSnapshot => queue.offer("state snapshot")
}
@johanandren is this still in progress?
@francisdb looks like there are two open PRs (#27206 and #28327), but I'm not sure whether their authors are still working on them. Please coordinate with them if you want to pick this up.
@johanandren I tried to pick up #27206 but got no response from the original author. How can I proceed from here? Can you tell me whether the idea behind the pull request is still valid, or has the codebase changed too much?
@otto-dev And yes, different buffer sizes give different results.
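For anyone reproducing this: a variation of the original test with an explicit hub buffer size (the value here is arbitrary, but BroadcastHub requires a power of two) makes that dependence visible:

  // Same repro as the original test, but with an explicit, small hub buffer.
  // Varying bufferSize changes how many initial elements late consumers miss.
  val (queue, broadcast) = Source
    .queue[Int](1, OverflowStrategy.fail)
    .concat(Source(1 to 10))
    .toMat(BroadcastHub.sink(bufferSize = 2))(Keep.both)
    .run()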