akka-http icon indicating copy to clipboard operation
akka-http copied to clipboard

Investigate potential connection leak in HTTP/1.1 when HTTP/2 is enabled

Open jrudolph opened this issue 4 years ago • 3 comments

Found by executing ClientServerSpec when "complete a request/response when request has Connection: close set" fails with leaking stages:

Details
activeShells (actor: akka://ClientServerSpecBase/system/StreamSupervisor-0/flow-11-0-unnamed)
  GraphInterpreterShell(
  logics: [
    ServerImpl.netIn attrs: [Name(SubSink%28ServerImpl.netIn%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    ServerImpl.netOut attrs: [Name(SubSource%28ServerImpl.netOut%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92) attrs: [Name(mapAsyncUnordered), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    One2OneBidi attrs: [Name(One2OneBidi), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460 attrs: [Name(RequestTimeoutSupport), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175 attrs: [Name(ControllerStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062 attrs: [Name(renderer), Name(renderer), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
    Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    HttpRequestParser attrs: [Name(HttpRequestParser), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79 attrs: [Name(ProtocolSwitchStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Map(SendBytes) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Collect attrs: [Name(collect), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
  ],
  connections: [
    Connection(0, ServerImpl.netIn, Identity, Closed)
    Connection(1, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), One2OneBidi, Closed)
    Connection(2, One2OneBidi, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
    Connection(3, One2OneBidi, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), Closed)
    Connection(4, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, Closed)
    Connection(5, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, One2OneBidi, Closed)
    Connection(6, Identity, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
    Connection(7, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
    Connection(8, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), Closed)
    Connection(9, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Identity, Closed)
    Connection(10, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
    Connection(11, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), HttpRequestParser, Closed)
    Connection(12, HttpRequestParser, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
    Connection(13, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, Closed)
    Connection(14, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Collect, Closed)
    Connection(15, Map(SendBytes), akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
    Connection(16, Collect, Log, Closed)
    Connection(17, Log, Map(SendBytes), Closed)
    Connection(18, Log, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, Closed)
    Connection(19, Identity, Log, Closed)
    Connection(20, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, ServerImpl.netOut, ShouldPush
  ]
)

dot format graph for deadlock analysis:
================================================================
digraph waits {
  N0 [label="ServerImpl.netIn"];
  N1 [label="ServerImpl.netOut"];
  N2 [label="MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92)"];
  N3 [label="One2OneBidi"];
  N4 [label="akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460"];
  N5 [label="Identity"];
  N6 [label="akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320"];
  N7 [label="akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175"];
  N8 [label="akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062"];
  N9 [label="Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018)"];
  N10 [label="HttpRequestParser"];
  N11 [label="akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79"]
  N12 [label="Map(SendBytes)"];
  N13 [label="Collect"];
  N14 [label="Log"];
  N15 [label="Log"];
  N16 [label="Identity"];
  N17 [label="akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509"];
  N0 -> N16 [style=dotted, label=closed, dir=both];
  N2 -> N3 [style=dotted, label=closed, dir=both];
  N3 -> N4 [style=dotted, label=closed, dir=both];
  N3 -> N2 [style=dotted, label=closed, dir=both];
  N4 -> N6 [style=dotted, label=closed, dir=both];
  N4 -> N3 [style=dotted, label=closed, dir=both];
  N5 -> N4 [style=dotted, label=closed, dir=both];
  N6 -> N7 [style=dotted, label=closed, dir=both];
  N7 -> N9 [style=dotted, label=closed, dir=both];
  N7 -> N5 [style=dotted, label=closed, dir=both];
  N8 -> N7 [style=dotted, label=closed, dir=both];
  N9 -> N10 [style=dotted, label=closed, dir=both];
  N10 -> N11 [style=dotted, label=closed, dir=both];
  N11 -> N8 [style=dotted, label=closed, dir=both];
  N11 -> N13 [style=dotted, label=closed, dir=both];
  N12 -> N11 [style=dotted, label=closed, dir=both];
  N13 -> N15 [style=dotted, label=closed, dir=both];
  N14 -> N12 [style=dotted, label=closed, dir=both];
  N15 -> N17 [style=dotted, label=closed, dir=both];
  N16 -> N14 [style=dotted, label=closed, dir=both];
  N17 -> N1 [label=shouldPush, color=red];
}
================================================================

jrudolph avatar Dec 06 '21 11:12 jrudolph

This is likely a leak in ProtocolSwitch where not all substreams are closed before the stage is terminated. This seems to be fixed with Akka 2.6.x which automatically closes all substreams that would otherwise be left hanging. Would still be good to fix it properly.

jrudolph avatar Dec 06 '21 11:12 jrudolph

Let's not fix this right now as Akka 2.5 itself is not really supported any more. We can keep the ticket open until we reach 10.3 where we will drop Akka 2.5 support completely in which case the issue has solved itself.

In the meantime, if someone actually runs into the problem with 2.5, we can still try to see what needs to be done in ProtocolSwitch to resolve the problem.

jrudolph avatar Feb 14 '22 09:02 jrudolph

Investigation branch: https://github.com/akka/akka-http/compare/main...jrudolph:3963-try-to-harden-protocolswitch

jrudolph avatar Feb 14 '22 10:02 jrudolph