Use-after-free when rd_kafka_new returns RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE
We're hitting the codepath below in rd_kafka_new. This is in a pathologically large integration test where (1) all Kafka brokers are unreachable by design, and (2) we're running lots of VMs (7x 16-core VMs, all simulated on a single 34-core physical machine), so "the OS is not scheduling the background threads" is actually a highly likely scenario for us.
```c
        if (rd_kafka_init_wait(rk, 60 * 1000) != 0) {
                /* This should never happen unless there is a bug
                 * or the OS is not scheduling the background threads.
                 * Either case there is no point in handling this gracefully
                 * in the current state since the thread joins are likely
                 * to hang as well. */
                mtx_lock(&rk->rk_init_lock);
                rd_kafka_log(rk, LOG_CRIT, "INIT",
                             "Failed to initialize %s: "
                             "%d background thread(s) did not initialize "
                             "within 60 seconds",
                             rk->rk_name, rk->rk_init_wait_cnt);
                if (errstr)
                        rd_snprintf(errstr, errstr_size,
                                    "Timed out waiting for "
                                    "%d background thread(s) to initialize",
                                    rk->rk_init_wait_cnt);
                mtx_unlock(&rk->rk_init_lock);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                                        EDEADLK);
                return NULL;
        }
```
Looking at this code, I see that it calls rd_kafka_log(rk, ...) and then returns NULL without destroying the rk at all. That seems like a memory leak, right? Bad, but not fatal.
What's fatal is that this codepath leads to a use-after-free for us, because we assume that when rd_kafka_new fails, we should clean up the rk_conf that we passed to it. In other words, we do exactly what's documented in INTRODUCTION.md:
```c
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
        rd_kafka_conf_destroy(rk); // [sic]; actually this should say `conf` and I'll submit a PR for that in a minute
        fail("Failed to create producer: %s\n", errstr);
}

/* Note: librdkafka takes ownership of the conf object on success */
```
But along this specific codepath, rd_kafka_new fails and returns NULL and yet there are still background threads running, and those background threads are going to try to access the function-pointer callbacks registered in that conf object. If we rd_kafka_conf_destroy(conf) on failure, then we have a use-after-free situation on those function pointers, and the symptom is generally a segfault.
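To make the sequence concrete, here's a minimal sketch of the pattern on our side. The callback name and the surrounding function are placeholders, and the crash depends on thread timing, so treat this as illustrative rather than a deterministic reproducer:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical application log callback registered on the conf. */
static void my_log_cb(const rd_kafka_t *rk, int level,
                      const char *fac, const char *buf) {
        fprintf(stderr, "%%%d|%s| %s\n", level, fac, buf);
}

static void create_consumer(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set_log_cb(conf, my_log_cb);
        /* ... rd_kafka_conf_set() for brokers, ssl, etc. ... */

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk) {
                /* Documented cleanup on failure: destroy the conf we still
                 * own. But on the RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE path
                 * shown above, broker threads are still running and may later
                 * read the callback pointers out of this now-freed conf,
                 * which is the use-after-free described here. */
                rd_kafka_conf_destroy(conf);
                return;
        }
        /* ... normal operation ... */
}
```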
#2820 might be related; it mentions this same codepath.
My ideal outcome here would be a simple rule like "When rd_kafka_new fails by returning NULL, it always relinquishes ownership of the rk_conf, so you can feel free to destroy it at your leisure," or "Whenever rd_kafka_new is called, it always takes ownership of the rk_conf, so you should actually never destroy the rk_conf after that point because now it belongs to the library." (The former is what I always thought the rule was. The latter would be awesome, but probably can't be achieved in practice because it would cause double-free bugs for all existing code.)
The second-best outcome would be if you could give us a simple distinguishing rule for when we should destroy the rk_conf and when we shouldn't. For example, "When rd_kafka_new fails by returning NULL, it relinquishes ownership of the rk_conf if and only if rd_kafka_last_error() != RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE. Client code should check for that situation and avoid destroying the rk_conf if rd_kafka_last_error() == RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE." (However, notice that that rule is also wrong, because of this codepath.)
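For illustration only, that second-best rule would look roughly like this on the caller side (a sketch; as noted above, even this check doesn't actually make the codepath in question safe):

```c
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
        /* Hypothetical distinguishing rule: only destroy the conf when the
         * failure did not leave background threads running. */
        if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
                rd_kafka_conf_destroy(conf);
        /* else: deliberately leak the conf rather than risk a use-after-free */
        fail("Failed to create consumer: %s\n", errstr);
}
```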
Checklist
Please provide the following information:
- [x] librdkafka version (release number or git tag): v1.8.2, v1.9.2
- [x] Apache Kafka version: 2.6.3, I think
- [x] librdkafka client configuration: Various, but one example is a CONSUMER with the following non-default properties: `{group.id => "redacted", metadata.broker.list => "172.31.1.6:9093", enable.partition.eof => "true", security.protocol => "ssl", ssl.key.location => "/var/redacted1.key", ssl.certificate.location => "/var/redacted2.pem", ssl.ca.location => "/var/redacted3.pem"}`
- [x] Operating system: RHEL 7, also probably any OS
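For reference, those non-default properties are set through the standard rd_kafka_conf_set() API, roughly like this (values redacted as above, return codes ignored for brevity):

```c
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();

/* Non-default properties from the failing CONSUMER instance (values redacted). */
rd_kafka_conf_set(conf, "group.id", "redacted", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "metadata.broker.list", "172.31.1.6:9093", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "enable.partition.eof", "true", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "security.protocol", "ssl", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.key.location", "/var/redacted1.key", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.certificate.location", "/var/redacted2.pem", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.ca.location", "/var/redacted3.pem", errstr, sizeof(errstr));
```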
The function pointer that is the target of the use-after-free was set with rd_kafka_conf_set_log_cb, and the log message it's trying to print comes from this codepath in rd_kafka_broker_handle_ApiVersion, kicked off from rd_kafka_broker_connect_done:
```c
rd_rkb_log(rkb, LOG_ERR, "APIVERSION",
           "ApiVersionRequest v%hd failed due to "
           "invalid request: "
           "check client.software.name (\"%s\") and "
           "client.software.version (\"%s\") "
           "for invalid characters: "
           "falling back to older request version",
           request->rkbuf_reqhdr.ApiVersion,
           rk->rk_conf.sw_name, rk->rk_conf.sw_version);
```