
brod_topic_subscriber throws away #kafka_fetch_error{}

dszoboszlay opened this issue 4 years ago • 4 comments

I encountered an issue where a group subscriber stopped making any progress. These are the logs from the incident, before it went silent:

2021-03-02T10:10:11.139033+00:00 notice:
Group member (mysubscribergroup,coor=<0.27477.6987>,cb=<0.8782.6987>,generation=141):
assignments received:
  mytopic:
    partition=0 begin_offset=undefined
    partition=1 begin_offset=undefined
    partition=2 begin_offset=undefined
    partition=3 begin_offset=undefined
    partition=4 begin_offset=undefined
    partition=5 begin_offset=undefined

2021-03-02T10:10:11.139676+00:00 notice:
Starting group_subscriber_worker: #{commit_fun =>
                                        #Fun<brod_group_subscriber_v2.3.129769404>,
                                    group_id => <<"mysubscribergroup">>,
                                    partition => 1,
                                    topic =>
                                        <<"mytopic">>}
Offset: undefined
Pid: 'pid.group_subscriber_worker.1'

2021-03-02T10:10:11.139740+00:00 notice:
    supervisor: {<0.27084.6987>,brod_consumers_sup}
    started: [{pid,'pid.brod_consumers_sup.mytopic'},
              {id,<<"mytopic">>},
              {mfargs,
                  {supervisor3,start_link,
                      [brod_consumers_sup,
                       {brod_consumers_sup2,'pid.brod_client.kflow_default_client',
                           <<"mytopic">>,
                           [{begin_offset,earliest}]}]}},
              {restart_type,{permanent,10}},
              {shutdown,infinity},
              {child_type,supervisor}]

2021-03-02T10:10:11.140214+00:00 notice:
    supervisor: {'pid.brod_consumers_sup.mytopic',brod_consumers_sup}
    started: [{pid,'pid.brod_consumer.1'},
              {id,1},
              {mfargs,{brod_consumer,start_link,
                                     ['pid.brod_client.kflow_default_client',
                                      <<"mytopic">>,1,
                                      [{begin_offset,earliest}]]}},
              {restart_type,{transient,2}},
              {shutdown,5000},
              {child_type,worker}]

2021-03-02T10:12:07.121191+00:00 notice:
brod_consumer 'pid.brod_consumer.1' consumer is suspended, waiting for subscriber 'pid.group_subscriber_worker.1' to resubscribe with new begin_offset

I replaced most of the pids with 'pid.module.id' atoms to make the logs more readable.

My understanding is that the last message is coming from here: https://github.com/kafka4beam/brod/blob/1c9144ba5a96267fe074f9ff04e3f76490bfe6bc/src/brod_consumer.erl#L609-L617

The brod_consumer sent a {self(), #kafka_fetch_error{}} message to the brod_topic_subscriber, but the latter only expects #kafka_message_set{} messages here: https://github.com/kafka4beam/brod/blob/1c9144ba5a96267fe074f9ff04e3f76490bfe6bc/src/brod_topic_subscriber.erl#L298-L300

So the error is thrown away and thus the whole subscriber stops progressing. This looks like a problem to me, but to be honest, I don't know how to handle these errors. Maybe just log the error and crash?
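For illustration, a minimal sketch of what such a log-and-crash clause in brod_topic_subscriber's handle_info/2 might look like (the #kafka_fetch_error{} fields follow brod.hrl; the logging and the stop reason are my assumptions, not actual brod code):

handle_info({_ConsumerPid, #kafka_fetch_error{ topic      = Topic
                                             , partition  = Partition
                                             , error_code = ErrorCode
                                             } = Error}, State) ->
  %% log loudly, then stop with a non-normal reason so the supervisor
  %% restarts the subscriber, which then re-resolves offsets and
  %% re-subscribes
  logger:error("fetch error on ~s partition ~p: ~p",
               [Topic, Partition, ErrorCode]),
  {stop, {kafka_fetch_error, Error}, State};
%% ...followed by the existing clauses for #kafka_message_set{} etc.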

By the way, I don't understand how the consumer got that error either, because it looks like it should have started from the earliest offset - which then turned out not to exist? But this happened after some big Kafka hiccup, when the previous 40 minutes were spent receiving coordinator load in progress responses from Kafka and nothing else, so I'm not particularly surprised.

dszoboszlay avatar Mar 04 '21 15:03 dszoboszlay

I've faced the same problem. Starting brod_group_subscriber_v2 and populating a topic (with 3 partitions), every few runs one of the partitions freezes without any output. After some debugging I found that the reason is the kafka_fetch_error mentioned above, with an offset_out_of_range reason. brod_consumer sends this error message, but brod_topic_subscriber ignores it. Adding a handler to brod_topic_subscriber that calls error("msg") helps to restart the subscribers. I don't think it's a proper solution, but at least the subscribers can keep consuming messages. IMO the ideal solution would be for the consumer to use the get_committed_offset callback and resubscribe.
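A rough sketch of that idea (get_committed_offset/3 is a real optional callback of brod_group_subscriber_v2, but the state fields and the exact plumbing below are hypothetical):

handle_info({ConsumerPid, #kafka_fetch_error{ topic     = Topic
                                            , partition = Partition
                                            }},
            #state{cb_module = CbModule, cb_config = CbConfig} = State) ->
  %% ask the callback module where processing last got to, then
  %% re-subscribe the suspended consumer from the next offset
  {ok, Committed} = CbModule:get_committed_offset(CbConfig, Topic, Partition),
  ok = brod:subscribe(ConsumerPid, self(), [{begin_offset, Committed + 1}]),
  {noreply, State};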

SerikDM avatar Sep 08 '22 17:09 SerikDM

Have to admit it's not nice that neither brod_group_subscriber nor brod_topic_subscriber handles the said fetch error (not even a log at a proper severity level). This was an oversight. If I remember correctly, we were using a per-partition group subscriber implemented outside of brod, and the handling of such an error was to raise a critical-level alert or log to get OPs' attention, because there should be absolutely NO gap in that data stream.

Maybe the least we can do is to handle the error message in handle_info and crash in brod_group_subscriber and brod_topic_subscriber.

===================

If you want the offset to be reset in case of an offset_out_of_range fetch error, you can provide the offset_reset_policy config (a config sketch follows the list below). The default offset_reset_policy was set to reset_by_subscriber to be on the very defensive side.

  • Setting it to earliest would imply a huge amount of data replay.
  • Setting it to latest would imply a gap in the processed stream. IMO, the safest approach is to wait for a manual reset --- the reason behind the default value.
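For example, a sketch of opting into automatic reset (the client id and topic name are taken from the logs above):

%% consumer config sketch: let brod_consumer reset the offset itself on
%% an offset_out_of_range error instead of notifying the subscriber and
%% suspending (which is what the reset_by_subscriber default does)
ConsumerConfig = [ {begin_offset, earliest}
                 , {offset_reset_policy, reset_to_earliest}  % or reset_to_latest
                 ],
ok = brod:start_consumer(kflow_default_client, <<"mytopic">>, ConsumerConfig).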

zmstone avatar Sep 09 '22 07:09 zmstone

Agree that auto-reset to earliest or latest is not a good option. Logging and crashing sounds good; that should bring enough attention to figure out the real issue behind the data gap. Upd: I can confirm that in my case the issue is a data gap. The consumer is starting with some non-zero offset. Populating the topic before starting the consumer works fine, while populating the topic 1 second after starting the consumer causes it to freeze without progressing further, even after the topic was populated up to the expected offset.

SerikDM avatar Sep 09 '22 07:09 SerikDM

Hi @SerikDM. No surprise that it did not resume even after the out-of-range offset came back in range after a while, because the consumer process is waiting for a re-subscribe from the subscriber process.
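To spell out what that re-subscribe looks like from the subscriber's side (a sketch; resolving the earliest offset is just one option, picking the new offset is up to the application):

%% after a #kafka_fetch_error{}, the subscriber must pick a valid
%% offset and subscribe again; this is what resumes the suspended
%% brod_consumer
{ok, NewOffset} = brod:resolve_offset(Hosts, Topic, Partition, earliest),
ok = brod:subscribe(ConsumerPid, self(), [{begin_offset, NewOffset}]).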

I'll try to send a PR to add a shutdown.

zmstone avatar Sep 10 '22 17:09 zmstone