broadway_kafka
Offsets accumulating in the producer ack state
Hi all! 👋
First of all, thanks for the great libraries!
We are running into a strange issue where every now and then (roughly once a day) we start seeing offsets accumulating in the ack state of a producer (for one or more partitions).
From our debugging we understand the ack state values have the shape:
{pending_list, last, seen_list}
Looking into the affected producers, we can see the pending and seen lists keep growing indefinitely until the VM is OOM killed. The producer and processors keep fetching and processing messages, as evidenced by the growth of the seen list. The issue seems to be that a small number of messages were never acked, so they remain at the front of the pending list.
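To make that concrete, here is a tiny model of how we understand the sequential acking (our mental model only, not broadway_kafka's actual implementation): the commit position can only advance while the offset at the head of pending has already been acked, so a single lost ack at the head blocks everything behind it.

defmodule AckModel do
  # `pending` holds offsets in dispatch order; `seen` holds offsets that were
  # acked out of order and are waiting for everything before them to be acked.
  def advance(pending, seen) do
    seen = MapSet.new(seen)

    {ackable, still_pending} = Enum.split_while(pending, &MapSet.member?(seen, &1))

    # The highest offset that could be committed, or nil if the head is stuck.
    {List.last(ackable), still_pending}
  end
end

# Offset 101 was never acked, so nothing can be committed and both lists keep
# growing as new offsets are fetched and processed:
AckModel.advance([101, 102, 103, 104], [102, 103, 104])
#=> {nil, [101, 102, 103, 104]}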
We've been digging through the source code of broadway/broadway_kafka and cannot find any point where messages/acks could get lost without a trace (we are not seeing any error logs or crash reports).
As a very hacky workaround, we are considering periodically checking the offset lag and, if it is too big, manually sending an ack message to the producer for the missing offset_ids at the front of pending:
for producer <- Broadway.producer_names(MyBroadway) do
  acks = :sys.get_state(producer).state.module_state.acks

  Enum.each(acks, fn {k, {pending, _, seen}} ->
    missing_ack = MapSet.difference(MapSet.new(pending), MapSet.new(seen))
    missing_from_front = Enum.take_while(pending, &MapSet.member?(missing_ack, &1))
    missing_count = Enum.count(missing_from_front)

    if Enum.count(seen) > 5000 do
      IO.inspect("#{missing_count} missing ack in the front for partition #{elem(k, 2)}")
      send(producer, {:ack, k, missing_from_front})
    end
  end)
end
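If we end up going this route, it would probably live in a small periodic process along these lines (just a sketch: MyApp.AckLagWatchdog, the one-minute interval and the 5_000 threshold are placeholders, and reaching into the producer state via :sys.get_state/1 obviously relies on private internals):

defmodule MyApp.AckLagWatchdog do
  use GenServer

  @interval :timer.minutes(1)
  @seen_threshold 5_000

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    schedule_check()
    {:ok, %{}}
  end

  @impl true
  def handle_info(:check, state) do
    for producer <- Broadway.producer_names(MyBroadway) do
      # Relies on broadway_kafka internals: ack values are {pending, last, seen}.
      acks = :sys.get_state(producer).state.module_state.acks

      Enum.each(acks, fn {key, {pending, _last, seen}} ->
        missing_ack = MapSet.difference(MapSet.new(pending), MapSet.new(seen))
        missing_from_front = Enum.take_while(pending, &MapSet.member?(missing_ack, &1))

        if Enum.count(seen) > @seen_threshold and missing_from_front != [] do
          # Pretend the stuck offsets were acked so the commit position can advance.
          send(producer, {:ack, key, missing_from_front})
        end
      end)
    end

    schedule_check()
    {:noreply, state}
  end

  defp schedule_check, do: Process.send_after(self(), :check, @interval)
end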
This could work for us, since we mostly care about staying up to date with the topic and can tolerate missing a few messages, but it's far from ideal.
In case it's useful, our Broadway pipeline is very simple, roughly:
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module:
        {BroadwayKafka.Producer,
         [
           hosts: [{"hostname", 9092}],
           group_id: "group_1",
           topics: ["topic"]
         ]},
      concurrency: 8
    ],
    processors: [
      default: [
        concurrency: 32
      ]
    ],
    batchers: [
      publish: [concurrency: 32, batch_size: 50],
      notify_errors: [concurrency: 10, batch_size: 50],
      ignore: [concurrency: 10, batch_size: 500]
    ]
  )
end
@impl true
def handle_message(_, message, _) do
  if interested_in?(Map.new(message.metadata.headers)) do
    case parse_data(message.data) do
      {:ok, parsed_data} ->
        message
        |> Broadway.Message.put_data(parsed_data)
        |> Broadway.Message.put_batcher(:publish)

      {:error, reason} ->
        message
        |> Broadway.Message.put_data(reason)
        |> Broadway.Message.put_batcher(:notify_errors)
    end
  else
    Broadway.Message.put_batcher(message, :ignore)
  end
end
@impl true
def handle_batch(:publish, messages, _batch_info, _context) do
  Enum.each(messages, fn data ->
    publish(data)
  end)

  messages
end

def handle_batch(:ignore, messages, _batch_info, _context) do
  messages
end

def handle_batch(:notify_errors, messages, _batch_info, _context) do
  Enum.each(messages, fn message ->
    Logger.error("Error processing message", reason: message.data)
  end)

  messages
end
The error happens even when the interested_in? function returns false, in which case no processing is done at all and the message is just forwarded to the ignore batcher, which does nothing.
Is there anything obvious we are missing?
Nothing obvious comes to mind, unfortunately.
Thank you for having a look @josevalim, really appreciated!
After debugging for a while longer, we have found one of the possible causes (I say one because we've also seen offsets accumulate without this error message): the batcher dies due to an unknown timer.
"GenServer MyService.Broadway.Broadway.Batcher_ignore terminating
** (RuntimeError) unknown timer #Reference<0.1181658914.3223060481.95258>
(broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:207: Broadway.Topology.BatcherStage.cancel_batch_timeout/1
(broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:148: Broadway.Topology.BatcherStage.deliver_batch/6
(broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:118: Broadway.Topology.BatcherStage.handle_events_per_batch_key/3
(broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:64: anonymous fn/2 in Broadway.Topology.BatcherStage.handle_events/3
(telemetry 1.1.0) /build/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
(broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:54: Broadway.Topology.BatcherStage.handle_events/3
(gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
(gen_stage 1.1.2) lib/gen_stage.ex:2660: GenStage.take_pc_events/3
It seems that Broadway already accounts for the case where the timeout message has already been received when cancelling the timer returns false (https://github.com/dashbitco/broadway/blob/main/lib/broadway/topology/batcher_stage.ex#L201-L213), but perhaps there's an edge case?
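For reference, the kind of race we have in mind looks roughly like this (a standalone sketch, not Broadway's code; the batch_key and the :flush_batch message shape are just assumptions for illustration):

batch_key = :my_batch
timer_ref = Process.send_after(self(), {:flush_batch, batch_key}, 10)

# ... later, when the batch is delivered for another reason, the timer is cancelled:
case Process.cancel_timer(timer_ref) do
  false ->
    # The timer already fired, but per the :erlang.cancel_timer docs the timeout
    # message is not guaranteed to have reached this mailbox yet. If it hasn't,
    # this flush finds nothing and the message shows up later, after the batch
    # (and its timer reference) has already been forgotten.
    receive do
      {:flush_batch, ^batch_key} -> :flushed
    after
      0 -> :message_may_still_be_in_flight
    end

  _remaining_ms ->
    # Cancelled in time; no timeout message will be delivered.
    :ok
end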
My guess is that crashing the batcher is fine for other producers where acking is not sequential, but for BroadwayKafka it seems to leave the ack state stuck.
I am looking at the code and I cannot see a code path that would make the error message above happen. Every time we cancel the timer, we delete the batch, which means it is impossible to recover the timer again.
This has been fixed in Broadway. There was an assumption that the timer message would be delivered automatically but that was not always the case.
This is great news! Thank you very much for all your work @josevalim ❤️