Error on fetching message sets larger than max_bytes
See #268 for details and some discussion.
We had a recent production issue where our consumer was unable to make progress. It turned out that our Kafka broker was sending the client a message set of around 1.7 MB, far larger than our max_bytes of 61000. We use a small max_bytes because we currently run all 64 partition consumers on a single host and don't want to OOM if consumption falls a bit behind. Our message producer is limited to messages under 61000 bytes, so we didn't expect KafkaEx to have a problem.
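For context, our fetch configuration looks roughly like this (topic and function names are illustrative, not part of kafka_ex):

```elixir
defmodule OurApp.PartitionFetcher do
  # Sketch of our setup: a deliberately small max_bytes caps the memory
  # used per fetch, since all 64 partition consumers run on one host.
  def fetch_batch(partition, offset) do
    KafkaEx.fetch("our_topic", partition,
      offset: offset,
      max_bytes: 61_000,
      auto_commit: false
    )
  end
end
```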
Is this fixed now? We raise a specific error for this case (changed in #268):
https://github.com/kafkaex/kafka_ex/blob/b0d472b2919e40ba403ad1e3b3a9224290827788/lib/kafka_ex/protocol/fetch.ex#L77
Not sure what else we could do to handle the issue.
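For reference, the check at that line amounts to something like this (a paraphrased sketch, not a verbatim copy of the linked source):

```elixir
# Each message set entry declares its size up front (offset: int64,
# message size: int32). If fewer bytes than the declared size arrived,
# raise rather than misparse the truncated message set.
defp parse_message_set(_list, <<offset::64-signed, msg_size::32-signed, rest::binary>>)
     when byte_size(rest) < msg_size do
  raise RuntimeError,
        "Insufficient data fetched at offset #{offset}. " <>
          "Message size is #{msg_size} but only received #{byte_size(rest)} bytes. " <>
          "Try increasing max_bytes."
end
```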
I got the same error:
** (EXIT) an exception was raised: ** (RuntimeError) Insufficient data fetched at offset 662363595135136302. Message size is 825622528 but only received 244 bytes. Try increasing max_bytes.
It also looks like kafka_ex reads all messages (37) stored in Kafka. Could this error be fixed in kafka_ex by reading the data in chunks?
It appears as though the message size is 825 MB, which means you need to increase max_bytes to at least that size to be able to consume it. As an aside, I would not recommend using Kafka for messages of that size.
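The fetch protocol doesn't let a client pull a single message down in pieces, so the practical options are a larger max_bytes up front or a retry loop. A rough sketch of the latter (module and names are hypothetical; it assumes the fetch worker is supervised and restarted after it crashes):

```elixir
defmodule FetchWithRetry do
  @max_attempts 4

  # When the fetch worker dies on a message set larger than max_bytes,
  # the caller sees an exit from GenServer.call. Catch it, back off, and
  # retry with a doubled limit.
  def fetch(topic, partition, offset, max_bytes, attempt \\ 1) do
    try do
      KafkaEx.fetch(topic, partition, offset: offset, max_bytes: max_bytes)
    catch
      :exit, _reason when attempt < @max_attempts ->
        Process.sleep(100 * attempt)
        fetch(topic, partition, offset, max_bytes * 2, attempt + 1)
    end
  end
end
```

e.g. `FetchWithRetry.fetch("my_topic", 0, 12_345, 64_000)`.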
But it would be great if kafka_ex could read a big message in chunks (for example, 80 fetches of 10 MB each) and then return it as one message. Also, 825 MB is not one big object in Kafka, because the size of a single message pushed to Kafka in this test environment is about 50 KB.
Is this fixed now?
@joshuawscott I would argue that it's not, even though logging may be better in some cases. Note that a Kafka broker receiving a Fetch with max_bytes treats it as a guideline. If the first message is larger than max_bytes, it's still returned in the response:
Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned to ensure that progress can be made.
https://kafka.apache.org/protocol#The_Messages_Fetch
When this happens, it appears that kafka_ex chokes on the larger-than-expected message:
[error] GenServer #PID<0.7296.0> terminating
** (stop) exited in: GenServer.call(#PID<0.7297.0>, {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 64000, offset: 2000604, partition: 52, topic: "<topic>", wait_time: 10}}, 5000)
** (EXIT) an exception was raised:
** (RuntimeError) Parse error during KafkaEx.Protocol.Fetch.parse_response. Couldn't parse: <<..... bytes ...>>
(kafka_ex) lib/kafka_ex/protocol/fetch.ex:131: KafkaEx.Protocol.Fetch.parse_message_set/2
(kafka_ex) lib/kafka_ex/protocol/fetch.ex:95: KafkaEx.Protocol.Fetch.parse_partitions/3
(kafka_ex) lib/kafka_ex/protocol/common.ex:25: KafkaEx.Protocol.Common.parse_topics/3
(kafka_ex) lib/kafka_ex/server.ex:857: KafkaEx.Server0P8P2.network_request/3
(kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:259: KafkaEx.Server0P8P2.fetch/2
(kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:117: KafkaEx.Server0P8P2.kafka_server_fetch/2
(stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
(stdlib) gen_server.erl:690: :gen_server.handle_msg/6
(elixir) lib/gen_server.ex:989: GenServer.call/3
(kafka_ex) lib/kafka_ex/gen_consumer.ex:679: KafkaEx.GenConsumer.consume/1
(kafka_ex) lib/kafka_ex/gen_consumer.ex:630: KafkaEx.GenConsumer.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :timeout
Note that this is from version 0.10.0.
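A hypothetical repro of the crash (topic name and sizes are illustrative):

```elixir
# Produce one message larger than the consumer's max_bytes, then fetch
# with the small limit: the fetch worker crashes while parsing the
# oversized message set instead of returning the message or a clean error.
big_payload = String.duplicate("x", 500_000)
KafkaEx.produce("repro_topic", 0, big_payload)

KafkaEx.fetch("repro_topic", 0, offset: 0, max_bytes: 61_000)
# ** (EXIT) ... Parse error during KafkaEx.Protocol.Fetch.parse_response ...
```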
The answer at this point is to ensure your max_bytes is at least the topic's maximum message size (max.message.bytes). We treat max_bytes as a hard requirement in kafka_ex, so if it is lower than the largest message the broker can return, the request will fail.
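For example, a sketch assuming the topic's max.message.bytes is the ~1 MB default (recent kafka_ex versions accept a `:fetch_options` list that is forwarded to the underlying fetch; `MyApp.Consumer` is a hypothetical KafkaEx.GenConsumer implementation):

```elixir
# Give the consumer group a fetch max_bytes at least as large as the
# biggest message the broker will accept for the topic.
{:ok, _pid} =
  KafkaEx.ConsumerGroup.start_link(
    MyApp.Consumer,
    "my_consumer_group",
    ["my_topic"],
    fetch_options: [max_bytes: 1_048_576]
  )
```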