Why do consumers not report an error when the topic is deleted?

Open sollhui opened this issue 1 year ago • 0 comments

Read the FAQ first: https://github.com/confluentinc/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/confluentinc/librdkafka/discussions

Description

While my program was consuming data from Kafka through a consumer, I deleted the Kafka topic. I expected the consumer to report an error, but it reported none and simply stopped receiving data.

This is part of my program:

RdKafka::KafkaConsumer* _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
while (true) {
    std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
    switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:
            if (_consuming_partition_ids.count(msg->partition()) <= 0) {
                _consuming_partition_ids.insert(msg->partition());
            }
            if (msg->len() == 0) {
                // ignore messages with length 0:
                // putting an empty msg into the queue would shut down the load process.
                break;
            } else if (!queue->blocking_put(msg.get())) {
                // queue is shutdown
                done = true;
            } else {
                ++put_rows;
                msg.release(); // release the ownership, msg will be deleted after being processed
            }
            ++received_rows;
            break;
        case RdKafka::ERR__TIMED_OUT:
            // leave the status as OK, because this may happen
            // if there is no data in Kafka.
            LOG(INFO) << "kafka consume timeout: " << _id;
            break;
        case RdKafka::ERR__TRANSPORT:
            LOG(INFO) << "kafka consume Disconnected: " << _id
                      << ", retry times: " << retry_times++;
            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                break;
            }
            [[fallthrough]];
        case RdKafka::ERR__PARTITION_EOF: {
            LOG(INFO) << "consumer meet partition eof: " << _id
                      << " partition offset: " << msg->offset();
            _consuming_partition_ids.erase(msg->partition());
            if (!queue->blocking_put(msg.get())) {
                done = true;
            } else if (_consuming_partition_ids.size() <= 0) {
                msg.release();
                done = true;
            } else {
                msg.release();
            }
            break;
        }
        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
            done = true;
            std::stringstream ss;
            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
               << msg->offset();
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
            st = Status::InternalError<false>(ss.str());
            break;
        }
        default:
            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
            done = true;
            st = Status::InternalError<false>(msg->errstr());
            break;
    }
}
 

If I create a new consumer after the topic is deleted, it does report an error. What I mainly want to know is why a consumer that has already established a connection to Kafka does not report an error.
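For illustration, here is a minimal sketch of the behavior I was hoping to get, written as an application-side workaround rather than anything librdkafka provides. `TopicStallDetector` and its `topic_exists` callback are hypothetical names of my own. The callback is only assumed here; with librdkafka it could perhaps wrap a call to `RdKafka::Handle::metadata()` for the topic and check `RdKafka::TopicMetadata::err()`, but I have not verified how that behaves in 1.9.2 after a topic is deleted.

```cpp
#include <functional>
#include <utility>

// Hypothetical helper (not part of librdkafka): counts consecutive consume
// timeouts and, once a threshold is reached, asks a caller-supplied probe
// whether the topic still exists.
class TopicStallDetector {
public:
    // topic_exists is an assumed callback supplied by the application,
    // e.g. a wrapper around a metadata request for the topic.
    TopicStallDetector(int max_idle_polls, std::function<bool()> topic_exists)
        : max_idle_polls_(max_idle_polls), topic_exists_(std::move(topic_exists)) {}

    // Call when consume() returns a message or a partition EOF.
    void on_progress() { idle_polls_ = 0; }

    // Call when consume() returns ERR__TIMED_OUT.
    // Returns true only if the probe ran and reported the topic as missing.
    bool on_timeout() {
        if (++idle_polls_ < max_idle_polls_) {
            return false;
        }
        idle_polls_ = 0;  // probe at most once per max_idle_polls_ timeouts
        return !topic_exists_();
    }

private:
    int max_idle_polls_;
    int idle_polls_ = 0;
    std::function<bool()> topic_exists_;
};
```

In the loop above, `on_progress()` would go in the `ERR_NO_ERROR` and `ERR__PARTITION_EOF` branches and `on_timeout()` in the `ERR__TIMED_OUT` branch, setting `done` and an error status when it returns true. Probing only after several idle polls keeps the extra metadata requests rare when the topic is merely quiet.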

How to reproduce

Delete the Kafka topic while a consumer is consuming data from it.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/confluentinc/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): <1.9.2>
  • [x] Apache Kafka version: <2.3.0>
  • [x] librdkafka client configuration: <enable.partition.eof=true, enable.auto.offset.store=true>
  • [ ] Operating system: <linux>
  • [ ] Provide logs (with debug=.. as necessary) from librdkafka
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

sollhui, Sep 04 '24 03:09