akka.net
[PERF] Using the BroadcastHub with thousands of consumers seems painfully slow.
Version Information
Akka.Streams="1.5.20"
Describe the performance issue
I'm trying to create thousands of consumers for a BroadcastHub.
Each consumer listens to the stream and checks whether it cares about the current item.
As the number of consumers increases, performance degrades considerably.
We think the buffer plays a part, because things get really slow as soon as we go past the buffer size, but we are unsure.
We also think that a full buffer may drop messages, as there are some values we never see.
Here is a LinqPad URL to demo the issue: LinqPad
Expected behavior
Should be able to process 14000 messages across 14000 consumers using a BroadcastHub within seconds.
Actual behavior
Becomes unusable in the thousands.
Environment
LinqPad demo. We are using Linux Docker images in production, with the same performance issue.
Thank you for the reproduction! I'll see what we can do here.
The problem in the sample code is here:
    return signalValueProducer
        .Where(producedValue => // <-- This is very bad
        {
            // We only care about a specific value in this flow/stream
            return valueToWatch == producedValue;
        })
        .RunForeach(y =>
        {
            // Remove from list, can be called multiple times with same value.
            streamValuesToRemove.Remove(y);
        }, materializer);
The .Where() operation requires this LINQ statement to iterate through all of the values emitted by signalValueProducer. Those values come from streamValuesSource, which the .Tick() stage limits to one element every 50 milliseconds.
The higher the streamCount, the slower this operation is, because the .Where() stage has to iterate through the whole set, which takes (50 milliseconds * streamCount). In this example, the delay amounts to 100 seconds per .Select() operation (50 milliseconds * 2000), and each .Select() only produces a single subscriber.
The provided sample code, for completeness:
    async Task Main()
    {
        var streamCount = 2000;
        var actorSystem = ActorSystem.Create("bla");
        IMaterializer materializer = actorSystem.Materializer();

        // Create list of values to watch, 0 to streamCount - 1
        var streamValuesToWatch = Enumerable
            .Range(0, streamCount)
            .Select(e => e.ToString())
            .ToList();

        // On every tick get a value from streamValuesToWatch
        var index = 0;
        var streamValuesSource = Source
            .Tick(TimeSpan.FromSeconds(0), TimeSpan.FromMilliseconds(50), "")
            .Select(i => streamValuesToWatch[index++ % streamCount])
            .Take(streamCount);

        // Create a thread-safe collection to monitor whether a value has been seen by its observer; if removed, it has been seen.
        var streamValuesToRemove = streamValuesToWatch
            .Select(x => (x, x))
            .ToHashMap()
            .ToAtom();

        // Create a BroadcastHub so we can create a consumer for each entry in streamValuesToWatch
        Source<string, NotUsed> signalValueProducer = streamValuesSource
            .ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 1024), Keep.Right)
            .Run(materializer);

        // Create each consumer
        var tasks = streamValuesToWatch
            .Select(valueToWatch =>
            {
                return signalValueProducer
                    .Where(producedValue =>
                    {
                        // We only care about a specific value in this flow/stream
                        return valueToWatch == producedValue;
                    })
                    .RunForeach(y =>
                    {
                        // Remove from list, can be called multiple times with same value.
                        streamValuesToRemove.Remove(y);
                    }, materializer);
            }).ToList();

        // Used to monitor current value to be processed.
        signalValueProducer
            .RunWith(Sink.AsObservable<string>(), materializer)
            .DumpLatest("CurrentValue");

        // Dump the Count for streamValuesToRemove, should shrink when a value has been seen.
        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Select(o => streamValuesToRemove.Count)
            .DumpLatest("signalsToRemove");

        // Wait for all tasks. They should all complete when all streamCount items have been produced???
        await Task.WhenAll(tasks);
    }
Here is a better solution for the provided example: LinqPad
Hi, sorry for the late reply. I have had a look at your implementation and, if I'm understanding it correctly, it's a bit different, and I'm not sure about the performance.
We were trying to test the throughput of a stream when 2000 consumers are looking at each value and either working on it or discarding it.
In our example, working on a value meant removing it from a collection, and discarding it meant doing nothing with it. Both operations should be extremely fast.
From this image, using the example you provided ("Here is a better solution for the provided example: LinqPad"), you can see it takes 3 minutes to process 15 values. To me this would create back pressure and, in our scenario, fill our queue, as we have values being placed on the queue with a heartbeat every 10 seconds.
I hope I'm making sense and have understood correctly?
In the meantime we opted for a different solution using async enumerables and System.Threading.Channels: an async enumerable from an IoT Hub queue source and a channel per consumer, with a lookup table.
chris
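The channel-per-consumer approach mentioned above is not shown in the thread, so the following is only a minimal sketch of the general idea, assuming one unbounded channel per watched value and a dictionary as the lookup table. The names `channelsByKey` and `ProduceAsync` are hypothetical, and `ProduceAsync` merely stands in for the real queue source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int consumerCount = 2000;

// Lookup table (hypothetical name): watched value -> that consumer's private channel.
var channelsByKey = Enumerable.Range(0, consumerCount)
    .ToDictionary(i => i.ToString(), _ => Channel.CreateUnbounded<string>());

var handled = 0;

// One reader loop per consumer. Each loop only ever receives values routed to it,
// so no consumer has to inspect and discard values meant for the others.
var consumers = channelsByKey.Values.Select(async ch =>
{
    await foreach (var _ in ch.Reader.ReadAllAsync())
        Interlocked.Increment(ref handled);
}).ToList();

// Stand-in for the real source (an assumption): yields each key once.
async IAsyncEnumerable<string> ProduceAsync()
{
    for (var i = 0; i < consumerCount; i++)
    {
        await Task.Yield();
        yield return i.ToString();
    }
}

// Route each value to the single interested consumer via the lookup table.
await foreach (var value in ProduceAsync())
{
    if (channelsByKey.TryGetValue(value, out var target))
        await target.Writer.WriteAsync(value);
}

// Completing the writers lets every reader loop finish.
foreach (var c in channelsByKey.Values)
    c.Writer.Complete();

await Task.WhenAll(consumers);
Console.WriteLine($"handled {handled} of {consumerCount} values");
```

The difference from the hub-based sample is that routing costs one dictionary lookup per value, whereas with a broadcast every consumer evaluates every value.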
@RollsChris
I think there's a misunderstanding about how a source, a broadcast hub, and a LINQ statement work. In your code, you're doing this:
    var tasks = streamValuesToWatch
        .Select(valueToWatch =>
        {
            return signalValueProducer
                .Where(producedValue =>
                {
                    // We only care about a specific value in this flow/stream
                    return valueToWatch == producedValue;
                })
                .RunForeach(y =>
                {
                    // Remove from list, can be called multiple times with same value.
                    streamValuesToRemove.Remove(y);
                }, materializer);
        }).ToList();
Basically, what you're doing is:
- for each value inside streamValuesToWatch,
  - materialize the stream source signalValueProducer
  - for each value emitted by this source
    - if the value equals the current value of streamValuesToWatch, remove the same value from streamValuesToRemove
Observation:
- The element count for streamValuesToWatch is 2000.
- The element count for signalValueProducer is a repeating value of 0-1999, infinitely, emitted every 50 milliseconds, and then broadcast to all of the observer sinks.
- For the 2000th element to be emitted and processed by the tasks list, it would take at least 2000 * 50 milliseconds, or 100 seconds, before it is even emitted and processed by the .Where() clause.
- You can't expect the whole process to complete within seconds if the source takes 1.7 minutes to even produce the whole number sequence.
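The timing in the observation above is plain arithmetic on the tick interval. A small sketch of it follows, where the helper name `MinimumEmitTime` is hypothetical and the 14000 figure is the consumer count the issue asks about. It treats one tick per element as an approximate lower bound and ignores the zero initial delay.

```csharp
using System;

// The sample's Source.Tick interval.
var tickInterval = TimeSpan.FromMilliseconds(50);

// Approximate lower bound on how long the source needs to emit elementCount
// elements when it is throttled to one element per tick.
TimeSpan MinimumEmitTime(int elementCount) => tickInterval * elementCount;

Console.WriteLine(MinimumEmitTime(2000));   // 00:01:40, i.e. 100 seconds (about 1.7 minutes)
Console.WriteLine(MinimumEmitTime(14000));  // 00:11:40, i.e. 700 seconds
```

Any measured run time therefore includes this floor regardless of how fast the hub itself distributes elements.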
Note: The tasks list would never complete because signalValueProducer is non-completing, it needs to be canceled to complete.
So I tried this out myself and was able to get the application to process everything in about 3:30, which is still too slow IMHO:
https://share.linqpad.net/qa5jrmw9.linq
The trick was to make sure we didn't start producing until after all of the consumers were attached. The problem appears to be, IMHO, that this system is lossy or slow: there are consumers that are being put on pause while the hub is distributing work to others.
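The linked LinqPad script is not reproduced in this thread, so the following is only one possible way to delay production until the consumers exist, not necessarily what that script does. It assumes the tick-based source is swapped for `Source.Queue`, so the caller controls when elements enter the hub. The two-second delay is a crude assumption to give the consumers time to register, and the buffer size of 16 is arbitrary. It requires the Akka.Streams NuGet package.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

const int streamCount = 2000;

var actorSystem = ActorSystem.Create("hub-demo");
var materializer = actorSystem.Materializer();

// Materialize the hub first. Nothing is produced until OfferAsync is called.
var (queue, hubSource) = Source
    .Queue<string>(16, OverflowStrategy.Backpressure)
    .ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 1024), Keep.Both)
    .Run(materializer);

var seen = 0;

// Attach every consumer before any element is offered.
var consumers = Enumerable.Range(0, streamCount)
    .Select(i => i.ToString())
    .Select(valueToWatch => hubSource
        .Where(producedValue => producedValue == valueToWatch)
        .RunForeach(_ => Interlocked.Increment(ref seen), materializer))
    .ToList();

// Assumption: consumers register with the hub asynchronously, so wait briefly.
await Task.Delay(TimeSpan.FromSeconds(2));

// Only now start producing.
for (var i = 0; i < streamCount; i++)
    await queue.OfferAsync(i.ToString());

// Completing the queue completes the hub, which in turn completes the consumers.
queue.Complete();
await Task.WhenAll(consumers);

Console.WriteLine($"seen {seen} of {streamCount} values");
await actorSystem.Terminate();
```

This removes the 50 millisecond tick from the measurement, so whatever time remains is spent inside the hub and its consumers. Given the timings reported in this thread, the final wait may still take minutes.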
I think the problem is here:
https://github.com/akkadotnet/akka.net/blob/1af82a75c58d943f0fe802b566dbfe8b78e34203/src/core/Akka.Streams/Dsl/Hub.cs#L940-L948
The code we use for determining which consumers get which message looks complicated to me - and based on my profiling data we're not doing anything particularly CPU intensive. Looks like there's a lot of timer-driven waiting going on inside the hub itself.
@RollsChris There might be a bug in our code after all, we'll re-assess the broadcast hub internal implementation
This may also benefit from a Box<T> wrapper for the queue itself due to variance checks on arrays.
Probably a small opt but worth mentioning while we are here...
(In reply by email to Aaron Stannard's comment above, sent Wed, Aug 7, 2024, 1:42 PM.)
Also bounds checks, with call-offs to throw helpers if needed...
(In reply by email to Andrew Laing's comment above, sent Wed, Aug 7, 2024, 2:55 PM.)
Thanks for the help. My gut feeling is that it should be a lot faster. We are looking at having 14000 consumers, with potentially 14000 updates every 10 seconds (and that's the best case, with a 1-1 mapping). I'd also like to show you our alternative method, but I haven't got time at the moment to make a demo app.
The performance issues have nothing to do with boxing or any measurable CPU issues. It's a scheduling problem: there are long periods of time where the consumers aren't doing anything. I think the algorithm for waking up / sleeping the consumers is off.