[PERF] Using the BroadcastHub with thousands of consumers seems painfully slow.

Open RollsChris opened this issue 1 year ago • 13 comments
Version Information: Akka.Streams 1.5.20

Describe the performance issue: I'm trying to create thousands of consumers for a BroadcastHub.

Each consumer listens to the stream and checks if they care about the current item.

As the number of consumers increases, performance degrades considerably.

We think the buffer plays a part, because as soon as we go past the buffer size things get really slow, but we are unsure.

We also think that when the buffer gets full there is a potential for it to drop messages, as some values we never see.

Here is a LinqPad URL to demo the issue: LinqPad

Expected behavior: It should be possible to process 14000 messages across 14000 consumers using a BroadcastHub within seconds.

Actual behavior: It becomes unusable once the consumer count reaches the thousands.

Environment: LinqPad demo. We are using Linux Docker images in production, with the same performance issue.

RollsChris avatar Jun 13 '24 07:06 RollsChris

Thank you for the reproduction! I'll see what we can do here.

Aaronontheweb avatar Jun 13 '24 12:06 Aaronontheweb

The problem in the sample code is here:

return signalValueProducer
    .Where(producedValue => // <-- This is very bad
    {
        // We only care about a specific value in this flow/stream
        return valueToWatch == producedValue;
    })
    .RunForeach(y =>
    {
        // Remove from list, can be called multiple times with same value.
        streamValuesToRemove.Remove(y);
    }, materializer);

The .Where() operation requires this LINQ statement to iterate through all of the values emitted by signalValueProducer, which is fed by streamValuesSource, which in turn is limited by the .Tick() stage to emit only once every 50 milliseconds.

The higher the streamCount, the slower this operation is, because the .Where() stage has to iterate through the whole set, which takes (50 milliseconds * streamCount). In this example, this delay amounts to 100 seconds per .Select() operation (50 milliseconds * 2000), and each .Select() only produces a single subscriber.

Arkatufus avatar Jul 24 '24 16:07 Arkatufus

The provided sample code, for completeness:

async Task Main()
{
	var streamCount = 2000;
	var actorSystem = ActorSystem.Create("bla");
	IMaterializer materializer = actorSystem.Materializer();

	//Create list of values to watch 0-signalCount
	var streamValuesToWatch = Enumerable
		.Range(0, streamCount)
		.Select(e => e.ToString())
		.ToList();

	//On every tick get a value from streamValuesToWatch
	var index = 0;
	var streamValuesSource = Source
		.Tick(TimeSpan.FromSeconds(0), TimeSpan.FromMilliseconds(50), "")		
		.Select(i => streamValuesToWatch[index++ % streamCount])
		.Take(streamCount);
		
	
	// Create a thread-safe collection to monitor whether a value has been seen by its observer; if removed, it has been seen.
	var streamValuesToRemove = streamValuesToWatch
		.Select(x => (x, x))
		.ToHashMap()
		.ToAtom();

	// Create a broadcasthub so we can create a consumer for each streamValuesToWatch
	Source<string, NotUsed> signalValueProducer = streamValuesSource
		.ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 1024), Keep.Right)
		.Run(materializer);

	// Create each consumer
	var tasks = streamValuesToWatch
	.Select(valueToWatch =>
	{
		return signalValueProducer
			.Where(producedValue =>
			{
				// We only care about a specific value in this flow/stream
				return valueToWatch == producedValue;
			})
			.RunForeach(y =>
			{
				// Remove from list, can be called multiple times with same value.
				streamValuesToRemove.Remove(y);
			}, materializer);
	}).ToList();

	// Used to monitor current value to be processed.
	signalValueProducer
		.RunWith(Sink.AsObservable<string>(), materializer)
		.DumpLatest("CurrentValue");

	// Dump the Count for signalsToRemove, should shrink when value has been seen. 
	Observable
		.Interval(TimeSpan.FromSeconds(1))
		.Select(o => streamValuesToRemove.Count)
		.DumpLatest("signalsToRemove");

	// Wait for all tasks. they should all complete when all n-signalCount items have been produced???
	await Task.WhenAll(tasks);
}

Arkatufus avatar Jul 24 '24 16:07 Arkatufus

Here is a better solution for the provided example: LinqPad

Arkatufus avatar Jul 24 '24 19:07 Arkatufus

Hi, sorry for the late reply. I have had a look at your implementation and, if I'm understanding it correctly, it's a bit different, and I'm not sure about the performance.

We were trying to test the throughput of a stream when 2000 consumers are looking at each value and either working on it or discarding it.

In our example, working on it meant removing the value from a collection, and discarding it meant doing nothing with it. Both operations should be extremely fast.

From this image, using the example provided here ("Here is a better solution for the provided example: LinqPad"):

image

You can see it takes 3 minutes to process 15 values. To me this would create back pressure and, in our scenario, fill our queue, as we have values being placed on the queue with a heartbeat every 10 seconds.

I hope I'm making sense and that I have understood correctly.

In the meantime we opted for a different solution using async enumerables and System.Threading.Channels: an async enumerable from an IoT Hub queue source and a channel per consumer, with a lookup table.
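A minimal sketch of what a channel-per-consumer design with a lookup table could look like, using only System.Threading.Channels. This is not the actual code from this comment: the KeyedRouter class, its method names, and the choice of unbounded channels are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical router: one channel per watched key, found through a dictionary lookup.
public sealed class KeyedRouter
{
    private readonly Dictionary<string, Channel<string>> _channels = new();

    public KeyedRouter(IEnumerable<string> keys)
    {
        // Assumption: unbounded channels, so the router never waits on a slow consumer.
        foreach (var key in keys)
            _channels[key] = Channel.CreateUnbounded<string>();
    }

    // Reader side handed to the consumer that owns this key.
    public ChannelReader<string> ReaderFor(string key) => _channels[key].Reader;

    // Pump the source once; each value goes only to the consumer watching it.
    public async Task RouteAsync(IAsyncEnumerable<string> source)
    {
        await foreach (var value in source)
        {
            if (_channels.TryGetValue(value, out var channel))
                await channel.Writer.WriteAsync(value);
            // Values that nobody watches are dropped here.
        }

        // Source finished: complete every channel so consumer read loops can end.
        foreach (var channel in _channels.Values)
            channel.Writer.Complete();
    }
}
```

Each consumer would then read from its own channel with something like `await foreach (var v in router.ReaderFor(key).ReadAllAsync())`. Unlike the hub approach, a value is delivered to one consumer through a dictionary lookup instead of being offered to every consumer and filtered.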

chris

RollsChris avatar Aug 07 '24 14:08 RollsChris

@RollsChris

I think there's a misunderstanding about how a source, a broadcast hub, and a LINQ statement work. In your code, you're doing this:

var tasks = streamValuesToWatch
  .Select(valueToWatch =>
  {
    return signalValueProducer
      .Where(producedValue =>
      {
        // We only care about a specific value in this flow/stream
        return valueToWatch == producedValue;
      })
      .RunForeach(y =>
      {
        // Remove from list, can be called multiple times with same value.
        streamValuesToRemove.Remove(y);
      }, materializer);
}).ToList();

Basically, what you're doing is

  • for each value inside streamValuesToWatch,
    • materialize the stream source signalValueProducer
    • for each value emitted by this source
      • if the value equals the current value of streamValuesToWatch, remove the same value from streamValuesToRemove

Observation:

  • The element count for streamValuesToWatch is 2000
  • The element count for signalValueProducer is a repeating value of 0-1999, infinitely, emitted every 50 milliseconds, and then broadcast to all of the observer sinks
  • For the 2000th element to be emitted and processed by the tasks list, it would take at least 2000 * 50 milliseconds, or 100 seconds, before it is even emitted and processed by the .Where() clause.
  • You can't expect the whole process to complete within seconds if the source takes 1.7 minutes to even produce the whole number sequence.
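The arithmetic behind these observations can be checked with a small helper. This is only a sketch that mirrors the numbers in this comment (one element per tick interval); TickBudget and MinimumEmitTime are hypothetical names and not part of Akka.NET.

```csharp
using System;

public static class TickBudget
{
    // Rough lower bound on how long a tick-driven source needs to emit
    // `elementCount` elements, counting one element per interval.
    public static TimeSpan MinimumEmitTime(int elementCount, TimeSpan interval) =>
        TimeSpan.FromTicks(interval.Ticks * elementCount);
}
```

Calling `TickBudget.MinimumEmitTime(2000, TimeSpan.FromMilliseconds(50))` gives a TimeSpan of 100 seconds, which is roughly 1.7 minutes, so no consumer can see the full sequence sooner than that regardless of how fast the hub is.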

Note: The tasks list would never complete because signalValueProducer is non-completing; it needs to be canceled to complete.

Arkatufus avatar Aug 07 '24 14:08 Arkatufus

So I tried this out myself and was able to get the application to process everything in about 3:30, which is still too slow IMHO:

https://share.linqpad.net/qa5jrmw9.linq

The trick was to make sure we didn't start producing until after all of the consumers were attached. The problem appears to be, IMHO, that this system is lossy or slow: there are consumers being put on pause while the hub is distributing work to others.

Aaronontheweb avatar Aug 07 '24 17:08 Aaronontheweb

I think the problem is here:

https://github.com/akkadotnet/akka.net/blob/1af82a75c58d943f0fe802b566dbfe8b78e34203/src/core/Akka.Streams/Dsl/Hub.cs#L940-L948

The code we use for determining which consumers get which message looks complicated to me - and based on my profiling data we're not doing anything particularly CPU intensive. Looks like there's a lot of timer-driven waiting going on inside the hub itself.

Aaronontheweb avatar Aug 07 '24 17:08 Aaronontheweb

@RollsChris There might be a bug in our code after all. We'll re-assess the broadcast hub's internal implementation.

Arkatufus avatar Aug 07 '24 18:08 Arkatufus

This may also benefit from a Box<T> wrapper for the queue itself due to variance checks on arrays.

Probably a small opt but worth mentioning while we are here...

to11mtm avatar Aug 07 '24 18:08 to11mtm

Also bounds checks with calloffs to throwhelpers if needed...

to11mtm avatar Aug 07 '24 18:08 to11mtm

Thanks for the help. My gut feeling is that it should be a lot faster. We are looking at having 14000 consumers, with potentially 14000 updates every 10 seconds (and that's the best case, with a 1-1 mapping). I'd also like to show you our alternative method, but I haven't got time at the moment to make a demo app.

RollsChris avatar Aug 08 '24 09:08 RollsChris

The performance issues have nothing to do with boxing or any measurable CPU issues; it's a scheduling problem. There are long periods of time where the consumers aren't doing anything. I think the algorithm for waking up / sleeping the consumers is off.

Aaronontheweb avatar Aug 08 '24 13:08 Aaronontheweb