AWS SQS FIFO messages being processed out of order or in duplicate
I've been using broadway_sqs to consume AWS SQS FIFO queues and I noticed some unexpected behaviour: messages are sometimes processed out of order or more than once.
Initially I didn't have Broadway's partition_by configured; once I did, things seemed to improve, but I can still see some duplicate and out-of-order processing. For example, looking at the logs below – grouped by process identifier for readability – we can see that:
- PID 427 consumed the second message from group C without waiting for the first message of the same group to be acknowledged and thus removed from the queue. The visibility timeout is 10 seconds, and even that wasn't honoured.
- PID 419 processed the fifth message of group A ("A5") twice.
16:39:34.610 [info] [#PID<0.382.0>] Handling message: "B" / "B1"
16:39:37.270 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B1"
16:39:37.300 [info] [#PID<0.382.0>] Handling message: "B" / "B2"
16:39:39.633 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B2"
16:39:34.610 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Message processing failed : "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] timeout: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Handling message: "C" / "C2"
16:39:42.194 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C2"
16:39:45.814 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:47.528 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C1"
16:39:34.610 [info] [#PID<0.419.0>] Handling message: "A" / "A1"
16:39:37.504 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A1"
16:39:37.521 [info] [#PID<0.419.0>] Handling message: "A" / "A2"
16:39:40.242 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A2"
16:39:40.263 [info] [#PID<0.419.0>] Handling message: "A" / "A3"
16:39:42.404 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A3"
16:39:42.422 [info] [#PID<0.419.0>] Handling message: "A" / "A4"
16:39:44.933 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A4"
16:39:44.954 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:47.037 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
16:39:47.056 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:50.001 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
Before setting up partition_by, the behaviour was even stranger, with different consumers handling messages from the same message_group_id:
16:25:33.076 [info] [#PID<0.423.0>] Handling message: "A" / "A1"
16:25:36.304 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A1"
16:25:36.326 [info] [#PID<0.423.0>] Handling message: "A" / "A2"
16:25:38.859 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A2"
16:25:38.869 [info] [#PID<0.423.0>] Handling message: "A" / "A3"
16:25:41.709 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A3"
16:25:41.728 [info] [#PID<0.423.0>] Handling message: "A" / "A4"
16:25:44.206 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A4"
16:25:44.230 [info] [#PID<0.423.0>] Handling message: "A" / "A5"
16:25:46.592 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A5"
16:25:46.613 [info] [#PID<0.423.0>] Handling message: "C" / "C1"
16:25:48.219 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C1"
16:25:48.233 [info] [#PID<0.423.0>] Handling message: "C" / "C2"
16:25:50.134 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C2"
16:25:50.152 [info] [#PID<0.423.0>] Handling message: "B" / "B1"
16:25:52.073 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B1"
16:25:52.087 [info] [#PID<0.423.0>] Handling message: "B" / "B2"
16:25:53.323 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B2"
16:25:44.286 [info] [#PID<0.424.0>] Handling message: "C" / "C1"
16:25:46.866 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C1"
16:25:46.877 [info] [#PID<0.424.0>] Handling message: "C" / "C2"
16:25:48.653 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C2"
16:25:48.661 [info] [#PID<0.424.0>] Handling message: "A" / "A5"
16:25:50.938 [info] [#PID<0.424.0>] Message acknowledge: "A" / "A5"
16:25:50.958 [info] [#PID<0.424.0>] Handling message: "B" / "B1"
16:25:52.517 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B1"
16:25:52.536 [info] [#PID<0.424.0>] Handling message: "B" / "B2"
16:25:54.045 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B2"
My understanding is that AWS SQS FIFO queues should guarantee message ordering within the same message_group_id, and that once a message has been received, no other consumer can receive the same message until its visibility timeout expires.
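For context, this is roughly how test messages with an explicit message_group_id can be published with ExAws.SQS (sketch only – the queue URL, payloads and deduplication ids below are placeholders, and it assumes the queue does not have content-based deduplication enabled):

defmodule ElixirBroadwayPlayground.TestPublisher do
  # Sketch: publish a couple of messages per group to the FIFO queue.
  # The queue URL and payload shape are placeholders, not the real setup.
  @queue_url "https://sqs.eu-west-1.amazonaws.com/000000000000/playground.fifo"

  def publish do
    for group <- ["A", "B", "C"], n <- 1..2 do
      body = Jason.encode!(%{id: "#{group}#{n}"})

      ExAws.SQS.send_message(@queue_url, body,
        message_group_id: group,
        # Required unless the queue uses content-based deduplication
        message_deduplication_id: "#{group}#{n}"
      )
      |> ExAws.request!()
    end
  end
end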
Here's the code for my Broadway pipeline:
defmodule ElixirBroadwayPlayground.SQSProducer do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(config) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Keyword.get(config, :queue_url),
          receive_interval: 1000,
          on_success: :ack,
          on_failure: :noop,
          # SQS visibility timeout, in seconds
          visibility_timeout: 10,
          attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
        }
      ],
      processors: [
        default: [concurrency: Keyword.get(config, :num_workers, 1)]
      ],
      # Route messages with the same message_group_id to the same processor
      partition_by: &partition_by/1
    )
  end

  @impl true
  def handle_message(
        _,
        %Message{
          data: data,
          metadata: %{attributes: %{"message_group_id" => message_group_id}}
        } = message,
        _
      ) do
    log_event("Handling message", data, message_group_id)

    HTTPoison.get("https://swapi.dev/api/people/1", [{"Content-Type", "application/json"}],
      ssl: [verify: :verify_none]
    )
    |> handle_response(message)
  end

  defp handle_response(
         {:ok, _},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message acknowledge", data, message_group_id)

    # Delete the message from the queue right away instead of waiting
    # for the acknowledger to run at the end of the pipeline
    Message.ack_immediately(message)
  end

  defp handle_response(
         {:error, %HTTPoison.Error{reason: reason}},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message processing failed ", data, message_group_id)
    log_event(reason, data, message_group_id)
    Message.failed(message, reason)
  end

  defp log_event(text, data, msg_group_id) do
    message_id = Jason.decode!(data)["id"]
    Logger.info("[#{inspect(self())}] #{text}: #{inspect(msg_group_id)} / #{inspect(message_id)}")
  end

  defp partition_by(%Message{
         metadata: %{attributes: %{"message_group_id" => message_group_id}}
       }) do
    :erlang.phash2(message_group_id)
  end
end
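For completeness, the pipeline can be started from the application's supervision tree roughly like this (the queue URL and worker count are placeholders; use Broadway already defines the child_spec):

children = [
  {ElixirBroadwayPlayground.SQSProducer,
   queue_url: "https://sqs.eu-west-1.amazonaws.com/000000000000/playground.fifo",
   num_workers: 3}
]

Supervisor.start_link(children, strategy: :one_for_one)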
Am I misinterpreting the expected behaviour? Has anyone experienced the same thing?