
Asynchronous producer callbacks & multi-threaded environment

Open srir opened this issue 3 years ago • 2 comments

Description

We're using a multi-threaded Flask server with the asynchronous producer (confluent_kafka.avro.AvroProducer) to send events to Kafka. After we asynchronously produce the events, we use flush to ensure that callbacks have been called (and block the thread).

However, we're finding that we can't predict which threads these callbacks run on; sometimes they run on a thread other than the one that called produce, and so effectively run with a different request context (specifically: variables set on Flask's global g are incorrect).

What's the recommended way for callbacks to work in a multi-threaded Flask environment?

How to reproduce

  • set up a multi-threaded Flask server (using mod_wsgi); on each request, set some request-local value in g.foo
  • use confluent_kafka.avro.AvroProducer to produce events to Kafka, and call producer.flush() to trigger acknowledgement of delivery and on_delivery callbacks
  • with enough concurrent web requests, reading g.foo from within the on_delivery callback should show a value different from the one set when the on_delivery callback was registered (note: the fact that a value is set at all should mean that we're running in some request context)
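
The steps above can be sketched without a broker or a web server. This is a minimal simulation, not the library's implementation: StubProducer is a hypothetical stand-in for a producer shared across threads, and it assumes that queued delivery callbacks run on whichever thread calls flush(). A threading.local object stands in for Flask's g. The sketch also shows one possible workaround, binding the request-local value at registration time with functools.partial instead of reading it inside the callback.

```python
import threading
from functools import partial

# Stand-in for Flask's request-local `g` (assumption: plain thread-local storage).
g = threading.local()


class StubProducer:
    """Hypothetical stand-in for a producer shared by all request threads.

    Assumption: callbacks queued by produce() are run by whichever thread
    calls flush(), regardless of which thread called produce().
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = []

    def produce(self, topic, value, on_delivery):
        with self._lock:
            self._pending.append((on_delivery, value))

    def flush(self):
        with self._lock:
            pending, self._pending = self._pending, []
        for callback, value in pending:
            callback(None, value)  # (err, msg), mirroring the on_delivery shape


results = []


def on_delivery(captured_foo, err, msg):
    # Record the value bound at registration time next to the value the
    # flushing thread currently sees in its own request-local storage.
    results.append((captured_foo, getattr(g, "foo", None)))


def handle_request(producer, foo, do_flush):
    g.foo = foo
    # Bind the request-local value now instead of reading g in the callback.
    producer.produce("events", foo, on_delivery=partial(on_delivery, foo))
    if do_flush:
        producer.flush()


producer = StubProducer()

# Request A produces but does not flush; request B then flushes for both.
thread_a = threading.Thread(target=handle_request, args=(producer, "request-A", False))
thread_a.start()
thread_a.join()

thread_b = threading.Thread(target=handle_request, args=(producer, "request-B", True))
thread_b.start()
thread_b.join()

for captured, seen in results:
    print(f"captured={captured!r} seen_in_callback={seen!r}")
```

In this simulation the callback for request A runs on request B's thread, so the value read from g inside the callback differs from the captured one, while the captured value stays correct. Whether the real producer behaves the same way under mod_wsgi depends on how the producer instance is shared between threads.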

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
    • confluent-kafka-python: ('1.7.0', 17235968)
    • librdkafka: ('1.7.0', 17236223)
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

srir avatar May 24 '22 23:05 srir

this blog post might be helpful: https://www.confluent.io/blog/kafka-python-asyncio-integration/

mhowlett avatar Jun 22 '22 17:06 mhowlett

hi @mhowlett - thanks for the response and link!

the problem we're having is a little different, though, as far as I can tell. we're using a WSGI server (Flask with mod_wsgi), which doesn't run on top of any async runtime that I know of (but uses threading to multiplex requests). I guess I'm wondering: how does confluent-kafka-python multiplex producer calls, and how does that interact with mod_wsgi threading?

srir avatar Jun 27 '22 11:06 srir

apologies - closing as i don't have the bandwidth to look into / answer.

mhowlett avatar Oct 24 '22 21:10 mhowlett