Why do consumers not report errors if the topic is deleted?
Description
While my program was consuming Kafka data through a KafkaConsumer, I deleted the Kafka topic. I expected the consumer to report an error, but in reality there was no error and no data was consumed.
This is the relevant part of my program:
    RdKafka::KafkaConsumer* _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    // `done`, `st`, `queue`, `retry_times`, `put_rows`, `received_rows`,
    // `_consuming_partition_ids` and `_id` are members of the surrounding class.
    while (!done) {
        std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
        switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            if (msg->len() == 0) {
                // Ignore messages with length 0: putting an empty msg into the
                // queue would shut the load process down.
                break;
            } else if (!queue->blocking_put(msg.get())) {
                // The queue has been shut down.
                done = true;
            } else {
                ++put_rows;
                msg.release(); // Release ownership; msg is deleted after being processed.
            }
            ++received_rows;
            break;
        case RdKafka::ERR__TIMED_OUT:
            // Leave the status as OK: this can happen simply because
            // there is no data in Kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            LOG(INFO) << "consumer met partition eof: " << _id
                      << ", partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            if (!queue->blocking_put(msg.get())) {
                done = true;
            } else if (_consuming_partition_ids.size() <= 0) {
                // All partitions have reached EOF; hand the msg over and stop.
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition()
               << ", consume offset " << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
        }
    }
When a new consumer is created after the topic has been deleted, creation does report an error. What I mainly want to know is: why does a consumer that has already established a connection to Kafka not report an error?
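For reference, one way such errors could still be surfaced is to register an event callback on the configuration, so that broker- and transport-level errors are delivered asynchronously even while `consume()` keeps returning `ERR__TIMED_OUT`. This is only a minimal sketch, assuming the same `conf`/`errstr` objects as above; the class name `LoggingEventCb` is made up for illustration:

```cpp
#include <iostream>
#include <librdkafka/rdkafkacpp.h>

// Illustrative callback: librdkafka delivers error events here
// asynchronously, independent of what consume() returns.
class LoggingEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        if (event.type() == RdKafka::Event::EVENT_ERROR) {
            std::cerr << "kafka event error: " << RdKafka::err2str(event.err())
                      << ": " << event.str() << std::endl;
        }
    }
};

// Usage (before creating the consumer; the callback must outlive it):
//   LoggingEventCb event_cb;
//   conf->set("event_cb", &event_cb, errstr);
//   RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
```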
How to reproduce
Delete the Kafka topic while a consumer is consuming data from it.
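For concreteness, a minimal repro sketch; the broker address, group id, and the topic name `test_topic` are assumptions:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;
    std::unique_ptr<RdKafka::Conf> conf(
            RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    conf->set("bootstrap.servers", "localhost:9092", errstr); // assumed broker
    conf->set("group.id", "repro-group", errstr);             // assumed group
    conf->set("enable.partition.eof", "true", errstr);

    std::unique_ptr<RdKafka::KafkaConsumer> consumer(
            RdKafka::KafkaConsumer::create(conf.get(), errstr));
    consumer->subscribe({"test_topic"}); // assumed topic

    // While this loop runs, delete the topic from another shell, e.g.:
    //   kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test_topic
    // Observed behavior: consume() keeps returning ERR__TIMED_OUT
    // instead of reporting an error.
    while (true) {
        std::unique_ptr<RdKafka::Message> msg(consumer->consume(1000));
        std::cout << "err: " << RdKafka::err2str(msg->err()) << std::endl;
    }
}
```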
Checklist
Please provide the following information:
- [x] librdkafka version (release number or git tag): 1.9.2
- [x] Apache Kafka version: 2.3.0
- [x] librdkafka client configuration: `enable.partition.eof=true, enable.auto.offset.store=true` (applied as in the sketch below)
- [x] Operating system: Linux
- [ ] Provide logs (with `debug=..` as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [ ] Critical issue
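For completeness, the client configuration listed above would be applied roughly like this (a sketch; the return values of `Conf::set` are ignored here for brevity):

```cpp
#include <string>
#include <librdkafka/rdkafkacpp.h>

std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// Values from the checklist; all other settings are left at their defaults.
conf->set("enable.partition.eof", "true", errstr);     // emit ERR__PARTITION_EOF per partition
conf->set("enable.auto.offset.store", "true", errstr); // store offsets automatically
```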