confluent-kafka-python
suspicious memory leak in producer (rdkafka)
Description
I have a microservice that consumes messages from Kafka, does some work on them, and publishes the results back to Kafka.
However, it gets OOMKilled shortly after starting.
With the help of a memory profiler, I figured out that rdk:broker0 contributes the largest share of memory usage (in my example it's a 384MiB pod in Kubernetes).

As seen in this report, no Python object tracked by the GC holds anything larger than 2MB; it is rdk:broker0 that holds 4460 allocations and 165MiB of unreleased memory.
Here is the KafkaProducerService code that calls Producer:
# irrelevant code omitted
from datetime import datetime
import logging

from confluent_kafka import KafkaException, Producer

log = logging.getLogger(__name__)


class KafkaProducerService:
    '''
    main class
    '''

    def __init__(self, bootstrap_servers: str, topic: str) -> None:
        self.__producer = Producer({
            'bootstrap.servers': bootstrap_servers
        })
        self.__topic = topic
        self.__count = 0
        self.__previous_count_timestamp = datetime.now().timestamp()

    def publish(self, key: str, value: bytes, headers: dict) -> None:
        '''
        publish message
        '''
        try:
            self.__producer.produce(
                self.__topic,
                key=key,
                value=value,
                headers=headers
            )
        except BufferError as error:
            raise RuntimeError(
                "internal producer message queue is full"
            ) from error
        except KafkaException as error:
            raise RuntimeError(
                "error adding to producer message queue"
            ) from error

        num_messages_to_be_delivered = len(self.__producer)
        if num_messages_to_be_delivered > 1000:
            log.debug("wait for %s messages to be delivered to Kafka...",
                      num_messages_to_be_delivered)
            try:
                # flush() returns the number of messages still waiting to be delivered
                num_messages = self.__producer.flush()
            except KafkaException as error:
                raise RuntimeError(
                    "error when flushing producer message queue to Kafka"
                ) from error
            log.debug("%d messages still in the internal queue", num_messages)

        self.__count += 1
        self.__count_published()

    def __count_published(self) -> None:
        current_count_timestamp = datetime.now().timestamp()
        if current_count_timestamp - self.__previous_count_timestamp >= 1:
            self.__previous_count_timestamp = current_count_timestamp
            if self.__count == 0:
                return
            log.info("%d messages published (%s messages pending for delivery)",
                     self.__count, len(self.__producer))
            self.__count = 0
How to reproduce
- Install https://bloomberg.github.io/memray
- Using the code snippet above, build a Python script that keeps republishing Kafka messages back to the same topic (a minimal sketch follows this list).
- Each message should be larger than 50KB.
- Run the script with memray for about 5 minutes.
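A minimal reproduction sketch, assuming the KafkaProducerService class above is saved as kafka_producer.py; the broker address, topic, and group id below are placeholders:

# repro.py -- consume messages and republish them to the same topic
from confluent_kafka import Consumer

from kafka_producer import KafkaProducerService  # the class from the snippet above

BOOTSTRAP = 'localhost:9092'  # placeholder
TOPIC = 'test-topic'          # placeholder; should already contain messages > 50KB

consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP,
    'group.id': 'memray-repro',       # placeholder
    'auto.offset.reset': 'earliest',
})
consumer.subscribe([TOPIC])

producer_service = KafkaProducerService(BOOTSTRAP, TOPIC)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # republish each consumed message back to the same topic
    producer_service.publish(
        key=msg.key().decode() if msg.key() else '',
        value=msg.value(),
        headers=dict(msg.headers() or [])
    )

Run it under memray (e.g. memray run repro.py, then memray flamegraph on the resulting capture file) and watch the resident memory grow.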
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
>>> confluent_kafka.version()
('1.8.2', 17302016)
>>> confluent_kafka.libversion()
('1.6.0', 17170687)
- [x] Apache Kafka broker version:
3.0.0
- [x] Client configuration:
Default, except bootstrap.servers
- [x] Operating system:
Reproduced on both Alpine and Debian (Bullseye)
If flush() is called on every publish instead of every 1000 messages, the memory still leaks, just more slowly; the process still gets OOMKilled eventually, after roughly 30 minutes.
I've added some verbose logging to capture, every second, the number of unpublished messages remaining in the producer's internal queue (the snippet above has been updated accordingly):
2022-06-03 13:24:29,463 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62597, last timestamp: 2022-05-29 18:02:19.732000 (1653847339732)
2022-06-03 13:24:35,550 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 999 messages published (0 messages pending for delivery)
2022-06-03 13:24:42,028 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:48,462 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:54,916 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:54,983 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62598, last timestamp: 2022-05-29 18:02:27.617000 (1653847347617)
2022-06-03 13:25:01,379 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 998 messages published (0 messages pending for delivery)
2022-06-03 13:25:07,885 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:14,369 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:20,849 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:27,665 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:34,190 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:40,643 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:46,975 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 996 messages published (0 messages pending for delivery)
2022-06-03 13:25:53,397 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:53,610 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62599, last timestamp: 2022-05-29 18:02:29.433000 (1653847349433)
2022-06-03 13:25:59,878 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 996 messages published (0 messages pending for delivery)
2022-06-03 13:26:06,369 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 999 messages published (0 messages pending for delivery)
2022-06-03 13:26:12,721 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
Most of the time there are 0 messages pending for delivery, i.e. all messages are delivered in time, so the high memory usage is unlikely to come from messages remaining in the queue.
@tigerinus is this also present in 1.9.0?
Worth trying 1.9.0, but I don't recall this coming up.
It's unusual to call flush() except on producer shutdown; perhaps try a poll-based solution instead (something along the lines of https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/asyncio_example.py).
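For illustration, a rough sketch of that suggestion applied to the KafkaProducerService above, assuming flush() moves to a close() method called on shutdown:

    # inside KafkaProducerService:
    def publish(self, key: str, value: bytes, headers: dict) -> None:
        try:
            self.__producer.produce(
                self.__topic,
                key=key,
                value=value,
                headers=headers
            )
        except BufferError:
            # local queue is full: serve delivery reports for up to 1s, then retry once
            self.__producer.poll(1)
            self.__producer.produce(self.__topic, key=key, value=value, headers=headers)
        # serve delivery reports for previously produced messages without blocking
        self.__producer.poll(0)

    def close(self) -> None:
        # flush only on shutdown
        self.__producer.flush()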
With that said, I'm going to preemptively label this a bug even though I haven't looked into it; I don't see why this should leak, and I believe you that it does.
Note from one of our customers: this issue is also present on 1.9.0.
Although, it's worth pointing out that I still get the leak without calling flush().
Does the issue persist if you specify a delivery callback method?
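For anyone trying that with the snippet above, a minimal sketch of such a callback (the method name is illustrative); it would be passed to produce() via on_delivery=self.__on_delivery and is only invoked from poll()/flush():

    # inside KafkaProducerService:
    def __on_delivery(self, err, msg) -> None:
        '''invoked from poll()/flush() once per produced message'''
        if err is not None:
            log.error("delivery failed for key %s: %s", msg.key(), err)
        else:
            log.debug("delivered to %s [%d] at offset %d",
                      msg.topic(), msg.partition(), msg.offset())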