
AWS SQS FIFO messages being processed out of order or in duplicate

Open nuno-barreiro opened this issue 1 year ago • 7 comments

I've been using broadway_sqs to consume AWS SQS FIFO queues and noticed some unexpected behaviour: messages are sometimes processed out of order or more than once.

Initially I didn't have Broadway's partition_by configured. Once I did, things seemed to improve, but I can still see some duplicate and out-of-order processing. For example, looking at the logs below (grouped by process identifier for readability) we can see that:

  • PID 427 consumed the second message from group C without waiting for the first message of the same group to be acknowledged and thus removed from the queue. The visibility timeout is 10 seconds, and even that wasn't respected.
  • PID 419 processed the fifth message from group A twice.
16:39:34.610 [info] [#PID<0.382.0>] Handling message: "B" / "B1"
16:39:37.270 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B1"
16:39:37.300 [info] [#PID<0.382.0>] Handling message: "B" / "B2"
16:39:39.633 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B2"

16:39:34.610 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Message processing failed : "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] timeout: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Handling message: "C" / "C2"
16:39:42.194 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C2"
16:39:45.814 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:47.528 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C1"

16:39:34.610 [info] [#PID<0.419.0>] Handling message: "A" / "A1"
16:39:37.504 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A1"
16:39:37.521 [info] [#PID<0.419.0>] Handling message: "A" / "A2"
16:39:40.242 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A2"
16:39:40.263 [info] [#PID<0.419.0>] Handling message: "A" / "A3"
16:39:42.404 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A3"
16:39:42.422 [info] [#PID<0.419.0>] Handling message: "A" / "A4"
16:39:44.933 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A4"
16:39:44.954 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:47.037 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
16:39:47.056 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:50.001 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"

Before setting up partition_by, the behaviour was even more awkward, with different consumers handling messages from the same message_group_id:

16:25:33.076 [info] [#PID<0.423.0>] Handling message: "A" / "A1"
16:25:36.304 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A1"
16:25:36.326 [info] [#PID<0.423.0>] Handling message: "A" / "A2"
16:25:38.859 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A2"
16:25:38.869 [info] [#PID<0.423.0>] Handling message: "A" / "A3"
16:25:41.709 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A3"
16:25:41.728 [info] [#PID<0.423.0>] Handling message: "A" / "A4"
16:25:44.206 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A4"
16:25:44.230 [info] [#PID<0.423.0>] Handling message: "A" / "A5"
16:25:46.592 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A5"
16:25:46.613 [info] [#PID<0.423.0>] Handling message: "C" / "C1"
16:25:48.219 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C1"
16:25:48.233 [info] [#PID<0.423.0>] Handling message: "C" / "C2"
16:25:50.134 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C2"
16:25:50.152 [info] [#PID<0.423.0>] Handling message: "B" / "B1"
16:25:52.073 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B1"
16:25:52.087 [info] [#PID<0.423.0>] Handling message: "B" / "B2"
16:25:53.323 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B2"

16:25:44.286 [info] [#PID<0.424.0>] Handling message: "C" / "C1"
16:25:46.866 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C1"
16:25:46.877 [info] [#PID<0.424.0>] Handling message: "C" / "C2"
16:25:48.653 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C2"
16:25:48.661 [info] [#PID<0.424.0>] Handling message: "A" / "A5"
16:25:50.938 [info] [#PID<0.424.0>] Message acknowledge: "A" / "A5"
16:25:50.958 [info] [#PID<0.424.0>] Handling message: "B" / "B1"
16:25:52.517 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B1"
16:25:52.536 [info] [#PID<0.424.0>] Handling message: "B" / "B2"
16:25:54.045 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B2"

My understanding is that AWS SQS FIFO queues guarantee message order within the same message_group_id, and that once a message has been received, no other consumer can receive the same message during its visibility timeout.
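For context, the test messages are published along these lines (a simplified sketch rather than the exact code; the queue URL is a placeholder, and the explicit deduplication id is only needed when content-based deduplication is disabled on the FIFO queue):

defmodule ElixirBroadwayPlayground.SQSPublisher do
  @queue_url "https://sqs.eu-west-1.amazonaws.com/123456789012/playground.fifo"

  # Publishes a JSON body like %{"id" => "A1"} under the given message group,
  # e.g. publish("A", "A1").
  def publish(group_id, message_id) do
    body = Jason.encode!(%{id: message_id})

    @queue_url
    |> ExAws.SQS.send_message(body,
      message_group_id: group_id,
      # Required when content-based deduplication is not enabled on the queue.
      message_deduplication_id: "#{group_id}-#{message_id}"
    )
    |> ExAws.request()
  end
end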

Here is the code for my SQS producer:

defmodule ElixirBroadwayPlayground.SQSProducer do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(config) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Keyword.get(config, :queue_url),
          receive_interval: 1000,
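          # Ack successful messages; failed ones are left untouched, so they
          # become visible again once the 10s visibility timeout expires.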
          on_success: :ack,
          on_failure: :noop,
          visibility_timeout: 10,
          attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
        }
      ],
      processors: [
        default: [concurrency: Keyword.get(config, :num_workers, 1)]
      ],
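      # Route messages with the same message_group_id to the same processor.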
      partition_by: &partition_by/1
    )
  end

  @impl true
  def handle_message(
        _,
        %Message{
          data: data,
          metadata: %{attributes: %{"message_group_id" => message_group_id}}
        } = message,
        _
      ) do
    log_event("Handling message", data, message_group_id)

    HTTPoison.get("https://swapi.dev/api/people/1", [{"Content-Type", "application/json"}],
      ssl: [verify: :verify_none]
    )
    |> handle_response(message)
  end

  defp handle_response(
         {:ok, _},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message acknowledge", data, message_group_id)
    Message.ack_immediately(message)
  end

  defp handle_response(
         {:error, %HTTPoison.Error{reason: reason}},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message processing failed ", data, message_group_id)
    log_event(reason, data, message_group_id)
    Message.failed(message, reason)
  end

  defp log_event(text, data, msg_group_id) do
    message_id = Jason.decode!(data)["id"]
    Logger.info("[#{inspect(self())}] #{text}: #{inspect(msg_group_id)} / #{inspect(message_id)}")
  end

  defp partition_by(%Message{
         metadata: %{attributes: %{"message_group_id" => message_group_id}}
       }) do
    :erlang.phash2(message_group_id)
  end
end
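
For completeness, the pipeline is started from the application supervisor roughly like this (again a simplified sketch; the queue URL and worker count here are placeholders):

defmodule ElixirBroadwayPlayground.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # `use Broadway` gives SQSProducer a child_spec/1, so this keyword list
      # is passed straight to start_link/1.
      {ElixirBroadwayPlayground.SQSProducer,
       queue_url: "https://sqs.eu-west-1.amazonaws.com/123456789012/playground.fifo",
       num_workers: 3}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: ElixirBroadwayPlayground.Supervisor)
  end
end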

Am I misinterpreting the expected behaviour? Has anyone experienced the same?

nuno-barreiro · Sep 21 '23, 16:09