Improve dynamic output multiplexing performance
The dynamic output is currently the best-performing option for dynamically altering the outputs of a stream. However, since this output is strictly fan-out, when the goal is to multiplex messages across the dynamic outputs we're forced to dispatch each message to every single output and rely on processors to filter out the messages each output shouldn't receive:
output:
  dynamic:
    outputs:
      foo:
        http_client: ...
        processors:
          - bloblang: root = if type != "a" { deleted() }
      bar:
        kafka: ...
        processors:
          - bloblang: root = if type != "a" { deleted() }
      baz:
        sqs: ...
        processors:
          - bloblang: root = if type != "b" { deleted() }
      ...
As the number of outputs increases the performance hit grows linearly, even when the number of outputs that a message dispatches to is static. Since this dynamic dispatch capability is pretty much unique to Benthos it would be cool to take this output one step further and offer a high performance multiplexing solution.
One possibility is to expand the current implementation to include a check field with each output so that the routing can be determined before dispatch. However, this still means executing a bloblang query for each output for each message, carrying an overhead per-output.
If we really want to eliminate the performance cost of having a large number of outputs then the check needs to be made once and return a static list of outputs to dispatch to. As an example, we could implement behaviour whereby if a message has a metadata field dynamic_output_dispatch then we parse it as a comma-separated list of outputs to dispatch to:
pipeline:
  processors:
    - bloblang: |
        meta dynamic_output_dispatch = match type {
          case "a" => "foo",
          case "b" => "bar",
          case "c" => "baz",
          case "d" => "baz,buz",
        }
output:
  dynamic:
    outputs:
      foo:
        http_client: ...
      bar:
        kafka: ...
      baz:
        sqs: ...
      ...
This gives us a way to multiplex across a dynamic set of outputs with a single bloblang query.
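To make the dispatch step concrete, here is a minimal Go sketch of how the metadata value might be turned into a list of target outputs. Nothing here is existing Benthos code: the function names `dispatchTargets` and `route`, the whitespace trimming, and the choice to silently skip labels that are not registered are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// dispatchTargets parses the value of the proposed dynamic_output_dispatch
// metadata field into output labels. Trimming whitespace and skipping empty
// entries are assumptions, not part of the proposal.
func dispatchTargets(meta string) []string {
	var targets []string
	for _, label := range strings.Split(meta, ",") {
		if label = strings.TrimSpace(label); label != "" {
			targets = append(targets, label)
		}
	}
	return targets
}

// route returns the requested labels that are currently registered.
// Each label costs one map lookup, so the work per message depends on the
// number of targets rather than on the total number of outputs.
// Dropping unknown labels (rather than erroring) is a hypothetical choice.
func route(meta string, outputs map[string]bool) []string {
	var matched []string
	for _, label := range dispatchTargets(meta) {
		if outputs[label] {
			matched = append(matched, label)
		}
	}
	return matched
}

func main() {
	outputs := map[string]bool{"foo": true, "bar": true, "baz": true}
	// "buz" is not registered in this example, so only "baz" is returned.
	fmt.Println(route("baz,buz", outputs)) // prints [baz]
}
```

The point of the sketch is the cost model: one parse and a handful of map lookups per message, instead of one query per output.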
Great writeup! I'm not sure this entirely works for us though. Say a new dynamic output gets posted, intended to consume messages with type == "a". Won't this require modifying that global processor to add a case "a" => "new_output"?
Maybe wildcards could help with this - something like
meta dynamic_output_dispatch = "type-%s-*".format(type)
..so dynamic outputs with a label beginning with type-a- would receive messages of type a?
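As a rough illustration of the wildcard idea, the Go sketch below matches a pattern such as `type-a-*` against registered output labels using the standard library's `path.Match`. The function name `matchOutputs` and the example labels are hypothetical, and glob matching is only one possible reading of "wildcards".

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// matchOutputs returns, in sorted order, every label that matches the glob
// pattern. path.Match treats characters such as '*', '?', '[' and '\' in the
// pattern specially, which is one reason output labels might need a
// restricted character set.
func matchOutputs(pattern string, labels []string) ([]string, error) {
	var matched []string
	for _, label := range labels {
		ok, err := path.Match(pattern, label)
		if err != nil {
			return nil, err // malformed pattern
		}
		if ok {
			matched = append(matched, label)
		}
	}
	sort.Strings(matched)
	return matched, nil
}

func main() {
	// Hypothetical labels following the type-<type>-<name> convention.
	labels := []string{"type-a-http", "type-a-kafka", "type-b-sqs"}
	pattern := fmt.Sprintf("type-%s-*", "a")

	matched, err := matchOutputs(pattern, labels)
	if err != nil {
		panic(err)
	}
	fmt.Println(matched) // prints [type-a-http type-a-kafka]
}
```

Note that this naive version still scans every label for each message. The scan is a cheap string match rather than a bloblang query, but a prefix index would be needed to avoid the per-output cost entirely.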
Good point, prefixes or wildcards could work but I would need to restrict dynamic output names to a more limited character set. I was originally thinking we'd have an API for editing resources (as a separate body of work) and it would be a two step process: add the output ready for routing messages to, then update the processor resource used for routing to include the new output and logic for selecting it.
I'm not sure I've captured a dynamic resources API in an issue yet but we could consider it a precursor. I'm not sure if that would suit your flow though, it'd be nice to find a generalized solution that would only include registering the new output without having to hook anything else in.
> I'm not sure if that would suit your flow though
If a dynamic resources API is the clearest way forward, we're all for it! We don't mind writing a little code to make changes to dynamic resources as well as outputs.
Shower thought: Maybe the cleanest way forward here would be to fork an existing HTTP multiplexer and use that as the basis for output routing? It would be a new output type called multiplexer and it would allow you to add/remove outputs and dispatch messages using the same rules as HTTP server endpoints, which would make it much easier to document.
Edit: quick examples:
If we were to register:
- Output A with route /
- Output B with route /types
- Output C with route /types/foo
- Output D with route /types/bar
Then messages with the following paths (at metadata field benthos_endpoint or something) would be routed as follows:
- /types/foo -> output C
- /types/bar -> output D
- /types/baz -> output B
- /no_type -> output A
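The routing in this example can be sketched as a longest-prefix match on path segments. The Go code below is a hypothetical illustration rather than a fork of any existing HTTP multiplexer: the `mux` type and its `register` and `lookup` methods are invented names, and treating a route without a trailing slash as a prefix is an assumption chosen to reproduce the table above.

```go
package main

import (
	"fmt"
	"strings"
)

// mux maps registered routes to output labels (hypothetical type).
type mux struct {
	routes map[string]string
}

func newMux() *mux {
	return &mux{routes: map[string]string{}}
}

// register stores the route without a trailing slash, so "/" becomes "".
func (m *mux) register(route, output string) {
	m.routes[strings.TrimSuffix(route, "/")] = output
}

// lookup finds the output with the longest registered route that is a
// segment-wise prefix of p, dropping one trailing segment per iteration.
func (m *mux) lookup(p string) (string, bool) {
	p = strings.TrimSuffix(p, "/")
	for {
		if out, ok := m.routes[p]; ok {
			return out, true
		}
		if p == "" {
			return "", false
		}
		i := strings.LastIndex(p, "/")
		if i < 0 {
			i = 0 // no slash left: fall back to the root route
		}
		p = p[:i]
	}
}

func main() {
	m := newMux()
	m.register("/", "A")
	m.register("/types", "B")
	m.register("/types/foo", "C")
	m.register("/types/bar", "D")

	// Prints C, D, B and A for the four paths, matching the list above.
	for _, p := range []string{"/types/foo", "/types/bar", "/types/baz", "/no_type"} {
		out, _ := m.lookup(p)
		fmt.Printf("%s -> output %s\n", p, out)
	}
}
```

Each lookup costs at most one map access per path segment, so the work per message is independent of how many outputs are registered.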