librdkafka
Memory Leaks in Kafka Producer
Description
Memory leaks in rd_kafka_t
How to reproduce
The following code introduces memory leaks:
const int MSG_BUFFER_SIZE = 512;

struct DeliveryResult
{
    DeliveryResult ()
    {
        Err = 0;
    }
    int Err;
    string Message;
};

void DeliveryReportCallback (rd_kafka_t* producer,
                             const rd_kafka_message_t* kafkaMessage,
                             void* opaque)
{
    if (opaque != NULL)
    {
        DeliveryResult* deliveryResult = (DeliveryResult*)opaque;
        if (kafkaMessage->err)
        {
            deliveryResult->Err = 1;
            deliveryResult->Message = "No Ack Error";
        }
    }
}
const char* _authLocation = "C:\\Certs\\my_key.key";
const char* _certificateLocation = "C:\\Certs\\my_certificate.pem";

DeliveryResult deliveryResult;
rd_kafka_t* producer;
rd_kafka_conf_t* producerConfig;
char errorBuffer[MSG_BUFFER_SIZE];

producerConfig = rd_kafka_conf_new();
rd_kafka_conf_set(producerConfig, "security.protocol", "SSL", errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.key.location", _authLocation, errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.certificate.location", _certificateLocation, errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.key.password", "ABCDE", errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set_opaque(producerConfig, &deliveryResult);

if (rd_kafka_conf_set(producerConfig,
                      "bootstrap.servers",
                      "localhost:9092",
                      errorBuffer,
                      sizeof(errorBuffer)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(producerConfig);
    throw "Invalid config property value error";
}

rd_kafka_conf_set_dr_msg_cb(producerConfig, DeliveryReportCallback);

producer = rd_kafka_new(RD_KAFKA_PRODUCER, producerConfig, errorBuffer, sizeof(errorBuffer));
if (producer == NULL)
{
    throw "Unable to create producer error";
}

deliveryResult.Err = 0;
deliveryResult.Message = "";
string messageString = "some_string";

future result = async(launch::async,
                      rd_kafka_producev,
                      producer,
                      RD_KAFKA_V_TOPIC("some_topic"),
                      RD_KAFKA_V_PARTITION(1),
                      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                      RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
                      RD_KAFKA_V_OPAQUE(NULL),
                      RD_KAFKA_V_END);
rd_kafka_resp_err_t err = result.get();
if (err)
{
    rd_kafka_destroy(producer);
    throw "Unable to create producer";
}

rd_kafka_flush(producer, 3000);
if (rd_kafka_outq_len(producer) > 0)
{
    if (deliveryResult.Err == 0)
    {
        deliveryResult.Err = 1;
        deliveryResult.Message = "Message never written to the bus";
    }
}

rd_kafka_destroy(producer);
Run this code multiple times: Windows Task Manager shows memory usage increasing after each execution.
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): 1.9.2
- [x] Apache Kafka version: 2.13.3
- [x] librdkafka client configuration: provided in the code above, the other values are default
- [x] Operating system: Windows 10 Enterprise (x64)
- [x] No logs
- [x] Provide broker log excerpts
- [x] Critical issue
Your reproducer includes code to spawn a thread and then immediately block until it's done (using std::async/std::future). Do you see the same memory leak happening if you eliminate that extra thread? I.e. do you see the same leak if you replace this:
future result = async(launch::async,
                      rd_kafka_producev,
                      producer,
                      RD_KAFKA_V_TOPIC("some_topic"),
                      RD_KAFKA_V_PARTITION(1),
                      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                      RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
                      RD_KAFKA_V_OPAQUE(NULL),
                      RD_KAFKA_V_END);
rd_kafka_resp_err_t err = result.get();
with just this:
rd_kafka_resp_err_t err = rd_kafka_producev(
    producer,
    RD_KAFKA_V_TOPIC("some_topic"),
    RD_KAFKA_V_PARTITION(1),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
    RD_KAFKA_V_OPAQUE(NULL),
    RD_KAFKA_V_END
);
Knowing whether std::async is really necessary to trigger the bug seems relevant to solving it.
(Tangential C++ style nit: please use messageString.data(), messageString.size() for the pointer-length pair, instead of .c_str(). See https://quuxplusone.github.io/blog/2020/03/20/c-str-correctness/ :))
If this issue still persists, please open another