librdkafka

rd_kafka_metadata can cause an UNKNOWN_TOPIC_OR_PART message

Open · sshanks-kx opened this issue 1 year ago · 10 comments

Description

I'm using a consumer client that subscribes with rd_kafka_subscribe and receives queue notifications through rd_kafka_queue_io_event_enable.

This works fine until metadata is requested via rd_kafka_metadata.

If the metadata is requested any time after making the subscription, the poll returns a message with error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every topic subscribed to (although the metadata returns correct results and the subscription continues getting messages).

If the metadata is requested before making any subscription, I don't get an error message from the poll.
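The offset -1001 in that error message is not a position in the partition. In librdkafka's rdkafka.h, -1001 is RD_KAFKA_OFFSET_INVALID, one of several logical offsets (alongside BEGINNING = -2, END = -1 and STORED = -1000). As a minimal sketch, an application could label such offsets when logging. The SKETCH_ macros and format_offset are hypothetical names; the values are copied from rdkafka.h only so the example compiles without librdkafka.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical copies of librdkafka's logical offsets (RD_KAFKA_OFFSET_*). */
#define SKETCH_OFFSET_BEGINNING (-2)
#define SKETCH_OFFSET_END       (-1)
#define SKETCH_OFFSET_STORED    (-1000)
#define SKETCH_OFFSET_INVALID   (-1001)

/* Write a readable form of a consumer offset into buf and return buf.
 * Logical offsets get a name; real offsets are printed as numbers. */
static const char *format_offset(int64_t offset, char *buf, size_t len) {
    assert(buf != NULL && len > 0);
    const char *name = NULL;
    switch (offset) {
    case SKETCH_OFFSET_BEGINNING: name = "BEGINNING"; break;
    case SKETCH_OFFSET_END:       name = "END";       break;
    case SKETCH_OFFSET_STORED:    name = "STORED";    break;
    case SKETCH_OFFSET_INVALID:   name = "INVALID";   break;
    default:                      break;
    }
    if (name)
        snprintf(buf, len, "%s", name);
    else
        snprintf(buf, len, "%" PRId64, offset);
    return buf;
}
```

With this, the error above would be logged with offset INVALID, which makes it clearer that the message is an error event rather than a consumed record.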

How to reproduce

Example client program, purely to show the error message (assumes an existing topic called test1 with partition 0 and a broker at localhost:9092):

gcc main.c -lrdkafka -lz -lpthread -lssl

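#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <librdkafka/rdkafka.h>

int main(int argc,char** argv){
    rd_kafka_t *rk=0;
    rd_kafka_conf_t *conf=0;
    const char *brokers = "localhost:9092";
    rd_kafka_queue_t *queue=0;
    rd_kafka_resp_err_t err=0;
    rd_kafka_topic_partition_list_t *t_partition;
    rd_kafka_message_t *msg;
    char errstr[512];
    int spair[2];

    if(pipe(spair)==-1){
        fprintf(stderr, "pipe fail\n");
        exit(1);
    }
    // The fd handed to rd_kafka_queue_io_event_enable should be non-blocking
    if(fcntl(spair[1],F_SETFL,fcntl(spair[1],F_GETFL,0)|O_NONBLOCK)==-1){
        fprintf(stderr, "fcntl fail\n");
        exit(1);
    }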

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% %s\n", errstr);
        exit(1);
    }

    if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) {
      fprintf(stderr,"Failed to create new consumer: %s\n",errstr);
      exit(1);
    }

    err=rd_kafka_poll_set_consumer(rk);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != err){
        fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    queue = rd_kafka_queue_get_consumer(rk);
    if (!queue){
        fprintf(stderr, "queue fail\n");
        exit(1);
    }
    rd_kafka_queue_io_event_enable(queue,spair[1],"X",1);
    rd_kafka_set_log_queue(rk,NULL);

    ///////// subscribe
    printf("Subscribing...\n");
    t_partition = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA);
    if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){
        fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err));
        exit(1);
    }
    rd_kafka_topic_partition_list_destroy(t_partition);

    //////// metadata (comment out this section to prevent error)
    {
        printf("Get metadata...\n");
        const struct rd_kafka_metadata *metadata;
        // all_topics=1, only_rkt=NULL: request metadata for every topic in the cluster
        err = rd_kafka_metadata(rk, 1, NULL, &metadata, 5000);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err));
            exit(1);
        }
        rd_kafka_metadata_destroy(metadata);
    }

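    ///////// poll
    printf("Poll...\n");
    // Loop ends when nothing arrives within 10 seconds (poll returns NULL)
    while((msg= rd_kafka_consumer_poll(rk, 10000))) {
        // Error events may carry no topic handle, so guard msg->rkt
        const char *tname = msg->rkt ? rd_kafka_topic_name(msg->rkt) : "(none)";
        if (msg->err)
            fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),tname,(long long)msg->offset);
        else
            fprintf(stderr,"Got msg [topic %s offset %lld]\n",tname,(long long)msg->offset);
        rd_kafka_message_destroy(msg);
    }

    printf("Finished...\n");

    rd_kafka_consumer_close(rk);
    rd_kafka_queue_destroy(queue);
    rd_kafka_destroy(rk);
    return 0;
}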

Output:

Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...

If you move the metadata request before the subscription, the error message no longer appears. The debug log contains a TOPICERR entry only when metadata is requested after the subscription.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): 2.3.0
  • [x] Apache Kafka version: 7.0.1
  • [x] librdkafka client configuration: see example above
  • [x] Operating system: OSX (x64)
  • [x] Provide logs (with debug=.. as necessary) from librdkafka
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

sshanks-kx, Jan 23 '24 16:01

Attached logs: good.log, bad.log

bad.log: debug enabled, metadata requested after subscribing (error message appears).
good.log: debug enabled, metadata requested before subscribing (no error message).

sshanks-kx, Jan 23 '24 16:01

In our program that wraps librdkafka, this bogus error message appears before the successfully consumed messages (the first row only appears when metadata is requested after the subscribe call), e.g.:

mtype                 topic client partition offset msgtime                       data                                                                        key      headers                 rcvtime                      
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNKNOWN_TOPIC_OR_PART test1 0      0         -1001                                "Subscribed topic not available: test1: Broker: Unknown topic or partition" `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:53.817149000
                      test1 0      0         0      2024.01.23D16:02:24.373000000 "2024.01.23D16:02:24.373020000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045098000
                      test1 0      0         1      2024.01.23D16:02:26.614000000 "2024.01.23D16:02:26.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045232000
                      test1 0      0         2      2024.01.23D16:02:28.616000000 "2024.01.23D16:02:28.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.045352000
                      test1 0      0         3      2024.01.23D16:02:30.616000000 "2024.01.23D16:02:30.614183000"                                             `byte$() (`symbol$())!`symbol$() 2024.01.23D16:02:57.046034000

sshanks-kx, Jan 23 '24 16:01

This seems to happen every time rd_kafka_metadata is called after a valid subscription to an existing topic has been made, when rd_kafka_queue_io_event_enable is used to enable notifications (I need file-descriptor notifications when items appear on the queue).

In a similar app that only publishes, it doesn't seem to occur when rd_kafka_metadata is called after publishing. It doesn't seem to happen either with an example subscriber program that doesn't use rd_kafka_queue_io_event_enable.
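Because the issue only shows up with file-descriptor notifications, it may help to spell out that pattern. As I understand the rdkafka.h documentation for rd_kafka_queue_io_event_enable, librdkafka writes the given payload to the fd whenever an element is enqueued on a previously empty queue, and the fd is expected to be non-blocking. The sketch below models that contract in plain POSIX C so it runs without librdkafka. toy_queue, toy_push, toy_pop, set_nonblocking and event_loop_once are hypothetical names; in a real consumer the toy_pop loop would instead call rd_kafka_consumer_poll(rk, 0) until it returns NULL.

```c
#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

#define TOY_CAP 64

/* Hypothetical stand-in for an rd_kafka_queue_t with IO events enabled. */
typedef struct {
    int items[TOY_CAP];
    int head, count;
    int notify_fd;  /* write end of the pipe */
} toy_queue;

static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Enqueue; write one wake-up byte only on the empty -> non-empty transition. */
static void toy_push(toy_queue *q, int v) {
    assert(q->count < TOY_CAP);
    int was_empty = (q->count == 0);
    q->items[(q->head + q->count) % TOY_CAP] = v;
    q->count++;
    if (was_empty) {
        ssize_t n = write(q->notify_fd, "X", 1);  /* like payload "X" in the repro */
        (void)n;  /* EAGAIN on a full pipe is harmless: a wake-up is already pending */
    }
}

static int toy_pop(toy_queue *q, int *out) {
    if (q->count == 0)
        return 0;
    *out = q->items[q->head];
    q->head = (q->head + 1) % TOY_CAP;
    q->count--;
    return 1;
}

/* One event-loop iteration: wait on the fd, discard the wake-up bytes,
 * then drain the queue completely. read_fd must be non-blocking.
 * Returns items consumed, 0 on timeout, or -1 if poll() fails. */
static int event_loop_once(toy_queue *q, int read_fd, int timeout_ms) {
    struct pollfd pfd = { .fd = read_fd, .events = POLLIN };
    int rc = poll(&pfd, 1, timeout_ms);
    if (rc <= 0)
        return rc;
    char buf[64];
    while (read(read_fd, buf, sizeof buf) > 0) {
        /* discard wake-up bytes until EAGAIN */
    }
    int v, consumed = 0;
    while (toy_pop(q, &v))
        consumed++;
    return consumed;
}
```

The key design point is draining the queue completely after each wake-up: since a byte is only written on the empty-to-non-empty transition, items left behind will not trigger another notification. In this model, an error event such as the spurious UNKNOWN_TOPIC_OR_PART is simply one more queued item that wakes the loop.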

sshanks-kx, Jan 25 '24 13:01

I think this is due to a recently discovered bug affecting versions 2.1.0 and later. When you subscribe, a full metadata request is made, and that request was emptying the metadata cache and causing these errors: https://github.com/confluentinc/librdkafka/blob/267367c9475c2154e72eafe6ff1957518cb2ed1a/src/rdkafka_metadata.c#L1441

It's fixed in this PR: https://github.com/confluentinc/librdkafka/pull/4660

Could you test whether it solves your problem?

emasab, Mar 26 '24 19:03

Thanks, I'll let you know either way once I've tried it.

sshanks-kx, Mar 26 '24 20:03

@emasab Bug still occurring when testing with #4660

Running my full program I still see the issue.

Running the test program above still shows the issue, e.g.:

Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]

Same effect as before: if I move the rd_kafka_metadata call before rd_kafka_subscribe, the problem doesn't appear, but calling rd_kafka_metadata after rd_kafka_subscribe always shows it.

sshanks-kx, Mar 27 '24 11:03

@emasab FYI: I tried librdkafka v2.0.2 and I don't see the bug occurring with that version.

sshanks-kx, Mar 27 '24 16:03

Could you check the latest commit pushed to the same PR? I've reproduced and fixed your case, and will add a new test too.

What happens is that after you call subscribe, the metadata cache isn't populated until the consumer joins the group. If application metadata calls arrive in the meantime, they shouldn't update the group, because that is what causes this error.

emasab, Mar 28 '24 14:03

> @emasab FYI: I tried librdkafka v2.0.2 & I don't see the bug occurring with that version.

Yes, there were changes related to KIP-320 in 2.1.0.

emasab, Mar 28 '24 14:03

@emasab Looks OK with the latest change on the PR; I've tested again with the original app.

sshanks-kx, Mar 30 '24 11:03