xchange-stream
Exchange doesn't disconnect gracefully
When I repeat the following steps, as in the example:
- Connect to GDAX exchange.
- Subscribe to trades.
- Check that trades really arrive.
- Dispose all my subscriptions.
- Disconnect from exchange.
- The program's main loop runs to its end.
But the program doesn't stop! There are still threads running somewhere.
I'm using a Spring Boot web app and I'm stopping the web server via a POST /shutdown request. All other services are then shut down.
This is not a big deal. Currently I call System.exit() 15 s after that request. But this is annoying and suggests there is a thread leak somewhere.
@jpink Yeah, I can reproduce that, and it's a bug in exchanges based on NettyStreamingService.
Did anyone ever do anything about this? I think it's the nioEventLoopGroup thread that's still running, but I don't know enough about these things. I got around it by assigning the result of new NioEventLoopGroup() in connect() to a class variable and calling eventLoopGroup.shutdownGracefully() on disconnect(), but I'm not sure that's the most elegant solution.
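For anyone wanting to try the same workaround, here is a minimal sketch of the pattern: keep the thread-owning object in a field when connecting, and shut it down when disconnecting. To keep it runnable without Netty on the classpath, a plain ExecutorService stands in for the NioEventLoopGroup, and the class name LeakFreeService and its methods are made up for illustration. It is not the actual NettyStreamingService code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a streaming service; NOT the real NettyStreamingService.
class LeakFreeService {
    // Kept as a field so disconnect() can reach it. In the real class this
    // would hold the NioEventLoopGroup created in connect().
    private ExecutorService eventLoop;

    public synchronized void connect() {
        if (eventLoop != null) {
            return; // already connected: do not create a second pool
        }
        // Executors creates non-daemon threads by default, so an executor that
        // is never shut down keeps the JVM alive after main() returns.
        eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.submit(() -> { /* socket work would happen here */ });
    }

    // Returns true if the worker thread terminated within the timeout.
    public synchronized boolean disconnect() {
        if (eventLoop == null) {
            return true; // nothing to release
        }
        // With Netty this call would be eventLoopGroup.shutdownGracefully().
        eventLoop.shutdown();
        boolean stopped;
        try {
            stopped = eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            stopped = false;
        }
        eventLoop = null;
        return stopped;
    }

    public synchronized boolean isConnected() {
        return eventLoop != null;
    }
}
```

The point is only that each connect() has a matching release in disconnect(). Creating a new group on every connect without keeping a reference leaves no way to stop its threads later.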
This one is pretty annoying. Sure, I can add a System.exit() call so that I'm not chasing down PIDs, but it breaks WAR hot deploy: I need to stop Tomcat and restart it to pick up code changes, when I should be able to just trigger the deploy by rebuilding.
This issue also seems to affect Gemini.
Possibly related - I get an NPE whenever I disconnect from GDAX due to a background socket thread:
java.lang.NullPointerException: null
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:288)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:278)
at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:84)
at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:21)
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:42)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:79)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1388)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1202)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
I'm trying to make some progress with all this stuff on #191 (a little bit blindly as I'm not familiar with Netty) and am keeping the PR rebased against develop.
In the meantime, in the absence of PR approvals right now, I am producing snapshot builds myself which pull together develop, all the unmerged PRs on here (which I like and need), XChange 4.3.6 support, updated dependencies, plus my work in progress. You're welcome to use it:
<repositories>
  <repository>
    <id>xchange-stream-mvn-repo</id>
    <url>https://raw.github.com/badgerwithagun/xchange-stream/mvn-repo/</url>
    <snapshots>
      <enabled>true</enabled>
      <updatePolicy>always</updatePolicy>
    </snapshots>
  </repository>
</repositories>
Just make sure you're running your Maven build with --update-snapshots.
Test to simulate thread leak:
@Test
public void testSimulateMemoryLeak() throws InterruptedException, IOException {
    try {
        // Connect, receive a few order book updates, then disconnect, five times over.
        for (int i = 0; i < 5; i++) {
            StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(GDAXStreamingExchange.class.getName());
            exchange.connect(ProductSubscription.create().addAll(BTC_USD).build()).blockingAwait();
            try {
                // Wait for three order book updates before disposing the subscription.
                CountDownLatch latch = new CountDownLatch(3);
                Disposable sub = exchange.getStreamingMarketDataService().getOrderBook(BTC_USD).subscribe(o -> latch.countDown());
                latch.await();
                sub.dispose();
            } finally {
                exchange.disconnect().blockingAwait();
            }
            Thread.sleep(2000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Keep the VM alive so leftover threads can be inspected, e.g. with a thread dump.
    System.out.println("Stopped. Press a key to terminate VM");
    System.in.read();
}
Now, for no apparent reason, it's behaving. Heisenbug.
Try updating the netty service dependency.
Looks like I have fixed the problem here: https://github.com/bitrich-info/xchange-stream/pull/225
Not sure if this is related, but my process just died with Too many open files, and it turns out Netty spun up a LOT of threads. At the moment 651 are alive, with names of the form:
nioEventLoopGroup-4-1
...
nioEventLoopGroup-4208277-1
and clearly a lot more were actually created, about 4 million judging by the thread names above, all with the same stack trace:
"nioEventLoopGroup-4600-1" #4144 prio=10 os_prio=0 tid=0x00007f67f01ca000 nid=0x192c0 runnable [0x00007f668d153000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000b26c7a38> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000b26c7a28> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000b26c7a50> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:786)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:434)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
and 4150 (!) sockets opened...
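In case it helps others confirm whether they are hitting the same leak, here is a rough sketch of how the live threads could be counted from inside the process, rather than by reading a full thread dump. It relies only on the standard Thread.getAllStackTraces() call. The class name ThreadCensus and the method countLiveThreads are invented for this example, and the "nioEventLoopGroup-" prefix is simply taken from the thread names quoted above.

```java
import java.util.Set;

// Hypothetical diagnostic helper; not part of xchange-stream or Netty.
class ThreadCensus {
    // Counts the live threads whose name starts with the given prefix,
    // e.g. "nioEventLoopGroup-" for the threads listed in this issue.
    static long countLiveThreads(String namePrefix) {
        // getAllStackTraces() returns a snapshot keyed by every live thread.
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        return threads.stream()
                .filter(Thread::isAlive)
                .filter(t -> t.getName().startsWith(namePrefix))
                .count();
    }
}
```

Logging ThreadCensus.countLiveThreads("nioEventLoopGroup-") before each connect and after each disconnect should show whether the number keeps growing across reconnects or returns to its previous value.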