modern-cpp-kafka
Missing consumer cb causes librdkafka assertion failure
Hi,
Consider the following scenario: after subscribing to and seeking within some topic X, we call subscribe on the KafkaConsumer for a further topic Y. During that call we end up inside rd_kafka_q_serve (which polls and executes operations) with an operation of rko_type == RD_KAFKA_OP_FETCH and cb_type == RD_KAFKA_Q_CB_CALLBACK (presumably concerning topic X). This eventually leads us into rd_kafka_poll_cb, which says:
if (!rk->rk_conf.consume_cb ||
cb_type == RD_KAFKA_Q_CB_RETURN ||
cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
and we eventually hit the assertion in rd_kafka_q_serve, because all ops must be handled:
res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque,
callback);
/* op must have been handled */
rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
The solution I tried was adding the consume_cb that is missing above, which KafkaConsumer does not set. So I add
rd_kafka_conf_set_consume_cb(conf,
[](rd_kafka_message_t* rkmessage, void* opaque) {
throw;
});
into KafkaConsumer::registerConfigCallbacks. Interestingly, because the version of the fetch operation is outdated, the operation is discarded anyway; see rd_kafka_consume_cb:
if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
rko->rko_type == RD_KAFKA_OP_BARRIER) {
rd_kafka_op_destroy(rko);
return RD_KAFKA_OP_RES_HANDLED;
}
So the callback is never actually invoked, yet librdkafka's logic requires it to be set. Can we add a vacuous callback (either one that raises an exception or simply a no-op) so that this failure mode is averted?
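To make the control flow above easier to follow, here is a small toy model of the dispatch logic. It is only a sketch: OpRes, CbType, Op, ConsumeCb and pollCb are hypothetical stand-ins that mirror the quoted snippets, not real librdkafka types or functions, and the model assumes the two quoted checks run in the order shown.

```cpp
#include <cassert>
#include <functional>

// Toy model only: hypothetical stand-ins for the librdkafka pieces quoted above.
enum class OpRes { Pass, Handled };
enum class CbType { Callback, Return, ForceReturn };

struct Op {
    bool outdated;  // stands in for rd_kafka_op_version_outdated(rko, 0)
};

using ConsumeCb = std::function<void(const Op&)>;

// Mirrors the quoted checks: with no consume callback the op is passed on
// (which the caller then asserts against); with a callback registered, an
// outdated op is dropped and reported as handled without invoking it.
OpRes pollCb(const ConsumeCb& consumeCb, CbType cbType, const Op& op) {
    if (!consumeCb || cbType == CbType::Return || cbType == CbType::ForceReturn)
        return OpRes::Pass;  // not handled here
    if (op.outdated)
        return OpRes::Handled;  // discarded before the callback would run
    consumeCb(op);
    return OpRes::Handled;
}
```

In this model, calling pollCb with an empty callback and an outdated op yields OpRes::Pass, which corresponds to the failing assertion, while passing any non-empty callback (even a no-op) yields OpRes::Handled and the callback itself never runs.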
Hi, Arcoth,
Could you please share some demo code that triggers the issue?
As I understand it, calling consumer.subscribe(...) twice (for two different topics) would have a chance of reproducing it, right?