kafka-hadoop-consumer

Problems reading batched compressed messages

Open · cresny opened this issue 12 years ago · 1 comment

If messages are produced by an asynchronous producer with compression enabled, then offset boundaries will span multiple messages. When offset 0 represents such a batch, the current KafkaInputFormat.nextKeyValue() check fails because it tests for "next > 0" while next is still 0. As a result, the partition is never consumed and the mapper returns prematurely.

A simple fix for the read problem is to change the test to "next >= 0", which allows full reads. However, there is still a problem with how KafkaContext derives the "prior offset" used as the mapper key. It will return garbage because 1) the offset does not always advance and 2) the message data is compressed. In fact, I don't think the current logic produces an accurate prior offset for any compressed messages.
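For illustration, here is a minimal sketch of the relaxed boundary check; the class, field, and method names below are placeholders for the example and not the project's actual KafkaInputFormat internals:

```java
// Sketch only: names are illustrative, not the real KafkaInputFormat code.
public class OffsetBoundarySketch {

    /** Offset of the last fetched message; a negative value means no more data. */
    private long next = -1L;

    public boolean nextKeyValue(long fetchedOffset) {
        next = fetchedOffset;

        // Old check: next > 0 -- a compressed batch whose first offset is 0 fails
        // this test, so the partition is never consumed and the mapper returns early.
        // Relaxed check: next >= 0 -- offset 0 is a valid position; only a negative
        // value signals end-of-partition.
        return next >= 0;
    }
}
```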

Because of the compression, I think it's best to simplify the mapper-key logic to just use a "priorOffset" reference. At least that will give the map function an accurate reading of the current offset, though in practice most people probably ignore this value, since the Kafka Hadoop process is an all-or-none commit anyway.
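A rough sketch of what that simplified key logic could look like; again, the class and field names are hypothetical, not the existing KafkaContext code:

```java
// Sketch only: names are illustrative, not the real KafkaContext code.
public class PriorOffsetSketch {

    private long priorOffset = 0L;   // offset handed to the mapper as the key
    private long currentOffset = 0L; // offset reported by the most recent fetch

    /**
     * Remember the offset of the message (or compressed batch) just fetched and
     * return the offset the mapper key should carry. Since a compressed batch can
     * hold many messages behind one reported offset, deriving the key from the
     * payload or from offset arithmetic is unreliable; tracking the previous
     * fetch's offset gives the map function an accurate "current offset".
     */
    public long advance(long fetchedOffset) {
        priorOffset = currentOffset;
        currentOffset = fetchedOffset;
        return priorOffset;
    }
}
```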

If the above sounds OK to you, I can make the changes and issue a pull request.

cresny avatar Apr 16 '13 16:04 cresny

Thank you for the comment and your interest in the project.

I didn't consider compressed messages at all.

A pull request would be very welcome.

miniway avatar Apr 16 '13 16:04 miniway