kcat
Missing messages when consuming
Having some really weird behaviour.
Using the edenhill/kafkacat:1.6.0 docker image.
We have 102 messages on a topic with three partitions. The messages are spread roughly evenly across them (38 on 0, 32 on 1 and 32 on 2).
The message values are in avro format and the keys are in json format.
Using kafkacat I run this command:
kafkacat -b kafka:9092 -s value=avro -r http://schema-registry:8081 -t my-topic | grep <unique-value>
Where unique value only appears on one of the messages.
I expect that on each execution of this I should get the message displayed in the terminal. But instead depending on the order of the partition consumption I either get no output or the message.
Interestingly, only 51 messages are returned in these calls, so half of what is on the topic. (I did this count by first outputting to a file and then doing cat messages.txt | wc -l.)
We then tried the format option -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'. This seems to consistently return the message regardless of the ordering. Also, performing a count on this does give the 102 messages as expected.
Not sure what is going on here, but it's pretty consistent for us at the moment.
Could it be that some messages fail deserialization? Try running without your grep and see what happens.
Yeah tried without grep, same results. It's a bit random that on some runs it will show the message and on others it won't. The times when it does show it appears that the partition 0 is consumed first (that's the one that the message is on).
Avro messages are emitted as JSON without a delimiter, each message being its own JSON document. From the stdout I/O stream's perspective that might mean the output will be buffered until the buffer size is exhausted or a newline is seen.
What exit criteria do you have for kafkacat? Do you kill it manually after some time?
If you try to specify a partition (-p ..) does it consistently consume all messages?
Have you tried setting the start offset with -o beginning?
Do all messages have the same avro schema?
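As a side note on the counting method used in this thread: wc -l counts newline characters, not messages, so a count taken that way is only reliable if every message ends in exactly one newline. A minimal sketch of the difference, using made-up JSON documents and a hypothetical count_lines helper (neither comes from the reporter's topic):

```shell
# Hypothetical helper: count newline characters on stdin and strip the
# padding that some wc implementations add around the number.
count_lines() {
  wc -l | tr -d ' '
}

# Two JSON documents written back to back with no delimiter: no newlines seen.
printf '%s' '{"id":1}{"id":2}' | count_lines    # prints 0

# The same documents, each terminated by a newline: two lines seen.
printf '%s\n' '{"id":1}' '{"id":2}' | count_lines    # prints 2
```

This does not explain the missing half by itself, but it is worth ruling out before trusting a line count.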
Yes, they all have the same avro schema, and we have no exit criteria; we just wait for 10 seconds or so after seeing the Reached end of topic message and cancel the command.
We've recreated our kafka environment since then, but luckily for this bug it's still an issue. We have 3 partitions again; the numbers are 82, 79 and 84 on the 3 partitions on the topic.
Just tried this morning. First, using -p with partition 0, I get only 41 messages back. I tried the other two partitions and also get half of the messages back (so that's 41, 40 and 42 messages returned from the three partitions).
Also tried the -o beginning option and only get 123 messages back, which is approximately half of 245 rounded up, so the same issue again.
Must be something special about our setup though, doesn't seem like anyone else is having this issue. The only thing special is the avro value with the json key. If you have a cluster up with avro values could you try this:
kafkacat -b kafka:9092 -s value=avro -r http://schema-registry:8081 -t test-topic > test.txt
cat test.txt | wc -l
If it works for you it could be down to the key value encoding combination, not sure how though....
I encountered the same issue with a topic with 3 partitions, containing JSON documents. Messages are missing when I am consuming all partitions, but they are there when consuming the partitions one by one.
I have discovered that I have no issue at all with the -e option, so it seems to be a buffer flush issue? (Somehow, the messages are probably written to stdout but not flushed yet.)
Try kafkacat -u ... for unbuffered output.
Yes, that also solves the problem. @mt3593, maybe you are in the same situation?
ah nice, I'll give that a go
Yep this fixed it for me too!
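For anyone landing here later, the two workarounds above can be combined in a small wrapper. This is only a sketch: kcat_grep and the KCAT_BROKER variable are hypothetical names, the default broker address is the one from the original report, and only the -C, -b, -t, -u and -e flags already mentioned in this thread are used.

```shell
# Hypothetical helper: consume a topic and grep it without losing buffered output.
#   -u  unbuffered output, so each message reaches the pipe straight away
#   -e  exit once the end of the topic is reached, so nothing is left pending
kcat_grep() {
  topic=$1
  pattern=$2
  kcat -C -b "${KCAT_BROKER:-kafka:9092}" -t "$topic" -u -e | grep -- "$pattern"
}
```

Usage would look like kcat_grep my-topic some-unique-value. Because of -e the command returns on its own instead of needing Ctrl-C.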
This behaviour is strange to debug when using with other tools, at first glance it is not obvious why it sometimes outputs messages and others not (it was not happening to me before updating recently)
I did not find out how to define the default behavior to be unbuffered so I used an alias:
# KCAT unbuffered output
alias kcat="/usr/local/bin/kcat -u"
Where /usr/local/bin/kcat corresponds to the output of which kcat.
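A shell function is a possible alternative to the alias, since bash does not expand aliases in non-interactive scripts by default. A minimal sketch, assuming kcat is on PATH (so the install location does not need to be hard-coded):

```shell
# Wrap kcat so every invocation gets -u (unbuffered output).
# "command" bypasses this function and runs the real kcat found on PATH.
kcat() {
  command kcat -u "$@"
}
```

Put it in the shell startup file, or at the top of a script, and call kcat as usual.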
Shouldn't kcat flush the output periodically, for the case where there is not much activity on the topic and we try to grep for the last message, which has not been flushed yet?
Because it really looks weird. I run kcat -C -t topic-name | grep "pattern" and it shows nothing if the message containing the pattern is the last message of the topic.
If I remove grep and just send the output to stdout, then everything looks fine, even without -u.