alpakka
alpakka copied to clipboard
SQS: Long polling on empty queue makes other pollers wait
Version
AkkaHttpVersion = "10.2.9" AlpakkaVersion = "3.0.4"
Scenario
Create two SQS pollers, one per queue.
Config:
SqsSourceSettings settings = SqsSourceSettings.create()
.withWaitTime(Duration.ofSeconds(10))
.withMaxBatchSize(10)
Queue A has 10,000 messages Queue B has 0 messages
Message processing blocks for about 100ms.
CompletableFuture.supplyAsync(() -> handleSqsMessage(pollRequest, msg), workerDispatcher
// uses a "blocking dispatcher"
);
Expected behaviour:
Both queues are polled concurrently and messages flow continuously when available
Actual behaviour
- Some messages are polled from queue A
- Polling stalls on queue A while waiting 10 seconds for queue B to complete polling
- Repeat
Workaround
- Set wait time to zero
- Close on empty receive (to avoid hammering SQS API)
- When stream closes on empty receive for queue B, delay next poll by 10 seconds
SqsSourceSettings settings = SqsSourceSettings.create()
.withWaitTime(Duration.ofSeconds(0))
.withCloseOnEmptyReceive(true);
streamCompletion.thenAccept(done -> {
actorSystem.scheduler().scheduleOnce(Duration.ofSeconds(SQS_POLL_INTERVAL_EMPTY_RECEIVE), sender, pollingRequest, getContext().dispatcher(), getSelf());
}
The issue is with akka.stream.alpakka.sqs.impl.BalancingMapAsync Setting SqsSourceSettings.withParallelRequests to a value other than 1 uses that class, which slows things down
This 'fixes' long polling, but causes other issues - message delays.
Even with parallelRequests=1, Alpakka seems to open two connections to AWS.
- Calls ReceiveMessage, processes messages
- Calls ReceiveMessage, ignores messages
Because request #2 ignores messages, they become invisible, and only available after visibilityTimeout expires. At that point, the message is either received (by 1st connection), or ignored (by 2nd connection). If ignored, it triggers invisibility again.
Can you please advise some workarounds?