cppkafka
rdkafka breaks on exceptions from inside callbacks
The most typical way to trigger the problem is to create a consumer and then close it. This produces the following backtrace:
#0 cppkafka::Consumer::handle_rebalance (this=0x7ffefc002910, error=RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, topic_partitions=...) at ../../contrib/cppkafka/src/consumer.cpp:303
#1 0x00007fffdf3dd75b in cppkafka::Consumer::rebalance_proxy (error=RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, partitions=0x7fff040038e0, opaque=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:55
#2 0x00007fffdf0937d6 in rd_kafka_poll_cb (rk=0x7ffefc003270, rkq=0x7fff24205e20, rko=0x7fff04003780, cb_type=RD_KAFKA_Q_CB_RETURN, opaque=0x0) at ../../contrib/librdkafka/src/rdkafka.c:3129
#3 0x00007fffdf095f3c in rd_kafka_consumer_close (rk=0x7ffefc003270) at ../../contrib/librdkafka/src/rdkafka.c:2660
#4 0x00007fffdf3de6a7 in cppkafka::Consumer::close (this=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:276
#5 0x00007fffdf3dde08 in cppkafka::Consumer::~Consumer (this=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:82
where the next call to assign() returns:
(gdb) info locals
error = RD_KAFKA_RESP_ERR__DESTROY
This causes an exception to be thrown, and Kafka then hangs somewhere internally. I was unable to track down the root cause.
Copying the exception handler from the destructor into handle_rebalance() fixes the symptoms.
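For illustration, the workaround amounts to keeping exceptions from escaping a function that is called from C code. The following is only a minimal sketch of that pattern, not the actual cppkafka code: `FakeError`, `assign_or_throw` and `guarded_rebalance` are hypothetical names invented for this example.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a C library error code (not the real rdkafka enum).
enum class FakeError { NoError, Destroy };

// Hypothetical wrapper call that throws on failure, the way a C++ wrapper
// around a C API typically reports errors.
void assign_or_throw(FakeError result) {
    if (result != FakeError::NoError) {
        throw std::runtime_error("assign failed: handle is being destroyed");
    }
}

// Body of a callback that is invoked from C code. Exceptions are caught here
// because letting them unwind through C frames is not safe.
// Returns true if the handler completed, false if an exception was swallowed.
bool guarded_rebalance(FakeError assign_result, std::string& last_error) {
    try {
        assign_or_throw(assign_result);
        return true;
    } catch (const std::exception& ex) {
        last_error = ex.what();  // record the failure instead of propagating it
        return false;
    } catch (...) {
        last_error = "unknown error";
        return false;
    }
}
```

Calling `guarded_rebalance(FakeError::Destroy, err)` returns false and stores the message in `err`, so the C caller never sees an exception.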
From what I can see in the code, before close() is called, the consumer callbacks are nullified to avoid holding references to the consumer itself. When handle_rebalance() is called, the callback invoker throws because the assign callback is null, but the exception is caught internally and an error is returned instead. So the proper solution would be to check the error returned by the callback invoker.
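As a rough sketch of what checking the invoker's result could look like (this is not cppkafka's real invoker; `AssignCallback`, `invoke_checked` and `handle_rebalance_sketch` are hypothetical names, and the `assigned` flag only stands in for the real assign step):

```cpp
#include <exception>
#include <functional>

// Hypothetical callback type; the int stands in for the partition list.
using AssignCallback = std::function<void(int)>;

// Hypothetical invoker: calls the callback and reports failure as a return
// value instead of letting the exception escape.
bool invoke_checked(const AssignCallback& callback, int partition_count) {
    try {
        callback(partition_count);  // throws std::bad_function_call if empty
        return true;
    } catch (const std::exception&) {
        return false;
    }
}

// Hypothetical rebalance handler: proceeds only if the callback succeeded.
bool handle_rebalance_sketch(const AssignCallback& callback,
                             int partition_count,
                             bool& assigned) {
    assigned = false;
    if (!invoke_checked(callback, partition_count)) {
        return false;  // callback was cleared or failed: skip the assignment
    }
    assigned = true;  // stand-in for the real assign step
    return true;
}
```

With a cleared (empty) callback the handler returns early and never reaches the assign step, which is the behavior the check is meant to guarantee.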
That said, I think this is an rdkafka bug: when close is called, it should not invoke the assignment callback. The consumer is going away, so this would just be a null assignment.