librdkafka

Consumer reading same message after every poll

Status: Open • radhaagr opened this issue 3 years ago • 7 comments

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/edenhill/librdkafka/discussions

Description

The consumer reads the same message, at the same offset, every time it polls.

How to reproduce

<your steps how to reproduce goes here, or remove section if not relevant>

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): <REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
  • [ ] Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
  • [ ] librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.offset.reset=earliest, ..>
  • [ ] Operating system: <REPLACE with e.g., Centos 5 (x64)>
  • [ ] Provide logs (with debug=.. as necessary) from librdkafka
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

radhaagr • Apr 04 '22 10:04

The consumer reads the same message after every poll. I tried different configuration values:
  • "enable.auto.commit" = "false"
  • "offset.store.method" = "true"
  • "auto.offset.reset" = "earliest"
I have tried various combinations of these values, always with the same consumer group ID across consumer restarts. I am not sure why it reads the same Kafka message every time: every message it receives has the same offset, even though all messages are being enqueued in the Kafka topic.

radhaagr • Apr 04 '22 10:04

Please fill out the checklist and provide logs with the configuration property debug=cgrp,fetch set. Also provide your consumer loop code.

edenhill • Apr 04 '22 10:04

Here is the consumer loop code:

while (!termFlag)
    {
        rd_kafka_message_t *rkm;

        rkm = rd_kafka_consumer_poll(m_rk, 300);
        /* Timeout: no message within 300 ms,
         * try again. This short timeout allows
         * checking termFlag at frequent intervals.
         */
        if (!rkm)
        {
            ::Debug(__FILE__, __FUNCTION__, __LINE__,
                    "no Kafka messages after %d milliseconds", 300);
            //Sleep(2000);
            continue;
        }

        /* consumer_poll() will return either a proper message
         * or a consumer error (rkm->err is set). */
        if (rkm->err)
        {
            /* Consumer errors are generally to be considered
             * informational as the consumer will automatically
             * try to recover from all types of errors. */
            Error(__FILE__, __FUNCTION__, __LINE__,
                          "Consumer error: %s\n", rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }
        ::Debug(__FILE__, __FUNCTION__, __LINE__,
                      "Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
                      rd_kafka_topic_name(rkm->rkt), rkm->partition,
                      rkm->offset);

        /* Print the message key. */
        if (rkm->key)
        {
            /* The key is not NUL-terminated: print exactly key_len bytes. */
            ::Debug(__FILE__, __FUNCTION__, __LINE__, "Key (%d bytes): %.*s\n",
                    (int)rkm->key_len, (int)rkm->key_len,
                    (const char *)rkm->key);
        }


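        /* Print the message value/payload. */
        if (rkm->payload)
        {
            /* The payload is not NUL-terminated, so copy exactly
             * rkm->len bytes into a std::string (requires <string>)
             * before handing it to MlfString1024. */
            std::string raw((const char *)rkm->payload, rkm->len);
            MlfString1024 payload(raw.c_str());
            payload.ReplaceAllNon7bitCharsWithSpaces();
            payload.ReplaceAllOf(0, "\t\n\r", " ");
            ::Debug(__FILE__, __FUNCTION__, __LINE__, "Value (%d bytes): %s\n",
                    (int)rkm->len, payload.cStr());

            // Enqueue payload in the DataProcessor Worker pool.
            // rkm is destroyed below, so if parse_msg() hands it to
            // another thread it must copy the data it needs first.
            parse_msg(rkm, serdes, purgeOldEmailExchanges);
        }

        rd_kafka_message_destroy(rkm);
    }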
    MlfLog::Info(__FILE__, __FUNCTION__, __LINE__, "%s", "Closing consumer\n");

radhaagr • Apr 04 '22 10:04

I went through the documentation but could not figure out how to get log output in my code:

if (rd_kafka_conf_set(conf, "debug", "consumer,cgrp,topic,fetch",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(conf);
    return -1;
}
if (rd_kafka_conf_set(conf, "log_level", "7",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(conf);
    return -1;
}

radhaagr • Apr 04 '22 10:04

Setting debug is sufficient; the log output is emitted on stderr. If you want it elsewhere, register a log callback with rd_kafka_conf_set_log_cb().

edenhill • Apr 04 '22 10:04

It would be great if you could add an example of setting rd_kafka_conf_set_log_cb; a web search did not help me much.

radhaagr • Apr 04 '22 11:04

https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_complex_consumer_example.c#L93-L102
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_complex_consumer_example.c#L319-L320

edenhill • Apr 04 '22 11:04