Proposal: Support for Failure-topic forwarding

Open sivy opened this issue 5 years ago • 6 comments

Failure handling is an area where there is limited consensus within the Kafka community. One option for Faust would be adding support for failure forwarding in the same pattern as sinks. The API might look like:

# in faust.models.record
# new Faust Record type specifically for error handling
class FailureEventRecord(Record):
    errormsg: str
    exception: AgentException
    failed_record: Record

# in app.py
topic = app.topic('my.event', value_type=MyRecord)
failure_topic = app.topic('my.event.failed', value_type=faust.FailureEventRecord)

@app.agent(topic, sinks=[sink1, sink2], failures=[failed_record_operation])
async def my_exception_agent(records):
    async for record in records:
        raising_operation(record)

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

The documentation says:

"Crashing the instance to require human intervention is certainly a choice, but far from ideal considering how common mistakes in code or unexpected exceptions are. It may be better to log the error and have ops replay and reprocess the stream on notification."

Expected behavior

Any events which fail stream processing in an agent will be wrapped in a FailureEventRecord and delivered to the topic or topics specified in the failures parameter to app.agent().
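To make the expected behavior concrete, here is a minimal sketch in plain asyncio. It does not use Faust, and none of these names exist in Faust today: `FailureEvent` is a hypothetical stand-in for the proposed FailureEventRecord, `process_with_failures` stands in for what the agent machinery might do internally, and `dead_letters` is an in-memory list playing the role of a failure topic.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Iterable, List


@dataclass
class FailureEvent:
    """Hypothetical stand-in for the proposed FailureEventRecord."""
    errormsg: str
    exception: BaseException
    failed_record: Any


# A failure handler receives the wrapped failure (assumed shape, not Faust API).
FailureHandler = Callable[[FailureEvent], Awaitable[None]]


async def process_with_failures(
    records: Iterable[Any],
    operation: Callable[[Any], Any],
    failures: List[FailureHandler],
) -> None:
    """Run operation on each record; wrap and forward any failure."""
    for record in records:
        try:
            operation(record)
        except Exception as exc:
            event = FailureEvent(str(exc), exc, record)
            # Deliver the wrapped failure to every configured handler.
            for handler in failures:
                await handler(event)


def raising_operation(record: int) -> None:
    if record < 0:
        raise ValueError(f"negative value: {record}")


dead_letters: List[FailureEvent] = []


async def failed_record_operation(event: FailureEvent) -> None:
    # In a real app this would send to a dead-letter topic.
    dead_letters.append(event)


asyncio.run(
    process_with_failures([1, -2, 3], raising_operation, [failed_record_operation])
)
```

Processing continues past the failing record, and only that record ends up wrapped in `dead_letters`.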

Versions

  • Python 3.6
  • Faust 1.7.0
  • MacOS 10.14.5
  • Kafka ?
  • RocksDB version (if applicable)

sivy avatar Jul 01 '19 19:07 sivy

Here's how Kafka connect handles it: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

nimish avatar Jul 01 '19 20:07 nimish

I'm no expert in event-driven design, but a very common scenario I've encountered in some architectures (comparable to a circuit breaker in synchronous systems) is wanting to propagate an error record through multiple agents until it reaches some application sink that will produce a side effect. This happens mostly for input validation errors, I guess, and can be handled differently. Still, one way to support this scenario would be a flag on the agent declaration so that, when the agent receives a FailureEventRecord, it forwards the record to all its sinks (and, I guess, its failures topics).

What do you think?

chobeat avatar Jul 01 '19 20:07 chobeat

The Kafka Connect stuff is interesting. Could we add something similar? The developer could pass failures="pass" (skip failures and keep going), failures="fail" (the current default: blow up), or failures=[failed_record_operation, ...] to have failures sent to one or more dead-letter topics.
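A rough sketch of how the three modes might be dispatched, in plain Python rather than Faust. `handle_failure` and `run` are hypothetical helper names, and the handler signature `(record, exc)` is an assumption for illustration only.

```python
from typing import Any, Callable, List, Sequence, Union

# Either a policy name ("pass" / "fail") or a list of forwarding callables.
FailurePolicy = Union[str, Sequence[Callable[[Any, Exception], None]]]


def handle_failure(policy: FailurePolicy, record: Any, exc: Exception) -> None:
    """Apply the configured failure policy to one failed record."""
    if policy == "fail":
        raise exc  # current behavior: blow up
    if policy == "pass":
        return  # skip the failed record and keep going
    if isinstance(policy, str):
        raise ValueError(f"unknown failure policy: {policy!r}")
    # Otherwise treat the policy as a list of dead-letter forwarders.
    for forward in policy:
        forward(record, exc)


def run(
    records: Sequence[Any],
    operation: Callable[[Any], Any],
    policy: FailurePolicy = "fail",
) -> List[Any]:
    """Process records, routing failures according to the policy."""
    processed = []
    for record in records:
        try:
            processed.append(operation(record))
        except Exception as exc:
            handle_failure(policy, record, exc)
    return processed
```

For example, `run([1, 0, 2], lambda r: 10 // r, "pass")` skips the zero and returns the other two results, while the default `"fail"` lets the `ZeroDivisionError` propagate.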

sivy avatar Jul 01 '19 20:07 sivy

Another possibility stolen from Kafka Connect would be putting the error data in the message headers when forwarded to the new topic, rather than using a new wrapper Record for failures. Don't know how to access event headers from an agent, but I do see that header support is there in some form.
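As a sketch of the header-based variant: the original message would be forwarded unchanged and the error details attached as headers. The `x-error-*` header names and the `error_headers` helper below are made up for illustration; they are neither Kafka Connect's actual header names nor part of Faust. Values are encoded to bytes because Kafka header values are byte strings.

```python
import traceback
from typing import Dict, Optional


def error_headers(
    exc: BaseException,
    source_topic: str,
    existing: Optional[Dict[str, bytes]] = None,
) -> Dict[str, bytes]:
    """Return a copy of the existing headers plus error details (hypothetical names)."""
    headers = dict(existing or {})
    headers["x-error-class"] = type(exc).__name__.encode()
    headers["x-error-message"] = str(exc).encode()
    headers["x-error-source-topic"] = source_topic.encode()
    headers["x-error-stacktrace"] = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    ).encode()
    return headers


try:
    int("not a number")
except ValueError as exc:
    headers = error_headers(exc, "my.event", {"trace-id": b"abc"})
```

The upside of this shape is that the dead-letter topic can keep the original value_type, so failed messages can be replayed without unwrapping.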

sivy avatar Jul 02 '19 13:07 sivy

@sivy you can access the headers of the current event via:

from faust.streams import current_event

current_event().headers

pistolero avatar Jul 02 '19 14:07 pistolero

Curious whether this is planned for the roadmap. This would be incredibly valuable.

grjones avatar Mar 10 '22 20:03 grjones