confluent-kafka-python

The kafka flush failed in the child process

Open gjm-anban opened this issue 2 years ago • 2 comments

Description

I start a Kafka producer in a subprocess, and it stops and cannot continue running.

How to reproduce

import sys

from confluent_kafka import Producer
from config import CommonConfig

mq_server = f'{CommonConfig.WEB_HOST}:{CommonConfig.KAFKA_PORT}'

topic = CommonConfig.APIFUZZ_TOPIC

# Producer created once at module level, i.e. in the parent process
p = Producer({
    'bootstrap.servers': mq_server,
})


def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))

def send_msg(msg):
    while True:
        try:
            p.produce(topic, msg.encode('utf-8'), callback=delivery_callback)
            print(f"send msg:{msg}")
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))

        p.poll(0)
        p.flush()  # this call never returns when run in the child process


from multiprocessing import Process

# send_msg runs in a forked child process, reusing the producer created above
pr = Process(target=send_msg, args=("testtest",))
pr.start()

and the process seems to stop and never progresses inside the while True loop (screenshot attached in the original issue).

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka==1.6.0
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system: ubuntu 20.04
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

gjm-anban • Aug 22 '22 09:08

I was able to make this work using a Thread. Is there a specific reason to use multiprocessing instead of multithreading?
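
For reference, a minimal sketch of that thread-based variant (not from the issue itself; it assumes the module-level producer p and the send_msg function from the snippet above):

from threading import Thread

# A thread shares the parent's memory and its librdkafka background
# threads, so it can reuse the Producer created in the main process.
t = Thread(target=send_msg, args=("testtest",))
t.start()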

I think the issue is related to the fact that the two processes don't share the same memory space and can't access each other's memory directly. If you want to achieve something similar, you might need to consider sharing memory between processes, which will be difficult with a complex object like the producer.
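
If multiprocessing is required, one common workaround (a sketch, not something from this thread) is to construct the Producer inside the child process, so each process owns its own librdkafka instance and worker threads. The helper send_one below is hypothetical; mq_server, topic, and delivery_callback are assumed to be defined at module level as in the snippet above, and the example assumes the default fork start method on Linux:

from multiprocessing import Process
from confluent_kafka import Producer

def send_one(msg):
    # Build the Producer after the fork, inside the child process,
    # so the child has its own librdkafka instance and threads.
    p = Producer({'bootstrap.servers': mq_server})
    p.produce(topic, msg.encode('utf-8'), callback=delivery_callback)
    p.flush()

pr = Process(target=send_one, args=("testtest",))
pr.start()
pr.join()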

pranavrth • Sep 13 '22 15:09

I use multiprocessing because a feature in my project requires multiple processes. When I found that the Kafka producer doesn't work in a child process, I changed it to run in the main process. I would still like to know why the producer doesn't work in a child process, because I couldn't find any documentation describing this issue.

gjm-anban • Sep 14 '22 01:09