Centralize the filter lookup in the core actor
Problem
The core actor dispatches incoming messages (from local publishers, peers, or WebSockets) to subscribers (again: local subscribers, peers, or WebSockets). For that, Broker currently sets up data-flow pipelines: each subscriber subscribes to the central merge point, where all inputs flow together, and then filters out unwanted messages.
The problem is that this filtering is done by each subscriber individually, so we perform n filter checks, where n is the number of subscribers. Since subscriptions in Broker are prefix-based, a filter check is more than a single hash-map lookup.
Proposed Solution
At the end of the day, Broker shoves data from one set of buffers (inputs) to another set of buffers (outputs). Those buffers then connect to hubs, stores, peers, or WebSocket clients. While data flows come with nice properties and APIs, we barely use any of that. In fact, the pipeline setup in the core actor has become quite involved since we added WebSocket peers and hubs to the mix.
The proposed solution operates much closer to the buffers. We introduce a new handler concept in the core actor, which is basically just a data sink. It manages an output buffer and also adds back-pressure. Each handler has a queue that fills up if the buffer has reached its capacity (which happens if we push faster than the reader consumes from the buffer). If that queue overflows, we apply the configured strategy (disconnect, drop-oldest, or drop-newest). The main change is that we store all handlers in a single filter: a multi-map that allows storing multiple handlers for the same topic(s). This means we always do a single, central filter lookup. The result of that lookup is a list of handlers that have a matching subscription. Then we call offer on all those handlers. This method may do a few extra checks. For example, a data store handler discards anything that isn't a command message.
With this design, we only ever perform a single filter lookup. While that filter may still grow with the number of peers, one central lookup should still be faster than n per-subscriber lookups.
Benchmarking
Setup: sending 20k messages/s. To start the receiving node:
./build/bin/broker-node -t "[$( tr -d '\n' < topics.txt ) '/benchmark/events']" -p 1234 -v -r
To start the sending node:
./build/bin/broker-throughput --verbose -r 20000 localhost:1234
After a while, we peer 20 additional nodes with the receiver, starting each via:
./build/bin/broker-node -t "[$( tr -d '\n' < topics.txt )]" --peers='["tcp://localhost:1234"]' -v
Baseline
No additional peerings:
broker-node: ~36% CPU, broker-throughput: ~34% CPU
20 additional peerings:
broker-node: ~53% CPU, broker-throughput: ~37% CPU
Branch
No additional peerings:
broker-node: ~36% CPU, broker-throughput: ~34% CPU
20 additional peerings:
broker-node: ~47% CPU, broker-throughput: ~36% CPU
Discussion
Doing a single filter lookup reduces CPU load in the benchmark by about 6 percentage points (from ~53% to ~47%), which is good. However, there is still some linear scaling with the number of peers. The additional peers are not subscribed to the /benchmark/events topic, so they don't receive any data, and the system diagnostics confirm that they sit idle well below 1% CPU. Hence, they shouldn't cause additional CPU load.
Long story short, the new structure cuts down unnecessary CPU load by centralizing the filter lookup (a ~6-percentage-point reduction in the benchmark). I think the new code is also simpler, which is an added bonus. However, even on the new branch, adding more peers adds run-time cost, even when they don't receive anything. We should look into that next, after wrapping up this PR.
@ckreibich thank you for the feedback! I've split the PR into four logical commits: two preparation steps, the actual change, and one cleanup commit that only removes obsolete files. The unrelated change to broker-node has been filed separately, and I've added additional documentation to the multi-map.