
Memory Leaks in Kafka Producer

Open · SealPoint opened this issue 3 years ago

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/edenhill/librdkafka/discussions

Description

Memory leaks in rd_kafka_t

How to reproduce

The following code introduces memory leaks:

#include <string>
#include <future>

#include <librdkafka/rdkafka.h>

using namespace std;

const int MSG_BUFFER_SIZE = 512;

struct DeliveryResult
{
    DeliveryResult()
    {
        Err = 0;
    }

    int Err;
    string Message;
};

// Delivery report callback; 'opaque' is the value set with rd_kafka_conf_set_opaque().
void DeliveryReportCallback(rd_kafka_t* producer,
                            const rd_kafka_message_t* kafkaMessage,
                            void* opaque)
{
    if (opaque != NULL)
    {
        DeliveryResult* deliveryResult = (DeliveryResult*)opaque;

        if (kafkaMessage->err)
        {
            deliveryResult->Err = 1;
            deliveryResult->Message = "No Ack Error";
        }
    }
}

const char* _authLocation =  "C:\\Certs\\my_key.key";
const char* _certificateLocation =  "C:\\Certs\\my_certificate.pem";

DeliveryResult deliveryResult;

rd_kafka_t* producer;
rd_kafka_conf_t* producerConfig;
char errorBuffer[MSG_BUFFER_SIZE];

producerConfig = rd_kafka_conf_new();
rd_kafka_conf_set(producerConfig, "security.protocol", "SSL", errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.key.location", _authLocation, errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.certificate.location", _certificateLocation, errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set(producerConfig, "ssl.key.password", "ABCDE", errorBuffer, MSG_BUFFER_SIZE);
rd_kafka_conf_set_opaque(producerConfig, &deliveryResult);

if (rd_kafka_conf_set(producerConfig,
	"bootstrap.servers",
	 "localhost:9092",
	  errorBuffer,
	  sizeof(errorBuffer)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(producerConfig);
    throw "Invalid config property value error";
}

rd_kafka_conf_set_dr_msg_cb(producerConfig, DeliveryReportCallback);
producer = rd_kafka_new(RD_KAFKA_PRODUCER, producerConfig, errorBuffer, sizeof(errorBuffer));

if (producer == NULL)
{
    throw "Unable to create producer error";
}

deliveryResult.Err = 0;
deliveryResult.Message = "";
string messageString = "some_string";

auto result = async(launch::async,
    rd_kafka_producev,
    producer,
    RD_KAFKA_V_TOPIC("some_topic"),
    RD_KAFKA_V_PARTITION(1),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
    RD_KAFKA_V_OPAQUE(NULL),
    RD_KAFKA_V_END);
rd_kafka_resp_err_t err = result.get();

if (err)
{
    rd_kafka_destroy(producer);
    throw "Unable to produce message";
}

// Wait up to 3 seconds for outstanding delivery reports.
rd_kafka_flush(producer, 3000);

if (rd_kafka_outq_len(producer) > 0)
{
    if (deliveryResult.Err == 0)
    {
        deliveryResult.Err = 1;
        deliveryResult.Message = "Message never written to the bus";
    }
}

rd_kafka_destroy(producer);

Execute this code multiple times and Windows Task Manager will show an increase in memory usage after each execution.
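
To make the growth easier to observe, the snippet above can be wrapped in a function and run in a loop; the sketch below assumes a hypothetical ProduceOnce() wrapper around the reproducer code (it is not part of the original report). If the producer were fully released by rd_kafka_destroy(), memory usage should stay roughly flat across iterations.

#include <stdio.h>

// Hypothetical wrapper around the reproducer above: builds the config,
// creates the producer, produces one message, flushes, and destroys the producer.
void ProduceOnce();

int main()
{
    for (int i = 0; i < 100; ++i)
    {
        ProduceOnce();
        // Observe the process working set in Task Manager (or a profiler) here;
        // steady growth per iteration indicates the reported leak.
        printf("iteration %d done\n", i);
    }
    return 0;
}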

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): 1.9.2
  • [x] Apache Kafka version: <2.13.3>
  • [x] librdkafka client configuration: provided in the code above, the other values are default
  • [x] Operating system: <Windows 10 Enterprise (x64)>
  • [x] No logs
  • [x] Provide broker log excerpts
  • [x] Critical issue

SealPoint · Oct 07 '22 19:10

Your reproducer includes code to spawn a thread and then immediately block until it's done (using std::async/std::future). Do you see the same memory leak happening if you eliminate that extra thread? I.e. do you see the same leak if you replace this:

auto result = async(launch::async,
    rd_kafka_producev,
    producer,
    RD_KAFKA_V_TOPIC("some_topic"),
    RD_KAFKA_V_PARTITION(1),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
    RD_KAFKA_V_OPAQUE(NULL),
    RD_KAFKA_V_END);
rd_kafka_resp_err_t err = result.get();

with just this:

rd_kafka_resp_err_t err = rd_kafka_producev(
    producer,
    RD_KAFKA_V_TOPIC("some_topic"),
    RD_KAFKA_V_PARTITION(1),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE(messageString.c_str(), messageString.length()),
    RD_KAFKA_V_OPAQUE(NULL),
    RD_KAFKA_V_END
);

Knowing whether std::async is really necessary to the bug seems relevant to solving it.

(Tangential C++ style nit: please use messageString.data(), messageString.size() for the pointer-length pair, instead of .c_str(). See https://quuxplusone.github.io/blog/2020/03/20/c-str-correctness/ :))
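
As a concrete illustration of that suggestion, a produce call using the pointer-length pair might look like the sketch below; ProduceValue() is a hypothetical helper, and 'producer' is assumed to be the configured rd_kafka_t* from the reproducer.

#include <string>
#include <librdkafka/rdkafka.h>

// Hypothetical helper showing the .data()/.size() form of the value argument.
rd_kafka_resp_err_t ProduceValue(rd_kafka_t* producer, const std::string& messageString)
{
    return rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC("some_topic"),
        RD_KAFKA_V_PARTITION(1),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        // Explicit pointer and length instead of relying on the NUL terminator.
        RD_KAFKA_V_VALUE(messageString.data(), messageString.size()),
        RD_KAFKA_V_OPAQUE(NULL),
        RD_KAFKA_V_END);
}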

Quuxplusone · Oct 18 '22 17:10

If this issue still persists, please open another issue.

nhaq-confluent · Feb 27 '24 13:02