
Consumer param `fetch_message_max_bytes` should be a soft limit

Open jeffwidman opened this issue 8 years ago • 8 comments

We hit a production issue with a stuck pykafka-based consumer that I tracked down to a single message that was larger than the consumer's fetch_message_max_bytes, but smaller than the broker's message.max.bytes (meaning the broker didn't reject the message when it was originally produced).

In the Java reference implementation, the fetch.max.bytes param is a soft limit on both the broker and the new consumer: https://kafka.apache.org/documentation/#newconsumerconfigs

It needs to be a soft limit to avoid exactly this scenario, where a single message that is smaller than the broker's message.max.bytes but larger than the consumer's fetch.max.bytes stops the consumer.

So the broker returned the message, but then when pykafka tried to process the message it blew up here: https://github.com/Parsely/pykafka/blame/c80f66c0d0b11d830aa333ed486967b5f242bc2f/pykafka/protocol.py#L383

I think pykafka should mimic the upstream behavior of treating this as a soft limit rather than a hard limit. Perhaps emit a warning, but still process the message.
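
For reference, a minimal sketch of a consumer setup that can hit this (the broker address, topic name, and sizes are hypothetical):

```python
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"my.topic"]

# Suppose the broker's message.max.bytes is 10 MB but the consumer caps each
# fetch at 1 MB. A single 2 MB message is accepted by the broker, yet can
# never fit in a fetch response, so the consumer gets stuck at that offset.
consumer = topic.get_simple_consumer(
    fetch_message_max_bytes=1024 * 1024,  # 1 MB, currently a hard limit
)

for message in consumer:
    print(message.offset, len(message.value))
```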

jeffwidman avatar Jul 01 '17 09:07 jeffwidman

One design consideration when treating this as a soft limit is memory pressure on the client. I didn't dive deep enough into the code to check whether the decode buffer is fixed-size or dynamic; it needs to be dynamic for this to work as a soft limit.
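
To illustrate what "dynamic" would mean here (a hypothetical sketch, not pykafka's actual BrokerConnection code): the receive buffer would have to grow on demand, which is exactly where the client-side memory pressure comes from.

```python
class GrowableDecodeBuffer:
    """Hypothetical sketch of a decode buffer that can grow past the
    configured fetch_message_max_bytes when an oversized message arrives."""

    def __init__(self, initial_size):
        self._buf = bytearray(initial_size)

    def ensure_capacity(self, needed):
        # Grow geometrically so an oversized message can still be decoded;
        # the trade-off is holding the entire message in client memory.
        if needed > len(self._buf):
            self._buf = bytearray(max(needed, 2 * len(self._buf)))
        return self._buf
```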

jeffwidman avatar Jul 01 '17 10:07 jeffwidman

@jeffwidman Likewise, thanks for this -- assigning to @emmett9001.

amontalenti avatar Jul 01 '17 11:07 amontalenti

Thanks @jeffwidman for the investigation, and for opening this as a separate ticket from #697.

emmettbutler avatar Jul 06 '17 20:07 emmettbutler

Pykafka treats fetch_message_max_bytes as a hard limit specifically because the decode buffer is statically allocated by BrokerConnection on initialization. fetch_message_max_bytes is used as a safeguard against overflowing this buffer. It's a bug that this implementation leads to the problem mentioned by @jeffwidman above in which a message from the broker is larger than fetch_message_max_bytes but smaller than the broker's message.max.bytes.

If my understanding is correct, this problem only happens when fetch_message_max_bytes < message.max.bytes. If this condition doesn't hold, the bug will never appear. Thus, the workaround in current master is to ensure that fetch_message_max_bytes is always at least message.max.bytes.
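
Concretely, the workaround looks something like this (the 10 MB figure is hypothetical; it should mirror whatever message.max.bytes is set to on the broker):

```python
from pykafka import KafkaClient

# Assumed to match the broker's message.max.bytes setting.
BROKER_MESSAGE_MAX_BYTES = 10 * 1024 * 1024

client = KafkaClient(hosts="127.0.0.1:9092")
consumer = client.topics[b"my.topic"].get_simple_consumer(
    # While this remains a hard limit, the only safe value is one at least
    # as large as the largest message the broker will accept.
    fetch_message_max_bytes=BROKER_MESSAGE_MAX_BYTES,
)
```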

There may have been a pull request at one point to dynamically allocate the decode buffer, but I can't find it. I remember we ran into some issues last time we tried to make that change.

emmettbutler avatar Aug 07 '17 21:08 emmettbutler

> Thus, the workaround in current master is to ensure that fetch_message_max_bytes is always at least message.max.bytes

Yes, I implemented that as a temporary workaround. But that isn't ideal in the long term in certain scenarios, such as worrying about memory usage on the client.

jeffwidman avatar Aug 07 '17 23:08 jeffwidman

Update: I noticed that this behavior changed upstream as part of KIP-74 / KAFKA-2063, so pykafka's original implementation was likely the expected behavior at the time in the Java world.

jeffwidman avatar Oct 04 '17 19:10 jeffwidman

@jeffwidman It's hard for me to tell from the breadcrumb trail of KIPs - does pykafka's current behavior now match the reference implementation?

emmettbutler avatar Dec 21 '17 20:12 emmettbutler

Afraid not. The original issue description is still correct; my addendum comment was just noting that pykafka's behavior matched the Java client's at the time it was implemented, but after KIP-74 it no longer does. It should now be a soft limit.

jeffwidman avatar Dec 21 '17 23:12 jeffwidman