akka-http

Connection not cleaned up after TCP timeout or connection refused.

Open AvaPL opened this issue 3 years ago • 7 comments

Akka HTTP version

10.2.9; also tested with 10.1.11

The case that I'm testing

  • 3 parallel flows sending requests to Elasticsearch via Alpakka Elasticsearch
  • sometimes the request fails with TCP timeout or connection refused exception
  • the source of the documents and ES flow are wrapped in a RestartSource so that the flow can be restarted on failure caused by the mentioned exceptions

Expected behaviour

After the restart, the connection pool is ready to use, with all slots free.

Actual behaviour

After several restarts the connection pool becomes full, causing BufferOverflowExceptions and further restarts.

[WARN] [05/30/2022 16:23:33.640] [default-akka.actor.default-dispatcher-15] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace: 
java.lang.RuntimeException: Request failed for POST http://localhost:9200/_bulk
	at akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic$$anonfun$onPush$4.applyOrElse(ElasticsearchSimpleFlowStage.scala:102)
	at akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic$$anonfun$onPush$4.applyOrElse(ElasticsearchSimpleFlowStage.scala:100)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:490)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9200,ConnectionPoolSetup(ConnectionPoolSettings(32,0,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,Duration.Inf,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.9),10 seconds,5 seconds,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$254/0x0000000800332c40@708bef53,false),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,5,Strict,RFC6265,true,Set(),Full,Error,Error,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$202/0x00000008002c6040@253400a5,akka.util.ConstantFun$$$Lambda$202/0x00000008002c6040@253400a5,akka.util.ConstantFun$$$Lambda$203/0x00000008002c6840@82b8d2a),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,0 seconds,0 seconds,0,3 seconds,100 milliseconds,2 minutes,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@27f50a0b,akka.event.MarkerLoggingAdapter@3b86e416))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See https://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
	at akka.http.impl.engine.client.PoolInterface$Logic.<init>(PoolInterface.scala:106)
	at akka.http.impl.engine.client.PoolInterface$PoolInterfaceStage.createLogicAndMaterializedValue(PoolInterface.scala:94)
	at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:699)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:498)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:448)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:440)
	at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:744)
	at akka.http.impl.engine.client.PoolInterface$.apply(PoolInterface.scala:80)
	at akka.http.impl.engine.client.PoolMasterActor.akka$http$impl$engine$client$PoolMasterActor$$startPoolInterface(PoolMasterActor.scala:131)
	at akka.http.impl.engine.client.PoolMasterActor$$anonfun$receive$1.applyOrElse(PoolMasterActor.scala:162)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.http.impl.engine.client.PoolMasterActor.aroundReceive(PoolMasterActor.scala:111)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 5 more

Code used to reproduce the issue

The easiest way to reproduce the issue is to run the program without any server on port 9200, so that each request ends with connection refused. The behaviour is the same when the exception is caused by a TCP timeout. Please note that the restarts happen at 20-30 second intervals, which seems to be a reasonable period for the pool to clean up the slots.

ConnectionPoolTest.scala:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json._

import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

object ConnectionPoolTest extends App {
  case class Book(title: String)
  implicit val format: JsonFormat[Book] = jsonFormat1(Book)

  implicit val actorSystem: ActorSystem = ActorSystem()

  val index = "test-index"
  val connectionSettings =
    ElasticsearchConnectionSettings("http://localhost:9200")
      .withCredentials("", "")

  val batchCount = 100
  val batchSize = 5
  val parallelFlows = 3

  val restartSettings = RestartSettings(
    minBackoff = 20 seconds,
    maxBackoff = 30 seconds,
    randomFactor = 0.2
  )

  val source = Source(createBooks(batchSize, batchCount))
  val flow = ElasticsearchFlow.createBulk[Book, NotUsed](
    ElasticsearchParams.V7(indexName = index),
    ElasticsearchWriteSettings(connectionSettings)
  )
  val graph = RestartSource
    .onFailuresWithBackoff(restartSettings)(() => source.via(flow))
    .to(Sink.ignore)

  Iterator.continually(graph).take(parallelFlows).foreach { graph =>
    Thread.sleep(1000) // Delay the streams start a bit
    graph.run()
  }

  private def createBooks(batchSize: Int, batchCount: Int) =
    Iterator
      .continually {
        Seq
          .fill(batchSize)(
            Book(s"Book-${UUID.randomUUID().toString}")
          )
          .map { book =>
            WriteMessage
              .createUpsertMessage(UUID.randomUUID().toString, book)
              .withIndexName(index)
          }
      }
      .take(batchCount)
      .toSeq
}

application.conf:

akka {
  loglevel = "DEBUG"

  http {
    host-connection-pool {
      max-connections = 32
      max-open-requests = 16
    }
    client {
      idle-timeout = 5 s
    }
  }
}

build.sbt:

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.7"

lazy val root = (project in file("."))
  .settings(
    name := "connection-pool-test"
  )

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % "10.2.9",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.2.9",
  "com.typesafe.akka" %% "akka-stream" % "2.6.14",
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "3.0.4"
)

AvaPL avatar May 30 '22 14:05 AvaPL

The issue can be demonstrated even more simply. Consider this code:

Http().singleRequest(HttpRequest(uri = "http://localhost:9093")).recover {
  case ex => println(s"error: ${ex.getMessage}")
}

This correctly fails the future with:

error: Tcp command [Connect(localhost:9093,None,List(),Some(10 seconds),true)] failed because of java.net.ConnectException: Connection refused

However, doing the following:

(0 to 30).map { i =>
  Http().singleRequest(HttpRequest(uri = "http://localhost:9093")).recover {
    case ex => println(s"error: ${ex.getMessage}")
  }
}

Here we do not get the connection refused exception but rather a series of:

error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...
error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...
error: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool ...

...


[akka://default/system/IO-TCP/selectors/$a/8] - Message [akka.actor.ReceiveTimeout$] to Actor[akka://default/system/IO-TCP/selectors/$a/8#-748172926] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://default/system/IO-TCP/selectors/$a/8#-748172926] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

sebastian-alfers avatar Jun 02 '22 08:06 sebastian-alfers

I wouldn't say that's exactly the same. Please note that in the code I've provided, a restart happens every 20 to 30 seconds. Let's say that the restart happens every 20 seconds and we have 3 parallel flows. This means that 3 requests fail every 20 seconds. Isn't 20 seconds enough time to clean up the occupied slots in the connection pool? It seems that the problem accumulates over time and eventually ends in the BufferOverflowException.

AvaPL avatar Jun 02 '22 08:06 AvaPL

Thanks for the report and reproducer, @AvaPL.

There doesn't seem to be an actual leak; rather, something seems to be wrong with the buffer size calculation in PoolInterface.

It assumes that the pool will always pull at least max-connections connections which may not be true during situations of backoff due to connection problems. In that case, we might be missing entries in the buffer that the user expects.

As a workaround, can you try setting max-open-requests higher than max-connections by at least the number of concurrent requests you expect to be dispatched to the pool? That is, at least twice as high as max-connections.

jrudolph avatar Jun 07 '22 11:06 jrudolph

It seems to work fine. What I mean by that is that we don't encounter the BufferOverflowException anymore. It even enables some kind of backpressure after ~8 failed requests:

[DEBUG] [06/07/2022 13:40:52.280] [default-akka.actor.default-dispatcher-15] [default/Pool(shared->http://localhost:9200)] Dispatching request [POST /_bulk Strict(815 bytes)] to pool
[DEBUG] [06/07/2022 13:40:52.281] [default-akka.actor.default-dispatcher-15] [Buffer(akka://default)] Backpressuring because buffer is full and overflowStrategy is: [Backpressure] in stream [class akka.stream.impl.fusing.Buffer$$anon$26]

I'm not sure where this backpressure comes from, as adding .addAttributes(Attributes.inputBuffer(128, 1024)) to the Elasticsearch flow doesn't change its behaviour. We are fine with this backpressure though.

I've used the example that I pasted above, with 3 parallel streams, max-connections=6 and max-open-requests=12. It seems to fix the problem. We will change the configuration in our environments and test it in a real scenario. I'll let you know in several days whether this workaround helped. Thank you for the suggestion!
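
For reference, a minimal sketch of how the values mentioned above might look in application.conf, following the same layout as the original reproducer. Only the two pool values come from this comment; keeping the rest of the reproducer's configuration unchanged is an assumption, and the right numbers for another setup depend on how many requests are dispatched concurrently.

```
akka {
  http {
    host-connection-pool {
      # Values reported in this comment for 3 parallel streams.
      # max-open-requests is kept at twice max-connections, per the suggested workaround.
      max-connections = 6
      max-open-requests = 12
    }
  }
}
```

The point of the change is the relationship between the two settings rather than the absolute numbers: the reproducer originally had max-open-requests (16) below max-connections (32).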

AvaPL avatar Jun 07 '22 12:06 AvaPL

Good to hear!

The backpressure log message is internal and probably spurious and won't have any real effect.

jrudolph avatar Jun 07 '22 12:06 jrudolph

It isn't spurious. It's clearly visible that after the backpressure kicks in, the time between "Dispatching request [POST /_bulk Strict(815 bytes)] to pool" and the actual request is at least a few seconds. As it should not appear frequently, we are fine with it.

AvaPL avatar Jun 07 '22 12:06 AvaPL

It might just mean that you are maxing out the buffer exactly, and the next request beyond that might fail again with the BufferOverflowException. There isn't real backpressure in the Alpakka ES connector; it just sends out a single request per flow materialization.

jrudolph avatar Jun 07 '22 12:06 jrudolph