xchange-stream

Exchange doesn't disconnect gracefully

Open · jpink opened this issue 7 years ago · 11 comments

When I repeat the following steps, as in the example:

  1. Connect to GDAX exchange.
  2. Subscribe to trades.
  3. Check that there really come trades.
  4. Dispose all my subscriptions.
  5. Disconnect from exchange.
  6. The program's main loop reaches its end.

But the program doesn't stop! There are still threads running somewhere.

I'm using a Spring Boot web app, and I stop the web server via a POST /shutdown request. All other services then shut down.

This is not a big deal. Currently I call System.exit() 15 s after that request. But it's annoying, and it smells like a thread leak somewhere.
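The JVM exits on its own only once every non-daemon thread has finished, so a quick way to see what is keeping the process alive after disconnect() is to list the live non-daemon threads at the point where main() would return. The sketch below uses only the JDK; the class name `LingeringThreads` and the simulated leak are hypothetical. With the versions discussed here one might expect Netty event loop threads (named like `nioEventLoopGroup-N-M`) to appear in that list, but that is an assumption rather than something verified in this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper: reports threads that would keep the JVM from exiting. */
class LingeringThreads {

    /** Returns the sorted names of live non-daemon threads, excluding the calling thread. */
    static List<String> nonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        Thread current = Thread.currentThread();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            // Daemon threads never block JVM exit, so only the others are interesting.
            if (t != current && t.isAlive() && !t.isDaemon()) {
                names.add(t.getName());
            }
        }
        names.sort(null); // natural (alphabetical) order
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated leak: a non-daemon thread that never finishes unless interrupted.
        Thread leaked = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted: return and let the thread end.
            }
        }, "simulated-leak-1");
        leaked.start();

        // Call this right before main() would normally return.
        for (String name : nonDaemonThreadNames()) {
            System.out.println("still running: " + name);
        }

        leaked.interrupt();
        leaked.join();
    }
}
```

Dumping this list before falling back to System.exit() would at least show which thread pool is leaking.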

jpink, Nov 18 '17

@jpink Yeah, I can reproduce that, and it's a bug in the exchanges based on NettyStreamingService.

dozd, Nov 19 '17

Did anyone ever do anything about this? I think it's the nioEventLoopGroup thread that's still running, but I don't know enough about these things. I got around it by assigning the result of new NioEventLoopGroup() in connect() to a class variable and calling eventLoopGroup.shutdownGracefully() in disconnect(), but I'm not sure if that's the most elegant solution.
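The workaround described above, keeping the group created in connect() in a field and shutting it down in disconnect(), can be sketched as follows. To keep the sketch runnable with only the JDK, a plain `ExecutorService` stands in for Netty's `NioEventLoopGroup` (which is itself a `ScheduledExecutorService`); in real Netty code the shutdown call would be `eventLoopGroup.shutdownGracefully()`. The class name `OwnedEventLoopService` and its methods are hypothetical; this is not xchange-stream's actual NettyStreamingService.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical streaming service. The ExecutorService plays the role of Netty's
 * NioEventLoopGroup: created in connect(), released in disconnect().
 */
class OwnedEventLoopService {
    private static final AtomicInteger POOL_ID = new AtomicInteger();

    // Kept in a field so disconnect() shuts down the same group that connect() created.
    private ExecutorService eventLoopGroup;

    synchronized void connect() {
        if (eventLoopGroup != null) {
            return; // already connected: don't create (and leak) a second group
        }
        int poolId = POOL_ID.incrementAndGet();
        AtomicInteger threadIndex = new AtomicInteger();
        // Non-daemon worker threads, named in a Netty-like style so they are easy to spot.
        ThreadFactory factory =
                r -> new Thread(r, "eventLoop-" + poolId + "-" + threadIndex.incrementAndGet());
        eventLoopGroup = Executors.newFixedThreadPool(1, factory);
        eventLoopGroup.submit(() -> { /* socket I/O would run here */ });
    }

    synchronized void disconnect() {
        if (eventLoopGroup == null) {
            return;
        }
        // Netty equivalent: eventLoopGroup.shutdownGracefully().syncUninterruptibly();
        eventLoopGroup.shutdown();
        try {
            if (!eventLoopGroup.awaitTermination(5, TimeUnit.SECONDS)) {
                eventLoopGroup.shutdownNow();
            }
        } catch (InterruptedException e) {
            eventLoopGroup.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        eventLoopGroup = null;
    }

    synchronized boolean isConnected() {
        return eventLoopGroup != null;
    }
}
```

The design point is ownership: whoever creates the group is responsible for shutting it down, so repeated connect/disconnect cycles leave no worker threads behind.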

dcarr45, Jan 20 '18

This one is pretty annoying. Sure, I can add a System.exit() call so that I'm not chasing down PIDs, but it breaks WAR hot deploy: I have to stop Tomcat and restart it to pick up code changes, when I should be able to trigger the deploy just by rebuilding.

bryantharris, Feb 15 '18

This issue also seems to affect Gemini.

bryantharris, Mar 16 '18

Possibly related: whenever I disconnect from GDAX, I get an NPE from a background socket thread:

java.lang.NullPointerException: null
	at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:288)
	at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:278)
	at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:84)
	at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:21)
	at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:42)
	at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:79)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1388)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1202)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)
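One plausible reading of this trace, offered as an assumption rather than something confirmed from the xchange-stream source, is that a message already in flight on the Netty event loop is dispatched after disconnect() has cleared the channel subscriptions, so the lookup for its channel returns null and is then dereferenced. The hypothetical `ChannelRouter` below illustrates that race and a defensive null check; it is not xchange-stream's code, and the real cause at NettyStreamingService.java:288 may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical message router keyed by channel name (not xchange-stream's actual class). */
class ChannelRouter {
    private final Map<String, Consumer<String>> subscriptions = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscriptions.put(channel, handler);
    }

    /** Roughly what a disconnect() might do: forget every subscription at once. */
    void clearAll() {
        subscriptions.clear();
    }

    /** Dispatches a message; returns false if nobody is subscribed to its channel any more. */
    boolean handleChannelMessage(String channel, String message) {
        Consumer<String> handler = subscriptions.get(channel);
        if (handler == null) {
            // A late message for a channel that was already torn down: drop it
            // instead of dereferencing null and throwing an NPE on the event loop.
            return false;
        }
        handler.accept(message);
        return true;
    }
}
```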

badgerwithagun, May 20 '18

I'm trying to make some progress on all of this in #191 (a little blindly, as I'm not familiar with Netty) and am keeping the PR rebased against develop.

In the meantime, since PRs aren't being approved right now, I am producing snapshot builds myself that combine develop, all the unmerged PRs here (which I like and need), XChange 4.3.6 support, updated dependencies, and my work in progress. You're welcome to use them:

	<repositories>
	    <repository>
	        <id>xchange-stream-mvn-repo</id>
	        <url>https://raw.github.com/badgerwithagun/xchange-stream/mvn-repo/</url>
	        <snapshots>
	            <enabled>true</enabled>
	            <updatePolicy>always</updatePolicy>
	        </snapshots>
	    </repository>
	</repositories>

Just make sure you're running your Maven build with --update-snapshots.

badgerwithagun, May 24 '18

A test to simulate the thread leak:

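  import static org.knowm.xchange.currency.CurrencyPair.BTC_USD;

  import info.bitrich.xchangestream.core.ProductSubscription;
  import info.bitrich.xchangestream.core.StreamingExchange;
  import info.bitrich.xchangestream.core.StreamingExchangeFactory;
  import info.bitrich.xchangestream.gdax.GDAXStreamingExchange;
  import io.reactivex.disposables.Disposable;
  import java.io.IOException;
  import java.util.concurrent.CountDownLatch;
  import org.junit.Test;

  public class GDAXThreadLeakTest { // class name is illustrative

    @Test
    public void testSimulateMemoryLeak() throws InterruptedException, IOException {
      try {
        // Run several full connect/subscribe/dispose/disconnect cycles; threads that
        // disconnect() fails to release will pile up across iterations.
        for (int i = 0; i < 5; i++) {
          StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(GDAXStreamingExchange.class.getName());
          exchange.connect(ProductSubscription.create().addAll(BTC_USD).build()).blockingAwait();
          try {
            // Wait for three order book updates to confirm data is flowing.
            CountDownLatch latch = new CountDownLatch(3);
            Disposable sub = exchange.getStreamingMarketDataService().getOrderBook(BTC_USD).subscribe(o -> latch.countDown());
            latch.await();
            sub.dispose();
          } finally {
            exchange.disconnect().blockingAwait();
          }
          Thread.sleep(2000);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
      // Keep the VM alive so the remaining threads can be inspected in a profiler.
      System.out.println("Stopped. Press a key to terminate VM");
      System.in.read();
    }
  }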
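Rather than watching a profiler and pressing a key, the loop could check the thread count itself. The sketch below counts live threads whose names start with a given prefix and polls until the count drops, since a graceful shutdown may take a moment to finish. `nioEventLoopGroup` is the thread-name prefix mentioned in dcarr45's comment above; the helper name `ThreadCounter` and the idea of asserting on it in every iteration are assumptions.

```java
import java.util.Set;

/** Hypothetical test helper: counts live threads by name prefix. */
final class ThreadCounter {
    private ThreadCounter() {}

    /** Number of currently live threads whose name starts with {@code prefix}. */
    static long countAlive(String prefix) {
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        return threads.stream()
                .filter(Thread::isAlive)
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    /**
     * Polls until at most {@code max} matching threads remain or the timeout expires.
     * Returns true if the count dropped in time, false otherwise.
     */
    static boolean awaitAtMost(String prefix, long max, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (countAlive(prefix) > max) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(50); // give event loops time to wind down
        }
        return true;
    }
}
```

Inside the test loop, right after `exchange.disconnect().blockingAwait()`, a check such as `assertTrue(ThreadCounter.awaitAtMost("nioEventLoopGroup", 0, 5000))` would fail fast when event loop threads survive the disconnect, instead of relying on a manual inspection that may or may not catch an intermittent leak.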


badgerwithagun, May 24 '18

Now, for no apparent reason, it's behaving. Heisenbug.

badgerwithagun, May 24 '18

Try updating the netty service dependency:

	<dependency>
	    <groupId>info.bitrich.xchange-stream</groupId>
	    <artifactId>service-netty</artifactId>
	    <version>4.3.3-SNAPSHOT</version>
	</dependency>

tomislav011, Aug 17 '18

Looks like I have fixed the problem here: https://github.com/bitrich-info/xchange-stream/pull/225

pchertalev, Oct 02 '18

Not sure if it's related, but I just had my process die with "Too many open files", and it turns out Netty spun up a LOT of threads: at the moment 651 are alive, named like:

nioEventLoopGroup-4-1
...
nioEventLoopGroup-4208277-1

and clearly many more were actually created, about 4 million judging by the thread names above, all with the same stack trace:

"nioEventLoopGroup-4600-1" #4144 prio=10 os_prio=0 tid=0x00007f67f01ca000 nid=0x192c0 runnable [0x00007f668d153000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000b26c7a38> (a io.netty.channel.nio.SelectedSelectionKeySet)
        - locked <0x00000000b26c7a28> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000b26c7a50> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:786)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:434)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

and 4150 (!) sockets opened...
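The estimate of about 4 million can be reproduced mechanically. Assuming Netty's default thread factory naming, `<prefix>-<poolId>-<threadIndex>`, where the pool id is a JVM-wide counter bumped once per new factory (roughly once per new `NioEventLoopGroup`), the largest pool id in a thread dump approximates how many groups were ever created, and a trailing `-1` on every name suggests each group only ever started one thread. The helper below is a hypothetical sketch that extracts that number with a regular expression; the pattern assumes the `nioEventLoopGroup` prefix shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: estimates how many event loop groups were created from thread names. */
final class PoolIdEstimator {
    // Assumed naming scheme: <prefix>-<poolId>-<threadIndex>, e.g. nioEventLoopGroup-4600-1
    private static final Pattern NAME = Pattern.compile("nioEventLoopGroup-(\\d+)-(\\d+)");

    private PoolIdEstimator() {}

    /** Highest pool id among matching names, or empty if no name matches. */
    static OptionalLong highestPoolId(List<String> threadNames) {
        return threadNames.stream()
                .map(NAME::matcher)
                .filter(Matcher::matches)
                .mapToLong(m -> Long.parseLong(m.group(1)))
                .max();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "nioEventLoopGroup-4-1", "nioEventLoopGroup-4600-1", "nioEventLoopGroup-4208277-1");
        highestPoolId(names).ifPresent(id -> System.out.println("groups created: at least " + id));
    }
}
```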

dpisklov, Sep 09 '19