
Offsets accumulating in the producer ack state

Open danmarcab opened this issue 2 years ago • 3 comments

Hi all! 👋

First of all, thanks for the great libraries!

We are running into a strange issue where every now and then (roughly once a day) we start seeing offsets accumulating in the ack state of a producer (for one or more partitions).

From our debugging, we understand the ack state values have the shape:

{pending_list, last, seen_list}

Looking into the affected producers, we can see the pending and seen lists keep growing indefinitely until the VM is OOM-killed.

The producer and processors seem to keep fetching and processing messages, as evidenced by the growth of the seen list. The issue seems to be that a small number of messages were never acked, so they remain at the front of the pending list.
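
For illustration, here is a simplified model of the behaviour we think we are seeing (plain Elixir, not broadway_kafka's actual implementation): a single unacked offset at the front of the pending list blocks the committable prefix, no matter how many later offsets have been acked.

pending = [100, 101, 102, 103, 104]
# offset 100 was never acked, every later offset was
seen = MapSet.new([101, 102, 103, 104])

# only a contiguous prefix of acked offsets can ever be committed
committable = Enum.take_while(pending, &MapSet.member?(seen, &1))
# committable == [], so nothing is committed and both lists keep growing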

We've been digging through the source code of broadway/broadway_kafka and cannot find any point where messages/acks could get lost without a trace (we are not seeing any error logs or crash reports).

As a very hacky workaround, we are considering periodically checking for offset lag and, if it grows too large, manually sending the producer an ack message for the missing offset_ids at the front of the pending list:

for producer <- Broadway.producer_names(MyBroadway) do
  acks = :sys.get_state(producer).state.module_state.acks

  Enum.each(acks, fn {k, {pending, _, seen}} ->
    missing_ack = MapSet.difference(MapSet.new(pending), MapSet.new(seen))

    missing_from_front = Enum.take_while(pending, &MapSet.member?(missing_ack, &1))
    missing_count = Enum.count(missing_from_front)

    if Enum.count(seen) > 5000 do
      IO.inspect("#{missing_count} missing ack in the front for partition #{elem(k, 2)}")
      send(producer, {:ack, k, missing_from_front})
    end
  end)
end

This 'could' work for us, since we mostly care about being up to date with the topic and can tolerate missing a few messages, but it's far from ideal.
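
If we do go down this route, we would probably wrap the check in a small periodic process instead of running it by hand. A rough sketch of what that could look like (the MyApp.AckWatchdog module name is ours, and it pokes at broadway_kafka internals via :sys.get_state/1, so it could break between versions):

defmodule MyApp.AckWatchdog do
  use GenServer

  @interval :timer.minutes(1)

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    schedule_check()
    {:ok, nil}
  end

  @impl true
  def handle_info(:check, state) do
    check_and_force_acks()
    schedule_check()
    {:noreply, state}
  end

  defp schedule_check, do: Process.send_after(self(), :check, @interval)

  # same logic as the snippet above: force-ack offsets stuck at the front of
  # the pending list once the seen list gets suspiciously large
  defp check_and_force_acks do
    for producer <- Broadway.producer_names(MyBroadway) do
      acks = :sys.get_state(producer).state.module_state.acks

      Enum.each(acks, fn {k, {pending, _, seen}} ->
        missing = MapSet.difference(MapSet.new(pending), MapSet.new(seen))
        missing_from_front = Enum.take_while(pending, &MapSet.member?(missing, &1))

        if Enum.count(seen) > 5000 and missing_from_front != [] do
          send(producer, {:ack, k, missing_from_front})
        end
      end)
    end
  end
end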

In case it's useful, our Broadway pipeline is very simple, roughly:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [{"hostname", 9092}],
             group_id: "group_1",
             topics: ["topic"]
           ]},
        concurrency: 8
      ],
      processors: [
        default: [
          concurrency: 32
        ]
      ],
      batchers: [
        publish: [concurrency: 32, batch_size: 50],
        notify_errors: [concurrency: 10, batch_size: 50],
        ignore: [concurrency: 10, batch_size: 500]
      ]
    )
  end

  @impl true
  def handle_message(_, message, _) do
    if interested_in?(Map.new(message.metadata.headers)) do
      case parse_data(message.data) do
        {:ok, parsed_data} ->
          message
          |> Broadway.Message.put_data(parsed_data)
          |> Broadway.Message.put_batcher(:publish)

        {:error, reason} ->
          message
          |> Broadway.Message.put_data(reason)
          |> Broadway.Message.put_batcher(:notify_errors)
      end
    else
      Broadway.Message.put_batcher(message, :ignore)
    end
  end

  @impl true
  def handle_batch(:publish, messages, _batch_info, _context) do
    Enum.each(messages, fn message ->
      publish(message)
    end)

    messages
  end

  def handle_batch(:ignore, messages, _batch_info, _context) do
    messages
  end

  def handle_batch(:notify_errors, messages, _batch_info, _context) do
    Enum.each(messages, fn message ->
      Logger.error("Error processing message", reason: message.data)
    end)

    messages
  end

The issue happens even when the interested_in? function returns false, so no processing is done at all: the message is just forwarded to the ignore batcher, which does nothing.

Is there anything obvious we are missing?

danmarcab · Jun 06 '22 11:06

Nothing obvious comes to mind, unfortunately.

josevalim · Jun 06 '22 12:06

Thank you for having a look @josevalim, really appreciated!

After debugging for another while, we have found one possible cause (I say one because we've also seen offsets accumulating without this error message): the batcher dies due to an unknown timer.

GenServer MyService.Broadway.Broadway.Batcher_ignore terminating
    ** (RuntimeError) unknown timer #Reference<0.1181658914.3223060481.95258>
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:207: Broadway.Topology.BatcherStage.cancel_batch_timeout/1
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:148: Broadway.Topology.BatcherStage.deliver_batch/6
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:118: Broadway.Topology.BatcherStage.handle_events_per_batch_key/3
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:64: anonymous fn/2 in Broadway.Topology.BatcherStage.handle_events/3
        (telemetry 1.1.0) /build/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:54: Broadway.Topology.BatcherStage.handle_events/3
        (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
        (gen_stage 1.1.2) lib/gen_stage.ex:2660: GenStage.take_pc_events/3

It seems that Broadway already accounts for the case where the timeout message has already been delivered when cancelling the timer returns false (https://github.com/dashbitco/broadway/blob/main/lib/broadway/topology/batcher_stage.ex#L201-L213), but there seems to be an edge case?
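
For context, the usual defensive pattern when cancelling a one-shot timer (a generic sketch, not Broadway's actual code) is to also flush a timeout message that may already have been delivered by the time cancel_timer returns false:

ref = :erlang.start_timer(5_000, self(), :batch_timeout)

# ... later, when the batch is delivered before the timeout ...

case Process.cancel_timer(ref) do
  false ->
    # the timer already fired; its message may or may not have reached our
    # mailbox yet, so flush it if it is there and move on otherwise
    receive do
      {:timeout, ^ref, :batch_timeout} -> :ok
    after
      0 -> :ok
    end

  _remaining_ms ->
    # cancelled before firing, nothing to flush
    :ok
end

A late {:timeout, ref, ...} message for a reference that is no longer tracked would then have to be ignored when it eventually arrives, rather than treated as an unknown timer.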

My guess is that crashing the batcher is fine for other producers, where acks are not sequential, but for BroadwayKafka it seems to mess up the ack state.

danmarcab · Jun 20 '22 11:06

I am looking at the code and I cannot see a code path that would make the error message above happen. Every time we cancel the timer, we delete the batch, which means it is impossible to recover the timer again.

josevalim · Jun 20 '22 12:06

This has been fixed in Broadway. There was an assumption that the timer message would be delivered automatically but that was not always the case.

josevalim · Mar 14 '23 16:03

> This has been fixed in Broadway. There was an assumption that the timer message would be delivered automatically but that was not always the case.

This is great news! Thank you very much for all your work @josevalim ❤️

danmarcab · Mar 14 '23 17:03