confluent-kafka-python
Asynchronous producer callbacks & multi-threaded environment
Description
We're using a multi-threaded Flask server with the asynchronous producer (confluent_kafka.avro.AvroProducer) to send events to Kafka. After producing the events asynchronously, we call flush() to block the thread until the delivery callbacks have been called.
However, we can't predict which thread these callbacks run on: sometimes they run on a thread other than the one that called produce(), and therefore see a different request context (specifically, variables set on Flask's global g have the wrong values).
What's the recommended way for callbacks to work in a multi-threaded Flask environment?
How to reproduce
- Set up a multi-threaded Flask server (using mod_wsgi); on each request, set some request-local value in g.foo.
- Use confluent_kafka.avro.AvroProducer to produce events to Kafka, and call producer.flush() to trigger delivery acknowledgement and the on_delivery callbacks.
- With enough concurrent web requests, reading g.foo from within the on_delivery callback shows a value different from the one set when the on_delivery callback was registered. (Note: the fact that a value is set at all suggests the callback is running in some request context.)
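The mechanism behind these steps can be reproduced deterministically without a broker or Flask. This is a simulation under stated assumptions, not the client's real code: FakeSharedProducer is a hypothetical stand-in for one producer shared by all request threads, which queues delivery callbacks and serves all of them on whichever thread calls flush(), and threading.local() stands in for Flask's g.

```python
import threading
from collections import deque


class FakeSharedProducer:
    """Hypothetical stand-in for one producer shared by all request threads.

    produce() only queues the message; the delivery callback runs later,
    on whichever thread happens to call flush().
    """

    def __init__(self):
        self._pending = deque()
        self._lock = threading.Lock()

    def produce(self, topic, value, on_delivery):
        with self._lock:
            self._pending.append((on_delivery, value))

    def flush(self):
        while True:
            with self._lock:
                if not self._pending:
                    return 0
                cb, value = self._pending.popleft()
            cb(None, value)  # real callbacks receive (err, Message)


g = threading.local()  # stand-in for Flask's per-request `g`
producer = FakeSharedProducer()
seen = []  # (value produced by the request, g.foo as read inside the callback)


def handle_request(foo, do_flush):
    g.foo = foo

    def on_delivery(err, msg):
        # Reads g.foo when the callback runs, i.e. on the flushing thread.
        seen.append((msg, g.foo))

    producer.produce("events", foo, on_delivery=on_delivery)
    if do_flush:
        producer.flush()


# Request A produces but has not flushed yet when request B calls flush().
for foo, do_flush in (("A", False), ("B", True)):
    t = threading.Thread(target=handle_request, args=(foo, do_flush))
    t.start()
    t.join()

print(seen)  # [('A', 'B'), ('B', 'B')]
```

Request A's callback is served by request B's flush(), so it sees B's value of g.foo, which is the symptom described above.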
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  - confluent-kafka-python: ('1.7.0', 17235968)
  - librdkafka: ('1.7.0', 17236223)
- [ ] Apache Kafka broker version:
- [ ] Client configuration: {...}
- [ ] Operating system:
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
This blog post might be helpful: https://www.confluent.io/blog/kafka-python-asyncio-integration/
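The linked post targets asyncio, but the same idea can be adapted to plain threads. The sketch below rests on assumptions and is not an official API: FuturesProducer and FakeProducer are hypothetical names, and the fake only mimics the produce(topic, value, on_delivery=...) / poll(timeout) / flush() shape of confluent_kafka.Producer so that the example runs without a broker. A single background thread calls poll(), the delivery callback does nothing except resolve a concurrent.futures.Future, and each request thread waits on its own future, so code that reads request-local state runs on the request's own thread.

```python
import threading
import time
from concurrent.futures import Future


class FuturesProducer:
    """Hypothetical wrapper: produce() returns a concurrent.futures.Future."""

    def __init__(self, producer):
        self._producer = producer
        self._stop = threading.Event()
        self._poller = threading.Thread(target=self._poll_loop, daemon=True)
        self._poller.start()

    def _poll_loop(self):
        while not self._stop.is_set():
            self._producer.poll(0.1)  # serves queued delivery callbacks here

    def produce(self, topic, value):
        fut = Future()

        def on_delivery(err, msg):
            # Runs on the poller thread: only resolve the future, never read g.
            if err is not None:
                fut.set_exception(RuntimeError(str(err)))
            else:
                fut.set_result(msg)

        self._producer.produce(topic, value, on_delivery=on_delivery)
        return fut

    def close(self):
        self._stop.set()
        self._poller.join()
        self._producer.flush()  # serve any callbacks still pending


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (no broker needed)."""

    def __init__(self):
        self._pending = []
        self._lock = threading.Lock()

    def produce(self, topic, value, on_delivery=None):
        with self._lock:
            self._pending.append((on_delivery, value))

    def poll(self, timeout):
        with self._lock:
            batch, self._pending = self._pending, []
        for cb, value in batch:
            if cb is not None:
                cb(None, value)  # real callbacks receive (err, Message)
        if not batch:
            time.sleep(timeout)
        return len(batch)

    def flush(self, timeout=None):
        while self.poll(0):
            pass
        return 0


g = threading.local()  # stand-in for Flask's per-request `g`
results = []
results_lock = threading.Lock()


def handle_request(foo, fp):
    g.foo = foo
    msg = fp.produce("events", foo).result(timeout=5)
    # Back on the request's own thread, so g.foo belongs to this request.
    with results_lock:
        results.append((msg, g.foo))


fp = FuturesProducer(FakeProducer())
threads = [threading.Thread(target=handle_request, args=(f"req-{i}", fp)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
fp.close()
print(sorted(results))
```

The design choice is to keep the callback free of request state: whichever thread serves it, it only hands the delivery result back to the thread that is waiting for it.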
Hi @mhowlett, thanks for the response and link!
The problem we're having is a little different, though, as far as I can tell. We're using a WSGI server (Flask with mod_wsgi), which doesn't run on top of any async runtime that I know of; it uses threads to multiplex requests. So I guess my question is: how does confluent-kafka-python multiplex producer calls, and how does that interact with mod_wsgi's threading?
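As far as I understand the client, produce() only enqueues the message; delivery reports are queued inside librdkafka and are run by whichever application thread next calls poll() or flush() on that producer instance. With one producer shared across mod_wsgi request threads, a flush() in request B can therefore run a callback registered by request A while B's g is active. One workaround, sketched here with stdlib stand-ins (the pending list plays the role of the producer's internal queue; request_a and request_b are hypothetical), is to capture request-local values at produce time with functools.partial instead of reading g inside the callback.

```python
import functools
import threading

g = threading.local()  # stand-in for Flask's per-request `g`
log = []
pending = []  # stand-in for callbacks queued inside the shared producer


def naive_on_delivery(err, msg):
    # Reads request state when the callback *runs*: possibly the wrong thread.
    log.append(("naive", msg, g.foo))


def bound_on_delivery(request_foo, err, msg):
    # request_foo was captured on the request's own thread at produce time.
    log.append(("bound", msg, request_foo))


def request_a():
    g.foo = "A"
    # In real code: producer.produce(..., on_delivery=cb) for each cb below.
    pending.append(naive_on_delivery)
    pending.append(functools.partial(bound_on_delivery, g.foo))


def request_b():
    g.foo = "B"
    # Like flush() on a shared producer: serves every queued callback, even A's.
    for cb in pending:
        cb(None, "msg-from-A")


for target in (request_a, request_b):
    t = threading.Thread(target=target)
    t.start()
    t.join()

print(log)  # [('naive', 'msg-from-A', 'B'), ('bound', 'msg-from-A', 'A')]
```

The naive callback reports B's value even though A registered it, while the partial-bound callback keeps A's value regardless of which thread serves it.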
Apologies, closing this as I don't have the bandwidth to look into it or answer.