The `fallback` output should stop trying to send messages to unreachable outputs
The following config will keep trying to send messages to the `kafka_franz` output if it's unavailable, even though there's a `stdout` fallback output configured:
```yaml
input:
  generate:
    mapping: |
      root.id = count("foo_counter1")
    interval: 0s
    count: 3

pipeline:
  threads: 1
  processors:
    - bloblang: |
        root = match {
          this.id == 1 => {"msg": "foo"}
          this.id == 2 => {"msg": "bar"}
          _ => throw("UNKNOWN ID")
        }
        root.id = this.id

output:
  switch:
    retry_until_success: true
    cases:
      - check: errored()
        output:
          stdout:
            codec: lines
          processors:
            - log:
                message: "errored message: ${! json()}"
      - output:
          fallback:
            - kafka_franz:
                seed_brokers:
                  - localhost:6666
                topic: test
            - stdout:
                codec: lines
          processors:
            - log:
                message: "successful message: ${! json()}"

logger:
  level: DEBUG
  format: json
  add_timestamp: true
  static_fields:
    "@service": benthos

shutdown_timeout: 3s # Default 20s
```
From the Discord chat, it looks like the solution would be to try to detect connection loss via an enhancement.
Later edit: It might also be nice to have an `exponential_backoff` field which tells `fallback` to wait a bit longer before trying again to send a message to an output which is experiencing connectivity issues.
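For a concrete shape, the existing `retry` output wrapper already exposes a `backoff` block along these lines (a rough sketch, with field names reproduced from memory, so worth double-checking against the docs); the suggestion here is essentially to give `fallback` an equivalent for outputs with connectivity issues:

```yaml
output:
  retry:
    # Keep retrying indefinitely (0 means no limit), but back off
    # exponentially between attempts instead of hammering the broker.
    max_retries: 0
    backoff:
      initial_interval: 500ms
      max_interval: 10s
      max_elapsed_time: 0s
    output:
      kafka_franz:
        seed_brokers:
          - localhost:6666
        topic: test
```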
Please let me know if there's something I can help with here!
I dug into this a lot more and I suspect this is a pretty non-trivial change as it fundamentally alters the way Benthos thinks about connectivity errors and its reconnection loop. Given the scope, I put the discussion over at https://github.com/benthosdev/benthos/discussions/754#discussioncomment-4261704
I also have a sketch of what it takes to fix this for what is possibly a similar situation I faced with `redis_list`, in case it helps anyone trying to fix a `fallback` that isn't falling back.
Thanks @theRealWardo, totally agree with your main points there. I think the fundamental problem has been that different client libraries like to obfuscate connectivity and cancellation in their own ways, so it's very difficult to have a consistent mechanism in there. My priority up until now has been just to ensure we have guaranteed delivery, which some client libraries are worryingly eager to hide.
However, I think now would be a good time to start aiming for greater consistency in cancellation and deadlines during delivery. A massive unblocker for this has been putting contexts everywhere, which wasn't the case before v4; now we have a way of delivering both cancellation and deadlines out to components, including processors (although this still needs gluing together).
I've marked this issue with our very first v5 label. The purpose of that label isn't to defer this until the next major version bump (we hopefully will never need one), but it's an indicator that this is a fundamental improvement to Benthos internals, significant enough that we would need an answer for it before considering any new major version bump or new plugin API.
That was an impressively quick reply @Jeffail - I really appreciate that! Thank you!
Do you have a proposed starting point for this work? Let me know how you like to do these kinds of changes.
I lean towards a draft PR with some possible configuration options on the `Config` struct, along with an accompanying docs change describing the fields in detail of course, and possibly a configuration guide as well?
Happy to help if you give me a shove in the right direction. 😄