librdkafka
rd_kafka_metadata can cause an UNKNOWN_TOPIC_OR_PART error message
Description
I am using a consumer client that subscribes with rd_kafka_subscribe and has I/O event notifications enabled on its queue with rd_kafka_queue_io_event_enable.
This works fine until metadata is requested via rd_kafka_metadata.
If metadata is requested at any time after the subscription is made, the poll returns a message with err set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every subscribed topic, even though the metadata call returns correct results and the subscription keeps receiving messages.
If metadata is requested before any subscription is made, the poll returns no error message.
How to reproduce
Example client program, purely to demonstrate the error message (assumes an existing topic called test1 with partition 0 and a broker at localhost:9092). Build with:
gcc main.c -lrdkafka -lz -lpthread -lssl
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>
int main(int argc,char** argv){
rd_kafka_t *rk=0;
rd_kafka_conf_t *conf=0;
const char *brokers = "localhost:9092";
rd_kafka_queue_t *queue=0;
rd_kafka_resp_err_t err=0;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_message_t *msg;
char errstr[512];
int spair[2];
if(pipe(spair)==-1){
fprintf(stderr, "pipe fail\n");
exit(1);
}
conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) {
fprintf(stderr,"Failed to create new consumer: %s\n",errstr);
exit(1);
}
err=rd_kafka_poll_set_consumer(rk);
if(RD_KAFKA_RESP_ERR_NO_ERROR != err){
fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err));
exit(1);
}
queue = rd_kafka_queue_get_consumer(rk);
if (!queue){
fprintf(stderr, "queue fail\n");
exit(1);
}
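// librdkafka writes "X" to spair[1] whenever an event is enqueued on the previously empty consumer queue
rd_kafka_queue_io_event_enable(queue,spair[1],"X",1);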
rd_kafka_set_log_queue(rk,NULL);
///////// subscribe
printf("Subscribing...\n");
t_partition = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA);
if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){
fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err));
exit(1);
}
rd_kafka_topic_partition_list_destroy(t_partition);
//////// metadata (comment out this section to prevent error)
{
printf("Get metadata...\n");
const struct rd_kafka_metadata *metadata;
err = rd_kafka_metadata(rk, 1, 0, &metadata,5000);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err));
exit(1);
}
rd_kafka_metadata_destroy(metadata);
}
///////// poll
printf("Poll...\n");
while((msg= rd_kafka_consumer_poll(rk, 10000))) {
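if (msg->err)
// msg->rkt can be NULL for errors not tied to a topic; offset is int64_t, so cast for %lld
fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),msg->rkt ? rd_kafka_topic_name(msg->rkt) : "(none)",(long long)msg->offset);
else
fprintf(stderr,"Got msg [topic %s offset %lld]\n",rd_kafka_topic_name(msg->rkt),(long long)msg->offset);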
rd_kafka_message_destroy(msg);
}
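printf("Finished...\n");
// release the queue handle, leave the group cleanly, then free the client
rd_kafka_queue_destroy(queue);
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
close(spair[0]);
close(spair[1]);
return 0;
}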
Output:
Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...
If you edit the code to put the metadata request before the subscription, the error message no longer appears.
The debug log only contains a TOPICERR entry when metadata is requested after subscribing.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): 2.3.0
- [x] Apache Kafka version: 7.0.1
- [x] librdkafka client configuration: see example above
- [x] Operating system: OSX (x64)
- [x] Provide logs (with debug=.. as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [ ] Critical issue
bad.log: running with debug on, metadata requested after subscribing (produces the error message)
good.log: running with debug on, metadata requested before subscribing (no error message)
In our program that wraps librdkafka, the bad message appears before the successfully consumed messages (the first row only appears when metadata is requested after the subscribe call), e.g.:
mtype topic client partition offset msgtime data key headers rcvtime
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNKNOWN_TOPIC_OR_PART test1 0 0 -1001 "Subscribed topic not available: test1: Broker: Unknown topic or partition" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:53.817149000
test1 0 0 0 2024.01.23D16:02:24.373000000 "2024.01.23D16:02:24.373020000" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045098000
test1 0 0 1 2024.01.23D16:02:26.614000000 "2024.01.23D16:02:26.614183000" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045232000
test1 0 0 2 2024.01.23D16:02:28.616000000 "2024.01.23D16:02:28.614183000" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045352000
test1 0 0 3 2024.01.23D16:02:30.616000000 "2024.01.23D16:02:30.614183000" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.046034000
This seems to happen every time rd_kafka_metadata is called after a valid subscription to an existing topic has been made, when rd_kafka_queue_io_event_enable is used to enable notifications (I need the file-descriptor notifications of items arriving on the queue).
In a similar app that only publishes, it doesn't seem to occur when rd_kafka_metadata is called after publishing. An example subscriber program that doesn't use rd_kafka_queue_io_event_enable doesn't seem to show it either.
I think this is due to a recently discovered bug affecting versions 2.1.0 and later. When you subscribe, a full metadata request is made, and this request was emptying the metadata cache, causing those errors. https://github.com/confluentinc/librdkafka/blob/267367c9475c2154e72eafe6ff1957518cb2ed1a/src/rdkafka_metadata.c#L1441
It's fixed in this PR: https://github.com/confluentinc/librdkafka/pull/4660
Could you test whether it solves your problem?
Thanks, I'll let you know either way once I've tried.
@emasab The bug still occurs when testing with #4660.
Running my full program, I still see the issue.
Running the test program above still shows the issue, e.g.:
Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Same effect as before: if I move the rd_kafka_metadata call before rd_kafka_subscribe, the problem doesn't appear, but calling rd_kafka_metadata after rd_kafka_subscribe always shows it.
@emasab FYI: I tried librdkafka v2.0.2 and I don't see the bug with that version.
Could you check the latest commit pushed to the same PR? I've reproduced and fixed your case, and will add a new test too.
What happens is that after you call subscribe, the cache isn't populated until the consumer joins the group. If application metadata calls arrive in the meantime, they shouldn't update the group, because that causes this error.
> @emasab FYI: I tried librdkafka v2.0.2 and I don't see the bug with that version.
Yes, there were changes related to KIP-320 in 2.1.0.
@emasab It looks OK with the latest change on the PR; I've tested again with the original app.