confluent-kafka-python
The kafka flush failed in the child process
Description
I use a Kafka producer in a child process (started with multiprocessing). The child process stalls and never continues running.
How to reproduce
import sys
from confluent_kafka import Producer
from config import CommonConfig

mq_server = f'{CommonConfig.WEB_HOST}:{CommonConfig.KAFKA_PORT}'
topic = CommonConfig.APIFUZZ_TOPIC

p = Producer({
    'bootstrap.servers': mq_server,
})

def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))

def send_msg(msg):
    while True:
        try:
            p.produce(topic, msg.encode('utf-8'), callback=delivery_callback)
            print(f"send msg:{msg}")
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)
        p.flush()

from multiprocessing import Process
pr = Process(target=send_msg, args=("testtest",))
pr.start()
The child process appears to hang and does not keep running the while True: loop.
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka==1.6.0
- [ ] Apache Kafka broker version:
- [ ] Client configuration: {...}
- [ ] Operating system: ubuntu 20.04
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
I was able to make this work using a Thread. Is there any specific reason to use multiprocessing instead of multithreading?
I think the issue is related to the fact that the two processes do not share the same memory space and cannot access each other's memory directly. If you want to achieve something similar, you might need to consider sharing memory between processes, which will be difficult with a complex object like a producer.
I use multiprocessing because my project requires multiple processes. When I found that the Kafka producer did not work in a child process, I changed the code to run it in the main process. I would like to know why Kafka does not support being used in a child process, because I could not find any documentation that mentions this issue.
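One likely explanation, offered as an assumption rather than a confirmed diagnosis: the Producer in the snippet above is created in the parent process, and a forked child only keeps the thread that called fork(), so any background threads the client started in the parent are missing in the child and flush() may wait indefinitely. A common workaround is to construct the Producer inside the child process. The sketch below shows that pattern; the helper names make_kafka_producer, send_msgs and run_in_child are hypothetical and not part of confluent-kafka-python.

```python
import multiprocessing as mp


def make_kafka_producer(bootstrap_servers):
    """Hypothetical factory: builds a real Producer in the calling process."""
    # Imported lazily so the client library is only loaded where it is used.
    from confluent_kafka import Producer
    return Producer({"bootstrap.servers": bootstrap_servers})


def send_msgs(make_producer, topic, messages):
    """Meant to run in the child: the producer is created here, after the fork."""
    producer = make_producer()
    for msg in messages:
        producer.produce(topic, msg.encode("utf-8"))
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # wait for outstanding messages before the child exits


def run_in_child(make_producer, topic, messages):
    """Start a child process that owns its own producer and wait for it."""
    child = mp.Process(target=send_msgs, args=(make_producer, topic, messages))
    child.start()
    child.join()
    return child.exitcode
```

A call might look like run_in_child(functools.partial(make_kafka_producer, "localhost:9092"), "my-topic", ["testtest"]), where the broker address and topic name are placeholders. Passing a factory instead of a Producer instance means no client object crosses the process boundary.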