Investigate potential connection leak in HTTP/1.1 when HTTP/2 is enabled
Found by running ClientServerSpec: the test "complete a request/response when request has Connection: close set" fails with leaking stages:
activeShells (actor: akka://ClientServerSpecBase/system/StreamSupervisor-0/flow-11-0-unnamed)
GraphInterpreterShell(
logics: [
ServerImpl.netIn attrs: [Name(SubSink%28ServerImpl.netIn%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
ServerImpl.netOut attrs: [Name(SubSource%28ServerImpl.netOut%29), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92) attrs: [Name(mapAsyncUnordered), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
One2OneBidi attrs: [Name(One2OneBidi), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460 attrs: [Name(RequestTimeoutSupport), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175 attrs: [Name(ControllerStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062 attrs: [Name(renderer), Name(renderer), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
HttpRequestParser attrs: [Name(HttpRequestParser), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79 attrs: [Name(ProtocolSwitchStage), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Map(SendBytes) attrs: [Name(map), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Collect attrs: [Name(collect), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
Identity attrs: [Name(identityOp), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509 attrs: [CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), CancellationStrategy(AfterDelay(100 milliseconds,FailStage)), InputBuffer(4,16), SupervisionStrategy(<function1>), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
],
connections: [
Connection(0, ServerImpl.netIn, Identity, Closed)
Connection(1, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), One2OneBidi, Closed)
Connection(2, One2OneBidi, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
Connection(3, One2OneBidi, MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92), Closed)
Connection(4, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, Closed)
Connection(5, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, One2OneBidi, Closed)
Connection(6, Identity, akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460, Closed)
Connection(7, akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
Connection(8, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), Closed)
Connection(9, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Identity, Closed)
Connection(10, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175, Closed)
Connection(11, Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018), HttpRequestParser, Closed)
Connection(12, HttpRequestParser, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
Connection(13, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062, Closed)
Connection(14, akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Collect, Closed)
Connection(15, Map(SendBytes), akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79, Closed)
Connection(16, Collect, Log, Closed)
Connection(17, Log, Map(SendBytes), Closed)
Connection(18, Log, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, Closed)
Connection(19, Identity, Log, Closed)
Connection(20, akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509, ServerImpl.netOut, ShouldPush
]
)
dot format graph for deadlock analysis:
================================================================
digraph waits {
N0 [label="ServerImpl.netIn"];
N1 [label="ServerImpl.netOut"];
N2 [label="MapAsyncUnordered(1,akka.http.impl.engine.http2.Http2Ext$$Lambda$51006/764242924@61556c92)"];
N3 [label="One2OneBidi"];
N4 [label="akka.http.impl.engine.server.HttpServerBluePrint$RequestTimeoutSupport@656ae460"];
N5 [label="Identity"];
N6 [label="akka.http.impl.engine.server.HttpServerBluePrint$PrepareRequests@1e312320"];
N7 [label="akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage@4bbad175"];
N8 [label="akka.http.impl.engine.rendering.HttpResponseRendererFactory$HttpResponseRenderer$@44493062"];
N9 [label="Map(akka.http.impl.engine.server.HttpServerBluePrint$$$Lambda$51025/1768981470@24faa018)"];
N10 [label="HttpRequestParser"];
N11 [label="akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@5f757a79"]
N12 [label="Map(SendBytes)"];
N13 [label="Collect"];
N14 [label="Log"];
N15 [label="Log"];
N16 [label="Identity"];
N17 [label="akka.http.impl.util.StreamUtils$DelayCancellationStage@5ddfe509"];
N0 -> N16 [style=dotted, label=closed, dir=both];
N2 -> N3 [style=dotted, label=closed, dir=both];
N3 -> N4 [style=dotted, label=closed, dir=both];
N3 -> N2 [style=dotted, label=closed, dir=both];
N4 -> N6 [style=dotted, label=closed, dir=both];
N4 -> N3 [style=dotted, label=closed, dir=both];
N5 -> N4 [style=dotted, label=closed, dir=both];
N6 -> N7 [style=dotted, label=closed, dir=both];
N7 -> N9 [style=dotted, label=closed, dir=both];
N7 -> N5 [style=dotted, label=closed, dir=both];
N8 -> N7 [style=dotted, label=closed, dir=both];
N9 -> N10 [style=dotted, label=closed, dir=both];
N10 -> N11 [style=dotted, label=closed, dir=both];
N11 -> N8 [style=dotted, label=closed, dir=both];
N11 -> N13 [style=dotted, label=closed, dir=both];
N12 -> N11 [style=dotted, label=closed, dir=both];
N13 -> N15 [style=dotted, label=closed, dir=both];
N14 -> N12 [style=dotted, label=closed, dir=both];
N15 -> N17 [style=dotted, label=closed, dir=both];
N16 -> N14 [style=dotted, label=closed, dir=both];
N17 -> N1 [label=shouldPush, color=red];
}
================================================================
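This is likely a leak in ProtocolSwitch, where not all substreams are closed before the stage terminates. In the dump above, every connection is Closed except Connection 20 (DelayCancellationStage to ServerImpl.netOut), which is still in ShouldPush. The problem seems to be fixed with Akka 2.6.x, which automatically closes all substreams that would otherwise be left hanging. It would still be good to fix it properly.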
Let's not fix this right now, as Akka 2.5 itself is not really supported any more. We can keep the ticket open until we reach 10.3, where we will drop Akka 2.5 support completely, at which point the issue will have resolved itself.
In the meantime, if someone actually runs into the problem with Akka 2.5, we can still look into what needs to change in ProtocolSwitch to resolve it.
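For reference, here is a minimal sketch of what "closing all substreams before the stage terminates" could look like in a custom stage. This is not the actual ProtocolSwitchStage code: `SubstreamOwner`, `toInner` and `fromInner` are hypothetical names, and the stage simply routes elements through a nested flow. The relevant part is `postStop`, which explicitly closes both substream ports instead of relying on the cleanup that Akka 2.6.x performs.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical stage for illustration only (not ProtocolSwitchStage):
// it materializes `inner` as a substream and forwards elements through it.
final class SubstreamOwner[T](inner: Flow[T, T, Any]) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("SubstreamOwner.in")
  val out: Outlet[T] = Outlet("SubstreamOwner.out")
  val shape: FlowShape[T, T] = FlowShape(in, out)

  def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Substream ports owned by this stage.
    val toInner = new SubSourceOutlet[T]("SubstreamOwner.toInner")
    val fromInner = new SubSinkInlet[T]("SubstreamOwner.fromInner")

    setHandler(in, new InHandler {
      def onPush(): Unit = toInner.push(grab(in))
      override def onUpstreamFinish(): Unit =
        if (!toInner.isClosed) toInner.complete()
    })
    setHandler(out, new OutHandler {
      def onPull(): Unit = if (!fromInner.isClosed) fromInner.pull()
    })
    toInner.setHandler(new OutHandler {
      def onPull(): Unit = pull(in)
      override def onDownstreamFinish(): Unit = cancel(in)
    })
    fromInner.setHandler(new InHandler {
      def onPush(): Unit = push(out, fromInner.grab())
      override def onUpstreamFinish(): Unit = completeStage()
    })

    override def preStart(): Unit =
      toInner.source.via(inner).runWith(fromInner.sink)(subFusingMaterializer)

    // Close whatever substream ports are still open when the stage stops,
    // so that no nested stage is left waiting on them.
    override def postStop(): Unit = {
      if (!toInner.isClosed) toInner.complete()
      if (!fromInner.isClosed) fromInner.cancel()
    }
  }
}
```

Used as `Source(1 to 3).via(new SubstreamOwner(Flow[Int].map(_ * 2)))`, the stage behaves like the nested flow itself. Whether the same approach is sufficient for ProtocolSwitch would still need to be checked against the investigation branch.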
Investigation branch: https://github.com/akka/akka-http/compare/main...jrudolph:3963-try-to-harden-protocolswitch