Connection not cleaned up after TCP timeout or connection refused.
Akka HTTP version
10.2.9 (also tested with 10.1.11)
The case that I'm testing
- 3 parallel flows sending requests to Elasticsearch via Alpakka Elasticsearch
- sometimes a request fails with a TCP timeout or a connection refused exception
- the source of the documents and the ES flow are wrapped in a RestartSource, so that the flow can be restarted on failures caused by the mentioned exceptions
Expected behaviour
After the restart, the connection pool is ready to use, with all slots free.
Actual behaviour
After several restarts the connection pool becomes full, causing BufferOverflowExceptions and further restarts.
[WARN] [05/30/2022 16:23:33.640] [default-akka.actor.default-dispatcher-15] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:
java.lang.RuntimeException: Request failed for POST http://localhost:9200/_bulk
at akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic$$anonfun$onPush$4.applyOrElse(ElasticsearchSimpleFlowStage.scala:102)
at akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic$$anonfun$onPush$4.applyOrElse(ElasticsearchSimpleFlowStage.scala:100)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:490)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9200,ConnectionPoolSetup(ConnectionPoolSettings(32,0,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,Duration.Inf,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.9),10 seconds,5 seconds,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$254/0x0000000800332c40@708bef53,false),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,5,Strict,RFC6265,true,Set(),Full,Error,Error,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$202/0x00000008002c6040@253400a5,akka.util.ConstantFun$$$Lambda$202/0x00000008002c6040@253400a5,akka.util.ConstantFun$$$Lambda$203/0x00000008002c6840@82b8d2a),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,0 seconds,0 seconds,0,3 seconds,100 milliseconds,2 minutes,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@27f50a0b,akka.event.MarkerLoggingAdapter@3b86e416))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See https://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
at akka.http.impl.engine.client.PoolInterface$Logic.<init>(PoolInterface.scala:106)
at akka.http.impl.engine.client.PoolInterface$PoolInterfaceStage.createLogicAndMaterializedValue(PoolInterface.scala:94)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:699)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:498)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:448)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:440)
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:744)
at akka.http.impl.engine.client.PoolInterface$.apply(PoolInterface.scala:80)
at akka.http.impl.engine.client.PoolMasterActor.akka$http$impl$engine$client$PoolMasterActor$$startPoolInterface(PoolMasterActor.scala:131)
at akka.http.impl.engine.client.PoolMasterActor$$anonfun$receive$1.applyOrElse(PoolMasterActor.scala:162)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.http.impl.engine.client.PoolMasterActor.aroundReceive(PoolMasterActor.scala:111)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Code used to reproduce the issue
The easiest way to reproduce the issue is to run the program with no server listening on port 9200, so that every request fails with connection refused. The behaviour is the same when the failure is caused by a TCP timeout. Note that the restarts happen at 20-30 second intervals, which seems like a reasonable period for the pool to clean up its slots.
ConnectionPoolTest.scala:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json._
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
object ConnectionPoolTest extends App {
  case class Book(title: String)
  implicit val format: JsonFormat[Book] = jsonFormat1(Book)
  implicit val actorSystem: ActorSystem = ActorSystem()
  val index = "test-index"
  val connectionSettings =
    ElasticsearchConnectionSettings("http://localhost:9200")
      .withCredentials("", "")
  val batchCount = 100
  val batchSize = 5
  val parallelFlows = 3
  val restartSettings = RestartSettings(
    minBackoff = 20 seconds,
    maxBackoff = 30 seconds,
    randomFactor = 0.2
  )
  val source = Source(createBooks(batchSize, batchCount))
  val flow = ElasticsearchFlow.createBulk[Book, NotUsed](
    ElasticsearchParams.V7(indexName = index),
    ElasticsearchWriteSettings(connectionSettings)
  )
  val graph = RestartSource
    .onFailuresWithBackoff(restartSettings)(() => source.via(flow))
    .to(Sink.ignore)
  Iterator.continually(graph).take(parallelFlows).foreach { graph =>
    Thread.sleep(1000) // Delay the streams start a bit
    graph.run()
  }
  private def createBooks(batchSize: Int, batchCount: Int) =
    Iterator
      .continually {
        Seq
          .fill(batchSize)(
            Book(s"Book-${UUID.randomUUID().toString}")
          )
          .map { book =>
            WriteMessage
              .createUpsertMessage(UUID.randomUUID().toString, book)
              .withIndexName(index)
          }
      }
      .take(batchCount)
      .toSeq
}
application.conf:
akka {
  loglevel = "DEBUG"
  http {
    host-connection-pool {
      max-connections = 32
      max-open-requests = 16
    }
    client {
      idle-timeout = 5 s
    }
  }
}
build.sbt:
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.7"
lazy val root = (project in file("."))
  .settings(
    name := "connection-pool-test"
  )
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % "10.2.9",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.2.9",
  "com.typesafe.akka" %% "akka-stream" % "2.6.14",
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "3.0.4"
)
The issue can be demonstrated even more simply. Consider this code:
Http().singleRequest(HttpRequest(uri = "http://localhost:9093")).recover {
  case ex => println(s"error: ${ex.getMessage}")
}
This fails the future correctly with:
error: Tcp command [Connect(localhost:9093,None,List(),Some(10 seconds),true)] failed because of java.net.ConnectException: Connection refused
However, doing the following:
(0 to 30).map { i =>
  Http().singleRequest(HttpRequest(uri = "http://localhost:9093")).recover {
    case ex => println(s"error: ${ex.getMessage}")
  }
}
Here we do not get the connection exception, but rather a series of:
error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...
error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...
error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...
...
[akka://default/system/IO-TCP/selectors/$a/8] - Message [akka.actor.ReceiveTimeout$] to Actor[akka://default/system/IO-TCP/selectors/$a/8#-748172926] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://default/system/IO-TCP/selectors/$a/8#-748172926] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
I wouldn't say that's exactly the same. Please note that in the code I've provided, the streams restart every 20 to 30 seconds. Say a restart happens every 20 seconds and there are 3 parallel flows: that means 3 requests fail every 20 seconds. Isn't 20 seconds enough time to free the occupied slots in the connection pool? It seems that the problem accumulates over time and eventually ends with the BufferOverflowException.
Thanks for the report and reproducer, @AvaPL.
It doesn't look like an actual leak, but something seems to be wrong with the buffer size calculation in PoolInterface.
It assumes that the pool will always pull at least max-connections requests, which may not be true while connections are backing off after connection problems. In that case, the buffer may have fewer entries than the user expects.
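To make the sizing argument concrete, here is a hypothetical toy model in Scala. It is not akka-http's PoolInterface code: the rule that the queue holds max-open-requests minus max-connections entries, and the names PoolModel, rejected and PoolModelDemo, are assumptions made only for illustration. The model counts how many simultaneous requests would be rejected when some connection slots are backing off instead of pulling.

```scala
// Hypothetical toy model, NOT akka-http's PoolInterface implementation.
// Assumption: the request queue only has room for
// maxOpenRequests - maxConnections entries, because every connection slot
// is expected to pull (and hold) one request.
final case class PoolModel(maxConnections: Int, maxOpenRequests: Int) {
  val bufferSize: Int = math.max(0, maxOpenRequests - maxConnections)

  // How many of `incoming` simultaneous requests are rejected when only
  // `pullingSlots` slots are pulling and the rest are backing off after
  // connection failures. A rejection here corresponds to what would be a
  // BufferOverflowException in the real pool; in the model it is a count.
  def rejected(incoming: Int, pullingSlots: Int): Int = {
    val takenBySlots = math.min(incoming, math.min(pullingSlots, maxConnections))
    val buffered = math.min(incoming - takenBySlots, bufferSize)
    incoming - takenBySlots - buffered
  }
}

object PoolModelDemo {
  def main(args: Array[String]): Unit = {
    val tight = PoolModel(maxConnections = 4, maxOpenRequests = 4)
    println(tight.rejected(incoming = 4, pullingSlots = 4)) // 0: all slots pulling
    println(tight.rejected(incoming = 4, pullingSlots = 0)) // 4: all slots backing off

    val doubled = PoolModel(maxConnections = 4, maxOpenRequests = 8)
    println(doubled.rejected(incoming = 4, pullingSlots = 0)) // 0: queue absorbs them
  }
}
```

With max-connections = 4 and max-open-requests = 4, four requests fit only while every slot is pulling; once the slots back off, all four are rejected even though max-open-requests was never exceeded. Doubling max-open-requests to 8 leaves queue room for them, which is the intuition behind keeping max-open-requests at least twice max-connections.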
As a workaround, can you try setting max-open-requests above max-connections by at least the number of requests you expect to be dispatched to the pool concurrently, i.e. at least twice as high as max-connections?
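If you prefer to apply this workaround from code instead of application.conf, a minimal sketch is to override the pool settings when creating the ActorSystem. The values 8 and 32 are illustrative, and the object name PoolSizingWorkaround is hypothetical. The log lines in this thread show Alpakka Elasticsearch using the shared pool (Pool(shared->http://localhost:9200)), so it is assumed here to pick up these system-wide defaults.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings
import com.typesafe.config.{Config, ConfigFactory}

object PoolSizingWorkaround {
  // Illustrative values: max-open-requests is kept well above max-connections
  // (at least 2x) so queued requests still fit while slots are backing off.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.http.host-connection-pool {
      |  max-connections = 8
      |  max-open-requests = 32
      |}
      |""".stripMargin
  )

  def main(args: Array[String]): Unit = {
    // Overrides take precedence; everything else comes from application.conf
    // and the reference.conf files on the classpath.
    val system = ActorSystem("pool-sizing", overrides.withFallback(ConfigFactory.load()))

    // Read back the default pool settings derived from the system config.
    val poolSettings = ConnectionPoolSettings(system)
    println(s"max-connections = ${poolSettings.maxConnections}, " +
      s"max-open-requests = ${poolSettings.maxOpenRequests}")

    system.terminate()
  }
}
```

Editing host-connection-pool in application.conf, as in the reproducer above, has the same effect; overriding in code is just convenient when the values should differ per application.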
It seems to work fine. By that I mean we no longer encounter BufferOverflowException. It even enables some kind of backpressure after ~8 failed requests:
[DEBUG] [06/07/2022 13:40:52.280] [default-akka.actor.default-dispatcher-15] [default/Pool(shared->http://localhost:9200)] Dispatching request [POST /_bulk Strict(815 bytes)] to pool
[DEBUG] [06/07/2022 13:40:52.281] [default-akka.actor.default-dispatcher-15] [Buffer(akka://default)] Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [class akka.stream.impl.fusing.Buffer$$anon$26]
I'm not sure where this backpressure comes from, as adding .addAttributes(Attributes.inputBuffer(128, 1024)) to the Elasticsearch flow doesn't change its behaviour. We are fine with this backpressure, though.
I've used the example that I've pasted above, with 3 parallel streams, max-connections=6 and max-open-requests=12. It seems to fix the problem. We will change the configuration in our environments and test it in a real scenario. I'll let you know in a few days whether this workaround helped. Thank you for the suggestion!
Good to hear!
The backpressure log message is internal and probably spurious and won't have any real effect.
It isn't spurious. It's clearly visible that after the backpressure kicks in, the time between "Dispatching request [POST /_bulk Strict(815 bytes)] to pool" and the actual request is at least a few seconds. As it should not appear frequently, we are fine with it.
It might just mean that you are filling the buffer exactly, and the next request beyond that might fail again with the BufferOverflowException. There isn't real backpressure in the Alpakka ES connector; it just sends out a single request per flow materialization.