librdkafka
Consumer reading same message after every poll
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ
Do NOT create issues for questions, use the discussion forum: https://github.com/edenhill/librdkafka/discussions
Description
The consumer reads the same message from the same offset every time.
How to reproduce
<your steps how to reproduce goes here, or remove section if not relevant>
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): <REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
- [ ] Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
- [ ] librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
- [ ] Operating system: <REPLACE with e.g., Centos 5 (x64)>
- [ ] Provide logs (with debug=.. as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [ ] Critical issue
The consumer reads the same message after every poll. I tried various combinations of the following configuration values, always with the same consumer group ID across consumer restarts:
"enable.auto.commit", "false"
"offset.store.method", "true"
"auto.offset.reset", "earliest"
I am not sure why it reads the same Kafka message every time. Every message it receives has the same offset, even though all messages are being enqueued in the Kafka topic.
Please fill out the checklist and provide logs with the config debug=cgrp,fetch set. Also provide your consumer loop code.
Added consumer loop code -
while (!termFlag)
{
    rd_kafka_message_t *rkm;

    /* Poll with a 300 ms timeout. If no message arrives in that
     * time, try again. This short timeout allows checking
     * `termFlag` at frequent intervals. */
    rkm = rd_kafka_consumer_poll(m_rk, 300);
    if (!rkm)
    {
        ::Debug(__FILE__, __FUNCTION__, __LINE__,
                "no Kafka messages after %d milliseconds", 300);
        //Sleep(2000);
        continue;
    }

    /* consumer_poll() will return either a proper message
     * or a consumer error (rkm->err is set). */
    if (rkm->err)
    {
        /* Consumer errors are generally to be considered
         * informational as the consumer will automatically
         * try to recover from all types of errors. */
        Error(__FILE__, __FUNCTION__, __LINE__,
              "Consumer error: %s\n", rd_kafka_message_errstr(rkm));
        rd_kafka_message_destroy(rkm);
        continue;
    }

    ::Debug(__FILE__, __FUNCTION__, __LINE__,
            "Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
            rd_kafka_topic_name(rkm->rkt), rkm->partition,
            rkm->offset);

    /* Print the message key. The key is not guaranteed to be
     * NUL-terminated, so bound the print by key_len. */
    if (rkm->key)
    {
        ::Debug(__FILE__, __FUNCTION__, __LINE__, "%d Key: %.*s\n",
                (int)rkm->key_len, (int)rkm->key_len,
                (const char *)rkm->key);
    }

    /* Print the message value/payload.
     * NOTE: rkm->payload is not guaranteed to be NUL-terminated;
     * rkm->len holds its length. */
    if (rkm->payload)
    {
        MlfString1024 payload((const char*)rkm->payload);
        payload.ReplaceAllNon7bitCharsWithSpaces();
        payload.ReplaceAllOf(0, "\t\n\r", " ");
        ::Debug(__FILE__, __FUNCTION__, __LINE__, "%d Value:%s\n",
                (int)rkm->len, payload.cStr());
        // Enqueue payload in the DataProcessor Worker pool.
        parse_msg(rkm, serdes, purgeOldEmailExchanges);
    }

    rd_kafka_message_destroy(rkm);
}
MlfLog::Info(__FILE__, __FUNCTION__, __LINE__, "%s", "Closing consumer\n");
I tried the documentation but could not figure out how to get logging in my code -
if (rd_kafka_conf_set(conf, "debug", "consumer,cgrp,topic,fetch", errstr,
                      sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(conf);
    return -1;
}
if (rd_kafka_conf_set(conf, "log_level", "7", errstr,
                      sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
    rd_kafka_conf_destroy(conf);
    return -1;
}
Setting debug is sufficient. Logging will be emitted on stderr. If you want it elsewhere, set a log callback with rd_kafka_conf_set_log_cb().
It would be great if you could add an example of how to set rd_kafka_conf_set_log_cb, as a web search did not help me much.
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_complex_consumer_example.c#L93-L102
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_complex_consumer_example.c#L319-L320