pykafka

Add a callback parameter to produced messages, called upon receipt

Open mikepk opened this issue 9 years ago • 6 comments
Rather than iterating over a delivery report, this change adds an optional per-message callback that is called when the message is marked as delivered. This makes it easier to attach behavior to particular messages without having to correlate message bodies against a delivery report.

A sample use case is cleanup code that should run only once a message is guaranteed to have been delivered. For example, in a data/message bridge, messages can be dequeued (or acknowledged) from a master queue only when delivery to Kafka is guaranteed, to avoid data loss. In my case, a RabbitMQ message broker is being replicated into Kafka. The message in RabbitMQ should only be ack'd once delivery is guaranteed; otherwise data could be lost.

This could be done by creating a registry of all sent messages and their cleanup code, then iterating over the delivery report and correlating the message with the registry, but that gets complicated and messy fast. This code uses the existing record of produced messages in pykafka (the message set / message_batch) and adds an optional callback to each message. When the delivery check is done, the callback attached to the message is executed if there is one. This keeps the async nature of the delivery check but allows attaching behaviors to the delivery check.
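To make the idea concrete, here is a minimal, self-contained sketch of the pattern described above. It is not the code from this PR and does not use pykafka's real classes: `Message`, `ToyProducer`, and `mark_delivered` are hypothetical stand-ins, and delivery is simulated rather than confirmed by a broker.

```python
from collections import deque


class Message(object):
    """Hypothetical stand-in for a produced message (not pykafka's Message)."""

    def __init__(self, value, callback=None):
        self.value = value
        self.callback = callback  # optional, called once delivery is confirmed


class ToyProducer(object):
    """Hypothetical producer that keeps its own record of pending messages."""

    def __init__(self):
        self._batch = deque()

    def produce(self, value, callback=None):
        # The callback travels with the message, so no separate registry
        # is needed to correlate messages with their cleanup code.
        self._batch.append(Message(value, callback))

    def mark_delivered(self):
        """Simulate the delivery check: run each message's callback, if any."""
        delivered = 0
        while self._batch:
            msg = self._batch.popleft()
            if msg.callback is not None:
                msg.callback()
            delivered += 1
        return delivered


acked = []
producer = ToyProducer()
producer.produce(b"order-1", callback=lambda: acked.append("order-1"))
producer.produce(b"order-2")  # no callback attached
producer.produce(b"order-3", callback=lambda: acked.append("order-3"))
count = producer.mark_delivered()
```

In the RabbitMQ bridge case, the lambda would be replaced by whatever acknowledges the source message; only messages that were given a callback trigger any cleanup.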

mikepk avatar Mar 17 '16 22:03 mikepk

Thanks @mikepk. I like this idea, and I think you've implemented it in a reasonable way that doesn't overly complicate the interface. Might it be useful for the callback function to accept the Message instance for which it was called? I'd also like to have @rduplain and/or @yungchin weigh in on this.

Some integration tests for the callback functions would be good as well, even if it's just a single test making sure that the function is called properly.

emmettbutler avatar Mar 22 '16 16:03 emmettbutler

I like the callback interface. Two thoughts.

First, I am personally more interested in when a message fails to be delivered, so I would also push for a failure_callback.

Second, make sure you catch any exception, bind it, and allow the thread to report it. Here is a PR that does that for the rebalance callback:

https://github.com/Parsely/pykafka/pull/411/files
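As a rough illustration of "catch, bind, and report", the sketch below shows one common way to surface an exception raised by a callback on a worker thread. It does not reproduce the linked PR; the `Worker` class and its `join_and_raise` method are hypothetical names chosen for this example.

```python
import sys
import threading


class Worker(object):
    """Hypothetical worker that runs a user callback on a background thread."""

    def __init__(self, callback):
        self._callback = callback
        self._exc_info = None  # bound here if the callback raises
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def _run(self):
        try:
            self._callback()
        except Exception:
            # Bind the exception instead of letting the thread die silently.
            self._exc_info = sys.exc_info()

    def join_and_raise(self):
        """Wait for the thread, then re-raise any bound exception in the caller."""
        self._thread.join()
        if self._exc_info is not None:
            raise self._exc_info[1]


def bad_callback():
    raise ValueError("cleanup failed")


worker = Worker(bad_callback)
worker.start()
try:
    worker.join_and_raise()
    reported = None
except ValueError as exc:
    reported = str(exc)
```

Without the `try`/`except` in `_run`, the error would only show up as a traceback printed by the worker thread, and the code that produced the message would never see it.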

johnistan avatar Mar 22 '16 21:03 johnistan

When I have some spare time I'll go ahead and look into a failure_callback, exception handling, and tests. For the failure callback, are you thinking of a call signature like def produce(self, message, partition_key=None, callback=None, errback=None):?

mikepk avatar Mar 23 '16 19:03 mikepk

I'd advocate for not adding more than a single callback. Instead of a success callback and an error callback, we could easily get by with a single callback taking a parameter that indicates whether there was an error - maybe the exception instance.
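A single-callback interface along these lines might look like the sketch below. The names `on_delivery` and `finish_delivery` and the `(message, exc)` pairs are assumptions made for illustration, not an agreed pykafka API; messages are plain bytes here to keep the example self-contained.

```python
acked = []
failed = []


def on_delivery(message, exc=None):
    """Single callback: exc is None on success, the exception on failure."""
    if exc is None:
        acked.append(message)
    else:
        failed.append((message, exc))


def finish_delivery(results, callback):
    """Hypothetical delivery check: call the callback once per message."""
    for message, exc in results:
        callback(message, exc)


# Simulated outcomes: one message delivered, one failed.
results = [
    (b"a", None),
    (b"b", RuntimeError("broker unavailable")),
]
finish_delivery(results, on_delivery)
```

Passing the message along with the optional exception keeps produce() down to one extra parameter while still letting the caller branch on success or failure.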

emmettbutler avatar Mar 23 '16 20:03 emmettbutler

Just to weigh in: a callback is interesting, and +1 to having a single callback that gets intuitive parameters for whether the message was successful.

rduplain avatar Mar 24 '16 19:03 rduplain

This could use another look and some tests if you're still interested in getting it into master, @mikepk. Thanks again for the contribution!

emmettbutler avatar Jun 21 '16 14:06 emmettbutler