librdkafka
The return value of 'rd_kafka_flush()' may be wrong, which can cause record loss
Description
I have found a problem with 'rd_kafka_flush()' that can cause data loss.
The scenario is:
We use librdkafka to write records to a Kafka cluster: `resp_code = rd_kafka_produce*(...); resp_code = rd_kafka_flush(...);`
When I stop the Kafka cluster, rd_kafka_flush() may return an incorrect code: it returns OK (RD_KAFKA_RESP_ERR_NO_ERROR), which implies that the record has been written to Kafka, but in fact Kafka does not have the record. This is a low-probability event.
Whenever this happens, the producer always logs a line like this:
#producer.*|.*partition count changed from .* to 0
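Independently of whether the flush return code itself is correct, an application can cross-check each record through librdkafka's per-message delivery reports (the callback registered with `rd_kafka_conf_set_dr_msg_cb()`, where `rkmessage->err` carries the per-message result). The sketch below models only the bookkeeping such a callback could do; the struct, the functions, and the error enum are hypothetical stand-ins and do not use the real librdkafka types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for librdkafka error codes; the real values
 * live in rdkafka.h and are NOT reproduced here. */
typedef enum {
        SKETCH_ERR_NO_ERROR = 0,
        SKETCH_ERR_MSG_TIMED_OUT,
        SKETCH_ERR_TRANSPORT
} sketch_err_t;

/* Hypothetical per-producer tally. In real code it would be updated
 * from the delivery report callback, reading rkmessage->err. */
typedef struct {
        size_t produced;  /* messages successfully enqueued */
        size_t delivered; /* delivery reports without an error */
        size_t failed;    /* delivery reports carrying an error */
} delivery_tally_t;

/* Call once per message that was successfully enqueued. */
void tally_on_produce(delivery_tally_t *t) {
        assert(t != NULL);
        t->produced++;
}

/* Call once per delivery report. */
void tally_on_delivery(delivery_tally_t *t, sketch_err_t err) {
        assert(t != NULL);
        if (err == SKETCH_ERR_NO_ERROR)
                t->delivered++;
        else
                t->failed++;
}

/* True only if every enqueued message was confirmed as delivered,
 * regardless of what the flush call returned. */
bool tally_all_delivered(const delivery_tally_t *t) {
        assert(t != NULL);
        return t->failed == 0 && t->delivered == t->produced;
}
```

In real code, `tally_on_produce()` would run after each successful `rd_kafka_produce*()` call and `tally_on_delivery()` inside the delivery report callback; once flush has returned, a false result from `tally_all_delivered()` would flag missing records even when flush reported success.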
How to reproduce
I have reviewed some of the code related to this.
In the code of `rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms)`, there seem to be only two return statements:
- `if (rk->rk_type != RD_KAFKA_PRODUCER) return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;`
- `return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT : RD_KAFKA_RESP_ERR_NO_ERROR;`
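The two return paths quoted above can be modelled as a small pure function. This is a simplified sketch, not the librdkafka source: the enum values are hypothetical stand-ins for the names quoted above, and the real function first waits up to `timeout_ms` for outstanding messages before `msg_cnt` is evaluated.

```c
#include <assert.h>

/* Hypothetical stand-ins for the librdkafka names quoted above. */
typedef enum {
        M_ERR_NO_ERROR,
        M_ERR__NOT_IMPLEMENTED,
        M_ERR__TIMED_OUT
} model_err_t;

typedef enum { M_PRODUCER, M_CONSUMER } model_type_t;

/* Mirrors the two return statements: the result depends only on the
 * client type and on how many messages are still outstanding. */
model_err_t model_flush_result(model_type_t type, int msg_cnt) {
        if (type != M_PRODUCER)
                return M_ERR__NOT_IMPLEMENTED;
        return msg_cnt > 0 ? M_ERR__TIMED_OUT : M_ERR_NO_ERROR;
}
```

In this model, nothing about why a message left the queue reaches the result: once `msg_cnt` is 0, the function reports success.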
`rd_kafka_resp_err_t` has many other error codes.
It seems that flush should also deal with some of these other errors. In particular, I noticed conditions such as:
rk->rk_fatal.err != RD_KAFKA_RESP_ERR_NO_ERROR
Maybe they should be considered.
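One possible shape of that suggestion, again as a hypothetical model rather than a patch: if the instance has recorded a fatal error (the `rk->rk_fatal.err` check mentioned above, represented here by a plain `int`), the flush result reports it instead of success. librdkafka does expose fatal errors through `RD_KAFKA_RESP_ERR__FATAL` and `rd_kafka_fatal_error()`; `P_ERR__FATAL` below merely stands in for the former, and whether a fatal error is actually raised in the partition-count case is not established here.

```c
#include <assert.h>

/* Hypothetical stand-ins for librdkafka error codes. */
typedef enum {
        P_ERR_NO_ERROR,
        P_ERR__NOT_IMPLEMENTED,
        P_ERR__TIMED_OUT,
        P_ERR__FATAL
} proposed_err_t;

typedef enum { P_PRODUCER, P_CONSUMER } proposed_type_t;

/* Hypothetical flush result that also consults a recorded fatal error.
 * fatal_err != 0 plays the role of rk->rk_fatal.err != NO_ERROR. */
proposed_err_t proposed_flush_result(proposed_type_t type, int msg_cnt,
                                     int fatal_err) {
        if (type != P_PRODUCER)
                return P_ERR__NOT_IMPLEMENTED;
        /* A fatal error takes precedence over "nothing outstanding". */
        if (fatal_err != 0)
                return P_ERR__FATAL;
        return msg_cnt > 0 ? P_ERR__TIMED_OUT : P_ERR_NO_ERROR;
}
```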
I have reproduced the issue. My test file, which uses librdkafka's test structure, can be provided later. You can see the draft PR at https://github.com/edenhill/librdkafka/pull/4104; it contains my reproduction method.
It is not a very direct unit test, because I am not familiar with the librdkafka project.
Instead, I print some logs and then use shell commands to demonstrate the problem.
I would welcome any advice on the test code.
My commands:
- cd tests
- make -j32 && TESTS=0800 ./run-test.sh &> /tmp/result.out.4 &
- file=/tmp/result.out.4
- grep "producer.*partition count changed from" $file -A 1
If this log line appears, a record has been lost (in my opinion).
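To run this check inside a test instead of from the shell, the same match as the grep command above can be sketched in C. The helper names are hypothetical; like the grep, it only detects the log pattern and does not by itself prove that a record was lost.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Same test as: grep "producer.*partition count changed from" */
bool log_line_signals_loss(const char *line) {
        static const char tag[] = "producer";
        static const char phrase[] = "partition count changed from";
        assert(line != NULL);
        const char *p = strstr(line, tag);
        if (p == NULL)
                return false;
        /* The phrase must start after the end of "producer". */
        return strstr(p + strlen(tag), phrase) != NULL;
}

/* Counts matching lines, e.g. lines read from the test output file. */
size_t count_loss_lines(const char *const *lines, size_t n) {
        size_t hits = 0;
        for (size_t i = 0; i < n; i++)
                if (log_line_signals_loss(lines[i]))
                        hits++;
        return hits;
}
```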
For example, in my case the record "produce2 711" is lost:
