faust
Proposal: Support for Failure-topic forwarding
Failure handling is an area where there is limited consensus within the Kafka community. One option for Faust would be adding support for failure forwarding in the same pattern as sinks. The API might look like:
```python
# in faust.models.record
# New Faust Record type specifically for error handling (proposed)
class FailureEventRecord(Record):
    errormsg: str
    exception: AgentException
    failed_record: Record

# in app.py
import faust

topic = app.topic('my.event', value_type=MyRecord)
failure_topic = app.topic('my.event.failed', value_type=faust.FailureEventRecord)

# `failures=` is the proposed new agent parameter (not an existing Faust option)
@app.agent(topic, sinks=[sink1, sink2], failures=[failed_record_operation])
async def my_exception_agent(records):
    # Agents consume streams asynchronously, so iterate with `async for`
    async for record in records:
        raising_operation(record)
```
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
The documentation says:
"Crashing the instance to require human intervention is certainly a choice, but far from ideal considering how common mistakes in code or unexpected exceptions are. It may be better to log the error and have ops replay and reprocess the stream on notification."
Expected behavior
Any events that fail stream processing in an agent will be wrapped in a `FailureEventRecord` and delivered to the topic or topics specified in the `failures` parameter to `app.agent()`.
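To make the expected behavior concrete, here is a minimal, framework-free sketch of "wrap the failure and deliver it instead of crashing". It does not use Faust at all: `FailureEvent` is a hypothetical stand-in for the proposed `FailureEventRecord`, and a failure destination is assumed to be any async callable (with Faust it might wrap something like `failure_topic.send`). The record is wrapped together with the error message, exception type, and stack trace, then handed to every configured destination while processing continues.

```python
import asyncio
import traceback
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Iterable, List


@dataclass
class FailureEvent:
    """Hypothetical stand-in for the proposed faust.FailureEventRecord."""
    errormsg: str
    exception_type: str
    failed_record: Any
    stacktrace: str = ""


# A failure destination is any async callable (assumption for this sketch).
FailureDestination = Callable[[FailureEvent], Awaitable[None]]


async def process_with_failures(
    records: Iterable[Any],
    operation: Callable[[Any], Any],
    failures: List[FailureDestination],
) -> List[Any]:
    """Apply `operation` to each record, routing failures instead of crashing."""
    results = []
    for record in records:
        try:
            results.append(operation(record))
        except Exception as exc:
            event = FailureEvent(
                errormsg=str(exc),
                exception_type=type(exc).__name__,
                failed_record=record,
                stacktrace=traceback.format_exc(),
            )
            # Deliver the wrapped failure to every configured destination.
            for deliver in failures:
                await deliver(event)
    return results


# Usage: -2 raises, gets wrapped, and ends up in the dead-letter list.
def raising_operation(record: int) -> int:
    if record < 0:
        raise ValueError(f"negative value: {record}")
    return record * 2


async def main():
    dead_letters: List[FailureEvent] = []

    async def to_dead_letters(event: FailureEvent) -> None:
        dead_letters.append(event)

    processed = await process_with_failures([1, -2, 3], raising_operation, [to_dead_letters])
    return processed, dead_letters


processed, dead_letters = asyncio.run(main())
```

The good records are still processed (`[2, 6]`), while the single bad record is captured with enough context for ops to inspect and replay it later.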
Versions
- Python 3.6
- Faust 1.7.0
- MacOS 10.14.5
- Kafka ?
- RocksDB version (if applicable)
Here's how Kafka Connect handles it: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
I'm no expert in event-driven design, but a very common scenario I've encountered in some architectures (comparable to a circuit breaker in synchronous systems) is propagating an error record across multiple agents until it reaches an application sink that produces a side effect. I guess this happens mostly for input validation errors, and it could be handled differently, but one way to support it would be a flag on the agent declaration: when the agent receives a `FailureEventRecord`, it forwards the record to all of its sinks (and to the `failures` topic, I guess).
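The forwarding idea can be sketched without Faust as a chain of synchronous stages, each standing in for one agent. This is only an illustration under assumptions: `make_stage`, `forward_failures`, and `FailureEvent` are hypothetical names, not Faust APIs. A stage that receives a failure record passes it through untouched; a stage whose operation raises converts the error into a failure record, so later stages forward it until it reaches the final sink.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class FailureEvent:
    """Hypothetical failure envelope (stand-in for FailureEventRecord)."""
    errormsg: str
    failed_record: Any


def make_stage(operation: Callable[[Any], Any], forward_failures: bool = True):
    """Build one pipeline stage (think: one agent) around `operation`."""
    def stage(item: Any) -> Any:
        if isinstance(item, FailureEvent):
            if forward_failures:
                # Pass the failure downstream untouched instead of processing it.
                return item
            raise RuntimeError(f"stage received a failure record: {item.errormsg}")
        try:
            return operation(item)
        except Exception as exc:
            # Convert the error into a record that later stages will forward.
            return FailureEvent(errormsg=str(exc), failed_record=item)
    return stage


def run_pipeline(stages: List[Callable[[Any], Any]], records: List[Any]) -> List[Any]:
    """Push each record through every stage; the last output reaches the sink."""
    sink = []
    for record in records:
        item = record
        for stage in stages:
            item = stage(item)
        sink.append(item)
    return sink


def validate(record: Any) -> int:
    if not isinstance(record, int):
        raise TypeError(f"expected int, got {type(record).__name__}")
    return record


def enrich(record: int) -> int:
    return record + 1


results = run_pipeline([make_stage(validate), make_stage(enrich)], [1, "x", 5])
# results[1] is a FailureEvent; enrich never saw the bad record.
```

The sink receives both normal results and failure records, so it can produce the appropriate side effect (for example, an error response) for each.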
What do you think?
The Kafka Connect stuff is interesting. Could we add something similar? The developer could pass `failures="pass"` (skip failures and keep going), `failures="fail"` (the current default: blow up), or `failures=[failed_record_operation, ...]` to have failures sent to one or more dead-letter topics.
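A rough sketch of how those three modes could be dispatched, assuming a plain synchronous loop rather than a real Faust agent. `process` and `handle_failure` are hypothetical helpers; the handlers in the list stand in for whatever would send a record to a dead-letter topic.

```python
import logging
from typing import Any, Callable, List, Sequence, Union

logger = logging.getLogger(__name__)

# A policy is "pass", "fail", or a list of handlers (e.g. dead-letter senders).
FailureHandler = Callable[[Any, BaseException], None]
FailurePolicy = Union[str, Sequence[FailureHandler]]


def handle_failure(policy: FailurePolicy, record: Any, exc: BaseException) -> None:
    if policy == "fail":
        raise exc  # current behavior: let the agent crash
    if policy == "pass":
        logger.warning("skipping record %r: %s", record, exc)
        return
    if isinstance(policy, str):
        raise ValueError(f"unknown failure policy: {policy!r}")
    # Otherwise treat the policy as a list of handlers and call each one.
    for handler in policy:
        handler(record, exc)


def process(records: List[Any], operation: Callable[[Any], Any],
            failures: FailurePolicy = "fail") -> List[Any]:
    results = []
    for record in records:
        try:
            results.append(operation(record))
        except Exception as exc:
            handle_failure(failures, record, exc)
    return results


dead_letters: List[Any] = []
skipped = process(["1", "x", "3"], int, failures="pass")
routed = process(["1", "x"], int, failures=[lambda rec, exc: dead_letters.append(rec)])
```

Here `skipped` keeps only the parseable values, `routed` does the same while collecting the bad input in `dead_letters`, and the default `"fail"` re-raises the original exception.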
Another possibility stolen from Kafka Connect would be putting the error data in the message headers when forwarding to the new topic, rather than using a new wrapper Record for failures. I don't know how to access event headers from an agent, but I do see that header support exists in some form.
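The header-based variant keeps the failed value unchanged and attaches the error context as headers. Below is a minimal sketch of building such headers; the `x-failure-*` names are hypothetical and only loosely inspired by Kafka Connect's dead-letter context headers, and `trace-id` is an invented example of a pre-existing header. Sending them would then be something like `failure_topic.send(value=..., headers=...)`, assuming the installed Faust version's `send()` accepts a `headers` argument.

```python
from typing import Dict, Mapping, Optional

# Hypothetical header names, not an existing Faust or Kafka convention.
HEADER_PREFIX = "x-failure-"


def failure_headers(exc: BaseException, source_topic: str, offset: int) -> Dict[str, bytes]:
    """Describe a processing failure as Kafka-style (str -> bytes) headers."""
    return {
        HEADER_PREFIX + "exception-class": type(exc).__name__.encode("utf-8"),
        HEADER_PREFIX + "exception-message": str(exc).encode("utf-8"),
        HEADER_PREFIX + "source-topic": source_topic.encode("utf-8"),
        HEADER_PREFIX + "source-offset": str(offset).encode("utf-8"),
    }


def with_failure_headers(original: Optional[Mapping[str, bytes]],
                         exc: BaseException, source_topic: str,
                         offset: int) -> Dict[str, bytes]:
    """Keep the record's own headers and add the failure context on top."""
    merged = dict(original or {})
    merged.update(failure_headers(exc, source_topic, offset))
    return merged


try:
    int("not-a-number")
except ValueError as exc:
    headers = with_failure_headers({"trace-id": b"abc123"}, exc, "my.event", 42)
```

Compared with a wrapper record, this leaves the dead-letter topic with the same value schema as the source topic, which makes replaying it into the original agent simpler.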
@sivy you can access the headers of the current event via:

```python
from faust.streams import current_event

current_event().headers
```
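Reading a header back out might look like the helper below. It assumes the headers arrive either as a mapping or as a list of `(key, bytes)` pairs, since the concrete type may vary between Faust versions; `get_header` and the `x-failure-exception-message` key are hypothetical. The agent-side usage is shown only as a comment because it needs a running Faust app.

```python
from typing import Iterable, Mapping, Optional, Tuple, Union

# Assumed shapes for `event.headers`: a mapping or (key, value) pairs.
HeadersLike = Union[Mapping[str, bytes], Iterable[Tuple[str, bytes]], None]


def get_header(headers: HeadersLike, name: str,
               default: Optional[str] = None) -> Optional[str]:
    """Return header `name` decoded as UTF-8 text, or `default` if absent."""
    # dict() accepts both mappings and (key, value) pairs; for duplicate keys
    # in a pair list, the last occurrence wins.
    value = dict(headers or {}).get(name)
    if value is None:
        return default
    if isinstance(value, (bytes, bytearray)):
        return value.decode("utf-8")
    return str(value)


# Inside an agent this might be used as (not executed here):
#     from faust.streams import current_event
#     event = current_event()  # may be None outside a stream context
#     if event is not None:
#         reason = get_header(event.headers, "x-failure-exception-message")
```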
Curious whether this is planned for the roadmap. This would be incredibly valuable.